两个核心数据结构:
1.每个请求都有的postponed链表。一般情况下每个链表节点保存了该请求的一个子请求。
struct ngx_http_postponed_request_s {
ngx_http_request_t *request;
ngx_chain_t *out;
ngx_http_postponed_request_t *next;
};
2.posted_requests链表,它挂载了当前需要遍历的请求(链表上的节点), 该链表保存在主请求(根节点)的posted_requests字段。
struct ngx_http_posted_request_s {
ngx_http_request_t *request;
ngx_http_posted_request_t *next;
};
server {
listen 10.10.10.10:8080;
location / {
mirror /mirror;
proxy_pass http://vedio_play_support;
session_sticky_hide_cookie upstream=vedio_play_support;
}
location = /mirror {
internal;
proxy_pass http://backend$request_uri;
}
}
handler处于precontent阶段
static ngx_int_t ngx_http_mirror_handler(ngx_http_request_t *r)
{
...
mlcf = ngx_http_get_module_loc_conf(r, ngx_http_mirror_module);
// 接收包体打开
if (mlcf->request_body) {
rc = ngx_http_read_client_request_body(r, ngx_http_mirror_body_handler);
}
return ngx_http_mirror_handler_internal(r);
...
}
// 该handler是接收完包体的回调函数
static void ngx_http_mirror_body_handler(ngx_http_request_t *r)
{
ngx_http_mirror_ctx_t *ctx;
ctx = ngx_http_get_module_ctx(r, ngx_http_mirror_module);
// 子请求在这个函数中被创建,与主请求关联
ctx->status = ngx_http_mirror_handler_internal(r);
r->preserve_body = 1;
// 主请求继续run phases
r->write_event_handler = ngx_http_core_run_phases;
ngx_http_core_run_phases(r);
}
static ngx_int_t ngx_http_mirror_handler_internal(ngx_http_request_t *r)
{
...
for (i = 0; i < mlcf->mirror->nelts; i++) {
// 遍历mirror数组,生成subrequest
if (ngx_http_subrequest(r, &name[i], &r->args, &sr, NULL, NGX_HTTP_SUBREQUEST_BACKGROUND) != NGX_OK)
{
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
sr->header_only = 1;
sr->method = r->method;
sr->method_name = r->method_name;
}
return NGX_DECLINED;
...
}
ngx_int_t
ngx_http_subrequest(ngx_http_request_t *r, ngx_str_t *uri, ngx_str_t *args, ngx_http_request_t **psr, ngx_http_post_subrequest_t *ps, ngx_uint_t flags)
{
...
// 申请一个新的request
sr = ngx_pcalloc(r->pool, sizeof(ngx_http_request_t));
if (sr == NULL) {
return NGX_ERROR;
}
// 如果不是background模式(mirror默认使用这个模式),会申请ngx_http_postponed_request_t结构体(包装子请求)
//将sr赋值给pr->request,并把pr挂到r->postponed的链表里(sr设置为r的子节点)
if (!sr->background) {
if (c->data == r && r->postponed == NULL) {
c->data = sr;
}
pr = ngx_palloc(r->pool, sizeof(ngx_http_postponed_request_t));
if (pr == NULL) {
return NGX_ERROR;
}
pr->request = sr;
pr->out = NULL;
pr->next = NULL;
if (r->postponed) {
for (p = r->postponed; p->next; p = p->next) { /* void */ }
p->next = pr;
} else {
r->postponed = pr;
}
}
sr->internal = 1;
sr->method = NGX_HTTP_GET;
sr->read_event_handler = ngx_http_request_empty_handler;
sr->write_event_handler = ngx_http_handler;
r->main->count++;
*psr = sr;
// 将当前这个sr挂到main request的posted_requests中。
return ngx_http_post_request(sr, NULL);
...
}
ngx_int_t ngx_http_post_request(ngx_http_request_t *r, ngx_http_posted_request_t *pr)
{
...
// pr是null,申请一个pr
if (pr == NULL) {
pr = ngx_palloc(r->pool, sizeof(ngx_http_posted_request_t));
if (pr == NULL) {
return NGX_ERROR;
}
}
// 将当前的子请求赋值给pr->request
pr->request = r;
pr->next = NULL;
for (p = &r->main->posted_requests; *p; p = &(*p)->next) { /* void */ }
// 找到main request的posted_requests,给放到末尾
*p = pr;
...
}
ngx_http_run_posted_requests
走到这时说明子请求创建完毕,通常子请求的创建都发生在某个请求的content handler或者某个filter内。
子请求并没有马上被执行,只是被挂载在了主请求的posted_requests链表中。
posted_requests链表是在ngx_http_run_posted_requests函数中遍历。
在某个请求的读(写)事件的handler中,执行完该请求相关的处理后被调用。
比如主请求在走完一遍PHASE的时候会调用ngx_http_run_posted_requests。这时子请求得以运行。
void
ngx_http_run_posted_requests(ngx_connection_t *c)
{
ngx_http_request_t *r;
ngx_http_posted_request_t *pr;
for ( ;; ) {
if (c->destroyed) {
return;
}
r = c->data;
// 从main开始遍历他的所有子请求
pr = r->main->posted_requests;
if (pr == NULL) {
return;
}
r->main->posted_requests = pr->next;
r = pr->request;
ngx_http_set_log_request(c->log, r);
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "http posted request: \"%V?%V\"", &r->uri, &r->args);
// 调用write event handller, mirror的handler就是ngx_http_handler
r->write_event_handler(r);
}
}
void
ngx_http_handler(ngx_http_request_t *r)
{
ngx_http_core_main_conf_t *cmcf;
r->connection->log->action = NULL;
if (!r->internal) {
switch (r->headers_in.connection_type) {
case 0:
r->keepalive = (r->http_version > NGX_HTTP_VERSION_10);
break;
case NGX_HTTP_CONNECTION_CLOSE:
r->keepalive = 0;
break;
case NGX_HTTP_CONNECTION_KEEP_ALIVE:
r->keepalive = 1;
break;
}
r->lingering_close = (r->headers_in.content_length_n > 0 || r->headers_in.chunked);
r->phase_handler = 0;
} else {
// 对于mirror来说,因为是interal,从server_rewrite开始
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
r->phase_handler = cmcf->phase_engine.server_rewrite_index;
}
r->valid_location = 1;
#if (NGX_HTTP_GZIP)
r->gzip_tested = 0;
r->gzip_ok = 0;
r->gzip_vary = 0;
#endif
// 从头开始执行phases
r->write_event_handler = ngx_http_core_run_phases;
ngx_http_core_run_phases(r);
}
ngx_http_process_request_headers处理
static void ngx_http_process_request_headers(ngx_event_t *rev)
{
...
/* a whole header has been parsed successfully */
rc = ngx_http_process_request_header(r);
if (rc != NGX_OK) {
break;
}
ngx_http_process_request(r);
// 运行子请求
ngx_http_run_posted_requests(c);
...
}
void ngx_http_process_request(ngx_http_request_t *r)
{
...
c->read->handler = ngx_http_request_handler;
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;
ngx_http_handler(r);
...
}
void ngx_http_handler(ngx_http_request_t *r)
{
...
if (!r->internal) {
switch (r->headers_in.connection_type) {
case 0:
r->keepalive = (r->http_version > NGX_HTTP_VERSION_10);
break;
case NGX_HTTP_CONNECTION_CLOSE:
r->keepalive = 0;
break;
case NGX_HTTP_CONNECTION_KEEP_ALIVE:
r->keepalive = 1;
break;
}
r->lingering_close = (r->headers_in.content_length_n > 0 || r->headers_in.chunked);
r->phase_handler = 0;
} else {
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
r->phase_handler = cmcf->phase_engine.server_rewrite_index;
}
r->write_event_handler = ngx_http_core_run_phases;
ngx_http_core_run_phases(r);
...
}
对于mirror来说,是从server rewrite开始一直到upstream,结束之后,对这个request进行finalize
void ngx_http_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
...
// 子请求,且有回调函数,执行
if (r != r->main && r->post_subrequest) {
rc = r->post_subrequest->handler(r, r->post_subrequest->data, rc);
}
// 子请求
if (r != r->main) {
// background直接关闭request
if (r->background) {
r->done = 1;
ngx_http_finalize_connection(r);
return;
}
/* 该子请求还有未处理完的数据或者子请求 */
if (r->buffered || r->postponed) {
// 添加一个该子请求的写事件,并设置合适的write event hander,
//以便下次写事件来的时候继续处理,这里实际上下次执行时会调用ngx_http_output_filter函数,
//最终还是会进入ngx_http_postpone_filter进行处理
if (ngx_http_set_write_handler(r) != NGX_OK) {
ngx_http_terminate_request(r, 0);
}
return;
}
pr = r->parent;
//该子请求处理完毕,如它有发送数据的权利,将权利移交给父请求,c->data指向正在处理的请求,和当前request相同说明是子请求移交过来的
if (r == c->data) {
r->main->count--;
r->done = 1;
// 如果该子请求不是提前完成,则从父请求的postponed链表中删除
if (pr->postponed && pr->postponed->request == r) {
pr->postponed = pr->postponed->next;
}
// 将c->data指向parent
c->data = pr;
} else {
// 到这里表明该子请求提前执行完成,且它没有产生任何数据,则它下次再次获得
//将会执行ngx_http_request_finalzier函数,它实际上是执行 ngx_http_finalzie_request(r,0),什么都不做直到轮到它发送数据时,
//ngx_http_finalzie_request函数会将它从父请求的postponed链表中删除
r->write_event_handler = ngx_http_request_finalizer;
if (r->waited) {
r->done = 1;
}
}
// 将父请求加入posted_request队尾
if (ngx_http_post_request(pr, NULL) != NGX_OK) {
r->main->count++;
ngx_http_terminate_request(r, 0);
return;
}
}
// 这里是处理主请求结束的逻辑,如果主请求有未发送的数据或者未处理的子请求
//则给主请求添加写事件,并设置合适的write event hander,
//以便下次写事件来的时候继续处理
if (r->buffered || c->buffered || r->postponed || r->blocked) {
if (ngx_http_set_write_handler(r) != NGX_OK) {
ngx_http_terminate_request(r, 0);
}
return;
}
...
}
nginx作为反向代理,upstream处理完请求后,开始发送response
static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
...
ngx_http_upstream_send_response(r, u);
...
}
static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
...
rc = ngx_http_send_header(r);
p->output_filter = ngx_http_upstream_output_filter;
...
}
static ngx_int_t ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)
{
...
rc = ngx_http_output_filter(r, chain);
...
}
ngx_int_t ngx_http_send_header(ngx_http_request_t *r)
{
return ngx_http_top_header_filter(r);
}
ngx_int_t ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
rc = ngx_http_top_body_filter(r, in);
}
output filter是发送output的filter,里面会调用top_body_filter。
这个链表里面有ngx_http_postpone_filter,这个是组织回包格式的filter。
ngx_int_t ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
ngx_int_t rc;
ngx_connection_t *c;
c = r->connection;
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "http output filter \"%V?%V\"", &r->uri, &r->args);
rc = ngx_http_top_body_filter(r, in);
if (rc == NGX_ERROR) {
/* NGX_ERROR may be returned by any filter */
c->error = 1;
}
return rc;
}
static ngx_int_t ngx_http_postpone_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
...
ngx_connection_t *c;
ngx_http_postponed_request_t *pr;
//取得当前的链接
c = r->connection;
//如果r不等于c->data,前面的分析知道c->data保存的是最新的一个sub request(同级的话,是第一个),因此不等于则说明是需要保存数据的父request。
if (r != c->data) {
if (in) {
//保存数据
ngx_http_postpone_filter_add(r, in);
//这里注意不发送任何数据,直接返回OK,会在finalize_request中处理。
return NGX_OK;
}
return NGX_OK;
}
//如果r->postponed为空,则说明是最后一个sub request,也就是最新的那个,因此需要将它先发送出去。
if (r->postponed == NULL) {
//如果in存在,则发送出去
if (in || c->buffered) {
return ngx_http_next_filter(r->main, in);
}
return NGX_OK;
}
...
}
子请求不存在响应头部的概念。
子请求读事件设置为空,子请求不受前段控制。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。