nginx upstream模块完整逻辑源码分析
字数 980 2025-08-15 21:32:52
Nginx Upstream模块源码分析与实现原理
一、Upstream模块概述
Nginx的upstream模块实现了反向代理功能的核心机制,负责与上游服务器(后端服务器)建立连接、发送请求、接收响应以及错误处理等完整流程。upstream机制将整个处理过程分为几个关键阶段:
- 启动upstream
- 连接上游服务器
- 向上游发送请求
- 接收上游响应(包头/包体)
- 结束请求
二、核心数据结构
1. ngx_http_upstream_t
struct ngx_http_upstream_s {
// 事件处理回调
ngx_http_upstream_handler_pt read_event_handler;
ngx_http_upstream_handler_pt write_event_handler;
// 连接相关
ngx_peer_connection_t peer; // 向上游发起的连接
ngx_event_pipe_t *pipe; // 缓存转发响应时使用
// 请求处理
ngx_chain_t *request_bufs; // 需要发送到上游的请求内容链表
ngx_output_chain_ctx_t output;
ngx_chain_writer_ctx_t writer;
// 配置
ngx_http_upstream_conf_t *conf;
// 响应处理
ngx_http_upstream_headers_in_t headers_in; // 解析后的响应头部
ngx_http_upstream_resolved_t *resolved; // 主机域名解析结果
// 缓冲区
ngx_buf_t from_client;
ngx_buf_t buffer; // 接收响应包头/包体的缓冲区
off_t length; // 响应包体长度
ngx_chain_t *out_bufs; // 不同场景下有不同意义
ngx_chain_t *busy_bufs; // 未发送完成的内容
ngx_chain_t *free_bufs; // 可回收的buf结构体
// 回调函数
ngx_int_t (*input_filter_init)(void *data);
ngx_int_t (*input_filter)(void *data, ssize_t bytes);
void *input_filter_ctx;
// HTTP模块实现的回调
ngx_int_t (*create_request)(ngx_http_request_t *r);
ngx_int_t (*reinit_request)(ngx_http_request_t *r);
ngx_int_t (*process_header)(ngx_http_request_t *r);
void (*abort_request)(ngx_http_request_t *r);
void (*finalize_request)(ngx_http_request_t *r, ngx_int_t rc);
ngx_int_t (*rewrite_redirect)(ngx_http_request_t *r, ngx_table_elt_t *h, size_t prefix);
ngx_int_t (*rewrite_cookie)(ngx_http_request_t *r, ngx_table_elt_t *h);
// 其他属性
ngx_msec_t timeout;
ngx_http_upstream_state_t *state;
ngx_str_t method;
ngx_str_t schema;
ngx_str_t uri;
ngx_http_cleanup_pt *cleanup;
// 标志位
unsigned store:1; // 是否指定文件缓存路径
unsigned cacheable:1; // 是否启用文件缓存
unsigned accel:1;
unsigned ssl:1; // 是否基于SSL协议
unsigned buffering:1; // 是否开启缓存转发
unsigned keepalive:1; // 是否开启keepalive
unsigned upgrade:1;
unsigned request_sent:1; // 是否已发送请求
unsigned header_sent:1; // 包头是否已转发
};
2. ngx_http_upstream_conf_t
typedef struct {
// 上游服务器配置
ngx_http_upstream_srv_conf_t *upstream;
// 超时设置
ngx_msec_t connect_timeout; // 建立TCP连接超时
ngx_msec_t send_timeout; // 发送请求超时
ngx_msec_t read_timeout; // 接收响应超时
ngx_msec_t timeout;
ngx_msec_t next_upstream_timeout;
// 缓冲区设置
size_t send_lowat; // 发送缓冲区下限
size_t buffer_size; // 接收头部缓冲区大小
size_t limit_rate;
size_t busy_buffers_size; // 转发响应时使用的busy_size
size_t max_temp_file_size; // 临时文件大小限制
size_t temp_file_write_size; // 写入临时文件的最大长度
ngx_bufs_t bufs; // 缓存转发使用的内存大小
// 头部处理
ngx_uint_t ignore_headers; // 需要忽略的headers位图
ngx_hash_t hide_headers_hash; // 不转发的头部散列表
ngx_array_t *hide_headers; // 不转发的头部数组
ngx_array_t *pass_headers; // 需要转发的头部数组
// 重试策略
ngx_uint_t next_upstream; // 需要重试的错误码位图
ngx_uint_t next_upstream_tries;
// 权限设置
ngx_uint_t store_access; // 临时目录和文件权限
// 标志位
ngx_flag_t buffering; // 是否开启缓存转发
ngx_flag_t ignore_client_abort; // 是否忽略客户端断开
ngx_flag_t intercept_errors; // 错误拦截标志
ngx_flag_t cyclic_temp_file; // 是否复用临时文件空间
// 路径设置
ngx_path_t *temp_path; // 临时文件路径
ngx_http_upstream_local_t *local; // 本机地址
ngx_array_t *store_lengths; // 缓存文件路径长度
ngx_array_t *store_values; // 缓存文件路径
// 其他
signed store:2;
unsigned intercept_404:1;
unsigned change_buffering:1;
ngx_str_t module; // 模块名称(日志用)
};
三、Upstream处理流程
1. 创建和初始化
static ngx_int_t ngx_http_proxy_handler(ngx_http_request_t *r) {
// 创建upstream结构
if (ngx_http_upstream_create(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// 获取配置
plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module);
u = r->upstream;
// 设置配置
u->conf = &plcf->upstream;
// 设置回调函数
u->create_request = ngx_http_proxy_create_request;
u->reinit_request = ngx_http_proxy_reinit_request;
u->process_header = ngx_http_proxy_process_status_line;
u->abort_request = ngx_http_proxy_abort_request;
u->finalize_request = ngx_http_proxy_finalize_request;
// 设置缓冲标志
u->buffering = plcf->upstream.buffering;
// 初始化upstream
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
return NGX_DONE;
}
2. 启动Upstream
void ngx_http_upstream_init(ngx_http_request_t *r) {
ngx_connection_t *c = r->connection;
// 移除客户端读事件定时器
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
ngx_http_upstream_init_request(r);
}
static void ngx_http_upstream_init_request(ngx_http_request_t *r) {
u = r->upstream;
u->store = u->conf->store;
// 设置连接检查方法
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
}
// 调用create_request构造请求
if (u->create_request(r) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// 添加清理回调
cln = ngx_http_cleanup_add(r, 0);
cln->handler = ngx_http_upstream_cleanup;
cln->data = r;
u->cleanup = &cln->handler;
// 连接上游服务器
ngx_http_upstream_connect(r, u);
}
3. 与上游建立连接
static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) {
// 建立连接
rc = ngx_event_connect_peer(&u->peer);
c = u->peer.connection;
c->data = r;
// 设置读写事件回调
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;
// 设置upstream读写处理器
u->write_event_handler = ngx_http_upstream_send_request_handler;
u->read_event_handler = ngx_http_upstream_process_header;
if (rc == NGX_AGAIN) {
// 连接未完成,设置超时定时器
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}
// 连接成功,发送请求
ngx_http_upstream_send_request(r, u);
}
连接建立的核心函数ngx_event_connect_peer:
ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc) {
// 创建socket
s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0);
// 获取连接结构
c = ngx_get_connection(s, pc->log);
// 设置非阻塞
if (ngx_nonblocking(s) == -1) {
// 错误处理
}
// 绑定本地地址
if (pc->local) {
if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
// 错误处理
}
}
// 设置连接回调
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
c->sendfile = 1;
// 设置读写事件
rev = c->read;
wev = c->write;
pc->connection = c;
// 添加到事件监听
if (ngx_add_conn) {
if (ngx_add_conn(c) == NGX_ERROR) {
goto failed;
}
}
// 发起连接
rc = connect(s, pc->sockaddr, pc->socklen);
// 处理连接结果
// ...
}
4. 事件处理函数
static void ngx_http_upstream_handler(ngx_event_t *ev) {
c = ev->data;
r = c->data;
u = r->upstream;
c = r->connection;
if (ev->write) {
// 写事件处理
u->write_event_handler(r, u);
} else {
// 读事件处理
u->read_event_handler(r, u);
}
// 执行post请求
ngx_http_run_posted_requests(c);
}
5. 发送请求到上游
static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u) {
c = u->peer.connection;
// 检查超时
if (c->write->timedout) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
// 如果包头已发送,设置空处理器
if (u->header_sent) {
u->write_event_handler = ngx_http_upstream_dummy_handler;
(void) ngx_handle_write_event(c->write, 0);
return;
}
// 发送请求
ngx_http_upstream_send_request(r, u);
}
static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) {
// 发送请求体
rc = ngx_http_upstream_send_request_body(r, u, do_write);
u->request_sent = 1;
// 移除写定时器
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
if (rc == NGX_AGAIN) {
// 请求未发完,重新设置定时器
ngx_add_timer(c->write, u->conf->send_timeout);
if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
/* rc == NGX_OK */
// 请求发送完成,设置空处理器
u->write_event_handler = ngx_http_upstream_dummy_handler;
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// 设置读超时
ngx_add_timer(c->read, u->conf->read_timeout);
// 如果已有响应,开始处理
if (c->read->ready) {
ngx_http_upstream_process_header(r, u);
return;
}
}
static ngx_int_t ngx_http_upstream_send_request_body(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_uint_t do_write) {
// 首次发送时设置request_bufs
if (!u->request_sent) {
u->request_sent = 1;
out = u->request_bufs;
if (r->request_body->bufs) {
for (cl = out; cl->next; cl = cl->next) { /* void */ }
cl->next = r->request_body->bufs;
r->request_body->bufs = NULL;
}
}
// 执行发送
if (do_write) {
rc = ngx_output_chain(&u->output, out);
// ...
}
}
四、结束请求处理
1. 清理函数
static void ngx_http_upstream_cleanup(void *data) {
ngx_http_request_t *r = data;
ngx_http_upstream_finalize_request(r, r->upstream, NGX_DONE);
}
2. 重试机制
static void ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_uint_t ft_type) {
// 检查是否需要重试
if (status) {
u->state->status = status;
timeout = u->conf->next_upstream_timeout;
// 检查重试条件
if (u->peer.tries == 0 || !(u->conf->next_upstream & ft_type) ||
(timeout && ngx_current_msec - u->peer.start_time >= timeout)) {
ngx_http_upstream_finalize_request(r, u, status);
return;
}
}
// 关闭现有连接
if (u->peer.connection) {
if (u->peer.connection->pool) {
ngx_destroy_pool(u->peer.connection->pool);
}
ngx_close_connection(u->peer.connection);
u->peer.connection = NULL;
}
// 重新连接
ngx_http_upstream_connect(r, u);
}
3. 最终结束函数
static void ngx_http_upstream_finalize_request(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_int_t rc) {
// 清理回调
if (u->cleanup) {
*u->cleanup = NULL;
u->cleanup = NULL;
}
// 释放解析资源
if (u->resolved && u->resolved->ctx) {
ngx_resolve_name_done(u->resolved->ctx);
u->resolved->ctx = NULL;
}
// 调用模块的finalize_request
u->finalize_request(r, rc);
// 关闭连接
if (u->peer.connection) {
if (u->peer.connection->pool) {
ngx_destroy_pool(u->peer.connection->pool);
}
ngx_close_connection(u->peer.connection);
}
u->peer.connection = NULL;
// 删除临时文件
if (u->store && u->pipe && u->pipe->temp_file && u->pipe->temp_file->file.fd != NGX_INVALID_FILE) {
if (ngx_delete_file(u->pipe->temp_file->file.name.data) == NGX_FILE_ERROR) {
// 错误处理
}
}
// 结束请求
ngx_http_finalize_request(r, rc);
}
五、关键设计要点
-
非阻塞设计:所有网络操作都采用非阻塞模式,通过事件驱动机制实现高并发
-
模块化回调:通过函数指针允许HTTP模块自定义关键处理逻辑(如create_request、process_header等)
-
灵活的重试机制:通过next_upstream配置和ngx_http_upstream_next实现负载均衡和故障转移
-
高效的内存管理:使用内存池和缓冲区链管理请求和响应数据
-
完善的超时控制:对连接、发送、接收等各阶段都有独立的超时设置
-
灵活的缓冲策略:可根据配置选择直接转发或缓冲转发响应数据
-
事件驱动的状态机:通过读写事件处理器实现清晰的状态转换
六、性能优化建议
-
合理设置各种超时参数(connect_timeout、send_timeout、read_timeout)
-
根据实际场景选择是否开启buffering:
- 开启buffering适合上游响应快、下游响应慢的场景
- 关闭buffering适合上游响应慢、下游响应快的场景
-
优化buffer_size和busy_buffers_size等缓冲区参数
-
合理配置next_upstream实现故障自动转移
-
对于高并发场景,考虑启用keepalive减少连接建立开销
-
根据业务需求配置hide_headers和pass_headers,减少不必要的头部处理