nginx upstream模块完整逻辑源码分析
字数 980 2025-08-15 21:32:52

Nginx Upstream模块源码分析与实现原理

一、Upstream模块概述

Nginx的upstream模块实现了反向代理功能的核心机制,负责与上游服务器(后端服务器)建立连接、发送请求、接收响应以及错误处理等完整流程。upstream机制将整个处理过程分为几个关键阶段:

  1. 启动upstream
  2. 连接上游服务器
  3. 向上游发送请求
  4. 接收上游响应(包头/包体)
  5. 结束请求

二、核心数据结构

1. ngx_http_upstream_t

struct ngx_http_upstream_s {
    // 事件处理回调
    ngx_http_upstream_handler_pt read_event_handler;
    ngx_http_upstream_handler_pt write_event_handler;
    
    // 连接相关
    ngx_peer_connection_t peer;  // 向上游发起的连接
    ngx_event_pipe_t *pipe;      // 缓存转发响应时使用
    
    // 请求处理
    ngx_chain_t *request_bufs;   // 需要发送到上游的请求内容链表
    ngx_output_chain_ctx_t output;
    ngx_chain_writer_ctx_t writer;
    
    // 配置
    ngx_http_upstream_conf_t *conf;
    
    // 响应处理
    ngx_http_upstream_headers_in_t headers_in; // 解析后的响应头部
    ngx_http_upstream_resolved_t *resolved;    // 主机域名解析结果
    
    // 缓冲区
    ngx_buf_t from_client;
    ngx_buf_t buffer;           // 接收响应包头/包体的缓冲区
    off_t length;               // 响应包体长度
    ngx_chain_t *out_bufs;      // 不同场景下有不同意义
    ngx_chain_t *busy_bufs;     // 未发送完成的内容
    ngx_chain_t *free_bufs;     // 可回收的buf结构体
    
    // 回调函数
    ngx_int_t (*input_filter_init)(void *data);
    ngx_int_t (*input_filter)(void *data, ssize_t bytes);
    void *input_filter_ctx;
    
    // HTTP模块实现的回调
    ngx_int_t (*create_request)(ngx_http_request_t *r);
    ngx_int_t (*reinit_request)(ngx_http_request_t *r);
    ngx_int_t (*process_header)(ngx_http_request_t *r);
    void (*abort_request)(ngx_http_request_t *r);
    void (*finalize_request)(ngx_http_request_t *r, ngx_int_t rc);
    ngx_int_t (*rewrite_redirect)(ngx_http_request_t *r, ngx_table_elt_t *h, size_t prefix);
    ngx_int_t (*rewrite_cookie)(ngx_http_request_t *r, ngx_table_elt_t *h);
    
    // 其他属性
    ngx_msec_t timeout;
    ngx_http_upstream_state_t *state;
    ngx_str_t method;
    ngx_str_t schema;
    ngx_str_t uri;
    ngx_http_cleanup_pt *cleanup;
    
    // 标志位
    unsigned store:1;          // 是否指定文件缓存路径
    unsigned cacheable:1;      // 是否启用文件缓存
    unsigned accel:1;
    unsigned ssl:1;            // 是否基于SSL协议
    unsigned buffering:1;      // 是否开启缓存转发
    unsigned keepalive:1;      // 是否开启keepalive
    unsigned upgrade:1;
    unsigned request_sent:1;   // 是否已发送请求
    unsigned header_sent:1;    // 包头是否已转发
};

2. ngx_http_upstream_conf_t

typedef struct {
    // 上游服务器配置
    ngx_http_upstream_srv_conf_t *upstream;
    
    // 超时设置
    ngx_msec_t connect_timeout;  // 建立TCP连接超时
    ngx_msec_t send_timeout;     // 发送请求超时
    ngx_msec_t read_timeout;     // 接收响应超时
    ngx_msec_t timeout;
    ngx_msec_t next_upstream_timeout;
    
    // 缓冲区设置
    size_t send_lowat;           // 发送缓冲区下限
    size_t buffer_size;          // 接收头部缓冲区大小
    size_t limit_rate;
    size_t busy_buffers_size;    // 转发响应时使用的busy_size
    size_t max_temp_file_size;   // 临时文件大小限制
    size_t temp_file_write_size; // 写入临时文件的最大长度
    ngx_bufs_t bufs;             // 缓存转发使用的内存大小
    
    // 头部处理
    ngx_uint_t ignore_headers;   // 需要忽略的headers位图
    ngx_hash_t hide_headers_hash; // 不转发的头部散列表
    ngx_array_t *hide_headers;   // 不转发的头部数组
    ngx_array_t *pass_headers;   // 需要转发的头部数组
    
    // 重试策略
    ngx_uint_t next_upstream;    // 需要重试的错误码位图
    ngx_uint_t next_upstream_tries;
    
    // 权限设置
    ngx_uint_t store_access;     // 临时目录和文件权限
    
    // 标志位
    ngx_flag_t buffering;        // 是否开启缓存转发
    ngx_flag_t ignore_client_abort; // 是否忽略客户端断开
    ngx_flag_t intercept_errors; // 错误拦截标志
    ngx_flag_t cyclic_temp_file; // 是否复用临时文件空间
    
    // 路径设置
    ngx_path_t *temp_path;       // 临时文件路径
    ngx_http_upstream_local_t *local; // 本机地址
    ngx_array_t *store_lengths;  // 缓存文件路径长度
    ngx_array_t *store_values;   // 缓存文件路径
    
    // 其他
    signed store:2;
    unsigned intercept_404:1;
    unsigned change_buffering:1;
    ngx_str_t module;            // 模块名称(日志用)
};

三、Upstream处理流程

1. 创建和初始化

static ngx_int_t ngx_http_proxy_handler(ngx_http_request_t *r) {
    // 创建upstream结构
    if (ngx_http_upstream_create(r) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    
    // 获取配置
    plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module);
    u = r->upstream;
    
    // 设置配置
    u->conf = &plcf->upstream;
    
    // 设置回调函数
    u->create_request = ngx_http_proxy_create_request;
    u->reinit_request = ngx_http_proxy_reinit_request;
    u->process_header = ngx_http_proxy_process_status_line;
    u->abort_request = ngx_http_proxy_abort_request;
    u->finalize_request = ngx_http_proxy_finalize_request;
    
    // 设置缓冲标志
    u->buffering = plcf->upstream.buffering;
    
    // 初始化upstream
    rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
    
    return NGX_DONE;
}

2. 启动Upstream

void ngx_http_upstream_init(ngx_http_request_t *r) {
    ngx_connection_t *c = r->connection;
    
    // 移除客户端读事件定时器
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }
    
    ngx_http_upstream_init_request(r);
}

static void ngx_http_upstream_init_request(ngx_http_request_t *r) {
    u = r->upstream;
    u->store = u->conf->store;
    
    // 设置连接检查方法
    if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
        r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
        r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
    }
    
    // 调用create_request构造请求
    if (u->create_request(r) != NGX_OK) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
    
    // 添加清理回调
    cln = ngx_http_cleanup_add(r, 0);
    cln->handler = ngx_http_upstream_cleanup;
    cln->data = r;
    u->cleanup = &cln->handler;
    
    // 连接上游服务器
    ngx_http_upstream_connect(r, u);
}

3. 与上游建立连接

static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) {
    // 建立连接
    rc = ngx_event_connect_peer(&u->peer);
    
    c = u->peer.connection;
    c->data = r;
    
    // 设置读写事件回调
    c->write->handler = ngx_http_upstream_handler;
    c->read->handler = ngx_http_upstream_handler;
    
    // 设置upstream读写处理器
    u->write_event_handler = ngx_http_upstream_send_request_handler;
    u->read_event_handler = ngx_http_upstream_process_header;
    
    if (rc == NGX_AGAIN) {
        // 连接未完成,设置超时定时器
        ngx_add_timer(c->write, u->conf->connect_timeout);
        return;
    }
    
    // 连接成功,发送请求
    ngx_http_upstream_send_request(r, u);
}

连接建立的核心函数ngx_event_connect_peer:

ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc) {
    // 创建socket
    s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0);
    
    // 获取连接结构
    c = ngx_get_connection(s, pc->log);
    
    // 设置非阻塞
    if (ngx_nonblocking(s) == -1) {
        // 错误处理
    }
    
    // 绑定本地地址
    if (pc->local) {
        if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
            // 错误处理
        }
    }
    
    // 设置连接回调
    c->recv = ngx_recv;
    c->send = ngx_send;
    c->recv_chain = ngx_recv_chain;
    c->send_chain = ngx_send_chain;
    c->sendfile = 1;
    
    // 设置读写事件
    rev = c->read;
    wev = c->write;
    
    pc->connection = c;
    
    // 添加到事件监听
    if (ngx_add_conn) {
        if (ngx_add_conn(c) == NGX_ERROR) {
            goto failed;
        }
    }
    
    // 发起连接
    rc = connect(s, pc->sockaddr, pc->socklen);
    
    // 处理连接结果
    // ...
}

4. 事件处理函数

static void ngx_http_upstream_handler(ngx_event_t *ev) {
    c = ev->data;
    r = c->data;
    u = r->upstream;
    c = r->connection;
    
    if (ev->write) {
        // 写事件处理
        u->write_event_handler(r, u);
    } else {
        // 读事件处理
        u->read_event_handler(r, u);
    }
    
    // 执行post请求
    ngx_http_run_posted_requests(c);
}

5. 发送请求到上游

static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u) {
    c = u->peer.connection;
    
    // 检查超时
    if (c->write->timedout) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }
    
    // 如果包头已发送,设置空处理器
    if (u->header_sent) {
        u->write_event_handler = ngx_http_upstream_dummy_handler;
        (void) ngx_handle_write_event(c->write, 0);
        return;
    }
    
    // 发送请求
    ngx_http_upstream_send_request(r, u);
}

static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) {
    // 发送请求体
    rc = ngx_http_upstream_send_request_body(r, u, do_write);
    
    u->request_sent = 1;
    
    // 移除写定时器
    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }
    
    if (rc == NGX_AGAIN) {
        // 请求未发完,重新设置定时器
        ngx_add_timer(c->write, u->conf->send_timeout);
        if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
        return;
    }
    
    /* rc == NGX_OK */
    // 请求发送完成,设置空处理器
    u->write_event_handler = ngx_http_upstream_dummy_handler;
    if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
    
    // 设置读超时
    ngx_add_timer(c->read, u->conf->read_timeout);
    
    // 如果已有响应,开始处理
    if (c->read->ready) {
        ngx_http_upstream_process_header(r, u);
        return;
    }
}

static ngx_int_t ngx_http_upstream_send_request_body(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_uint_t do_write) {
    // 首次发送时设置request_bufs
    if (!u->request_sent) {
        u->request_sent = 1;
        out = u->request_bufs;
        if (r->request_body->bufs) {
            for (cl = out; cl->next; cl = cl->next) { /* void */ }
            cl->next = r->request_body->bufs;
            r->request_body->bufs = NULL;
        }
    }
    
    // 执行发送
    if (do_write) {
        rc = ngx_output_chain(&u->output, out);
        // ...
    }
}

四、结束请求处理

1. 清理函数

static void ngx_http_upstream_cleanup(void *data) {
    ngx_http_request_t *r = data;
    ngx_http_upstream_finalize_request(r, r->upstream, NGX_DONE);
}

2. 重试机制

static void ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_uint_t ft_type) {
    // 检查是否需要重试
    if (status) {
        u->state->status = status;
        timeout = u->conf->next_upstream_timeout;
        
        // 检查重试条件
        if (u->peer.tries == 0 || !(u->conf->next_upstream & ft_type) || 
            (timeout && ngx_current_msec - u->peer.start_time >= timeout)) {
            ngx_http_upstream_finalize_request(r, u, status);
            return;
        }
    }
    
    // 关闭现有连接
    if (u->peer.connection) {
        if (u->peer.connection->pool) {
            ngx_destroy_pool(u->peer.connection->pool);
        }
        ngx_close_connection(u->peer.connection);
        u->peer.connection = NULL;
    }
    
    // 重新连接
    ngx_http_upstream_connect(r, u);
}

3. 最终结束函数

static void ngx_http_upstream_finalize_request(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_int_t rc) {
    // 清理回调
    if (u->cleanup) {
        *u->cleanup = NULL;
        u->cleanup = NULL;
    }
    
    // 释放解析资源
    if (u->resolved && u->resolved->ctx) {
        ngx_resolve_name_done(u->resolved->ctx);
        u->resolved->ctx = NULL;
    }
    
    // 调用模块的finalize_request
    u->finalize_request(r, rc);
    
    // 关闭连接
    if (u->peer.connection) {
        if (u->peer.connection->pool) {
            ngx_destroy_pool(u->peer.connection->pool);
        }
        ngx_close_connection(u->peer.connection);
    }
    u->peer.connection = NULL;
    
    // 删除临时文件
    if (u->store && u->pipe && u->pipe->temp_file && u->pipe->temp_file->file.fd != NGX_INVALID_FILE) {
        if (ngx_delete_file(u->pipe->temp_file->file.name.data) == NGX_FILE_ERROR) {
            // 错误处理
        }
    }
    
    // 结束请求
    ngx_http_finalize_request(r, rc);
}

五、关键设计要点

  1. 非阻塞设计:所有网络操作都采用非阻塞模式,通过事件驱动机制实现高并发

  2. 模块化回调:通过函数指针允许HTTP模块自定义关键处理逻辑(如create_request、process_header等)

  3. 灵活的重试机制:通过next_upstream配置和ngx_http_upstream_next实现负载均衡和故障转移

  4. 高效的内存管理:使用内存池和缓冲区链管理请求和响应数据

  5. 完善的超时控制:对连接、发送、接收等各阶段都有独立的超时设置

  6. 灵活的缓冲策略:可根据配置选择直接转发或缓冲转发响应数据

  7. 事件驱动的状态机:通过读写事件处理器实现清晰的状态转换

六、性能优化建议

  1. 合理设置各种超时参数(connect_timeout、send_timeout、read_timeout)

  2. 根据实际场景选择是否开启buffering:

    • 开启buffering适合上游响应快、下游响应慢的场景
    • 关闭buffering适合上游响应慢、下游响应快的场景
  3. 优化buffer_size和busy_buffers_size等缓冲区参数

  4. 合理配置next_upstream实现故障自动转移

  5. 对于高并发场景,考虑启用keepalive减少连接建立开销

  6. 根据业务需求配置hide_headers和pass_headers,减少不必要的头部处理

Nginx Upstream模块源码分析与实现原理 一、Upstream模块概述 Nginx的upstream模块实现了反向代理功能的核心机制,负责与上游服务器(后端服务器)建立连接、发送请求、接收响应以及错误处理等完整流程。upstream机制将整个处理过程分为几个关键阶段: 启动upstream 连接上游服务器 向上游发送请求 接收上游响应(包头/包体) 结束请求 二、核心数据结构 1. ngx_ http_ upstream_ t 2. ngx_ http_ upstream_ conf_ t 三、Upstream处理流程 1. 创建和初始化 2. 启动Upstream 3. 与上游建立连接 连接建立的核心函数 ngx_event_connect_peer : 4. 事件处理函数 5. 发送请求到上游 四、结束请求处理 1. 清理函数 2. 重试机制 3. 最终结束函数 五、关键设计要点 非阻塞设计 :所有网络操作都采用非阻塞模式,通过事件驱动机制实现高并发 模块化回调 :通过函数指针允许HTTP模块自定义关键处理逻辑(如create_ request、process_ header等) 灵活的重试机制 :通过next_ upstream配置和ngx_ http_ upstream_ next实现负载均衡和故障转移 高效的内存管理 :使用内存池和缓冲区链管理请求和响应数据 完善的超时控制 :对连接、发送、接收等各阶段都有独立的超时设置 灵活的缓冲策略 :可根据配置选择直接转发或缓冲转发响应数据 事件驱动的状态机 :通过读写事件处理器实现清晰的状态转换 六、性能优化建议 合理设置各种超时参数(connect_ timeout、send_ timeout、read_ timeout) 根据实际场景选择是否开启buffering: 开启buffering适合上游响应快、下游响应慢的场景 关闭buffering适合上游响应慢、下游响应快的场景 优化buffer_ size和busy_ buffers_ size等缓冲区参数 合理配置next_ upstream实现故障自动转移 对于高并发场景,考虑启用keepalive减少连接建立开销 根据业务需求配置hide_ headers和pass_ headers,减少不必要的头部处理