ElasticSearch 5.6源码解析HTTP/TCP请求

http请求解析

NettyHttpServerTransport 监听http请求。在其他版本中这个类在源码内,可直接找到。但在5.6中这个类被封装在netty插件中。因此从监听到http请求到请求转发到restController这部分我没看到。以下是从网络上找到的。http://blog.csdn.net/xgjianstart/article/details/70143365

首先,NettyHttpServerTransport 会负责进行监听Http请求。通过配置http.netty.http.blocking_server 你可以选择是Nio还是传统的阻塞式服务。默认是NIO。该类在配置pipeline的时候,最后添加了HttpRequestHandler,所以具体的接受到请求后的处理逻辑就由该类来完成了。

pipeline.addLast("handler", requestHandler);

HttpRequestHandler 实现了标准的 messageReceived(ChannelHandlerContext ctx, MessageEvent e) 方法,在该方法中,HttpRequestHandler 会回调NettyHttpServerTransport.dispatchRequest方法,而该方法会调用HttpServerAdapter.dispatchRequest,接着又会调用HttpServer.internalDispatchRequest方法(额,好吧,我承认嵌套挺深有点深):

public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {
        String rawPath = request.rawPath();
        if (rawPath.startsWith("/_plugin/")) {
            RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);
            filterChain.continueProcessing(request, channel);
            return;
        } else if (rawPath.equals("/favicon.ico")) {
            handleFavicon(request, channel);
            return;
        }
        restController.dispatchRequest(request, channel);
    }

这个方法里我们看到了plugin等被有限处理。最后请求又被转发给 RestController。

从上面可以看出,请求被分发到RestController,然后调用dispatchRequest方法。在dispatchRequest方法中首先获取到根据请求的类型获取handler:

final RestHandler handler = getHandler(request);

这个handler是在创建具体的请求实例时写入restController中的。

public RestSearchAction(Settings settings, RestController controller) {
    super(settings);
    controller.registerHandler(GET, "/_search", this);
    controller.registerHandler(POST, "/_search", this);
    controller.registerHandler(GET, "/{index}/_search", this);
    controller.registerHandler(POST, "/{index}/_search", this);
    controller.registerHandler(GET, "/{index}/{type}/_search", this);
    controller.registerHandler(POST, "/{index}/{type}/_search", this);
}

以search请求为例,此处的handler是RestSearchAction类型的。 最后调用的是handler的handlerequest方法。

  wrappedHandler.handleRequest(request, channel, client);

RestSearchAction继承了 BaseRestHandler。因此这个地方调用的是BaseRestHandler的handleRequest,在handleRequest中准备请求:

 final RestChannelConsumer action = prepareRequest(request, client);

这个prepareRequest方法是RestSearchAction的prepareRequest,根据RestRequest生成一个SearchRequest,交给NodeClient执行。

channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel))

NodeClient执行doExecute方法,将之前的SearchAction.INSTANCE(GenericAction)转化为对应的TransportSearchAction:

transportAction(action).execute(request, listener);

上述TransportSearchAction的execute方法最终执行的是自身的doExecute方法

this.action.doExecute(task, request, listener);

在doExecute中执行真正的查找流程。

Tcp请求解析

Tcp请求是由Netty4Transport来接收解析的。和上述一样,Netty4Transport被封装在netty插件中,因此没看到tcp接收的过程。 后面请求被转发至TCPtransport的messageReceived方法。而在该方法中调用handleRequest处理请求。

handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);

在handleRequest中读取出action,跟不同的action生成具体的TransportRequest子类和一个RequestHandler,并执行该RequestHandler。

final String action = stream.readString();
.....
final TransportRequest request = reg.newRequest();
.....
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));

RequestHandler是一个可运行的AbstractRunnable,最后执行的dorun方法:

 protected void doRun() throws Exception {
    reg.processMessageReceived(request, transportChannel);
 }

processMessageReceived中则是通过handler调用其messageReceived方法

handler.messageReceived(request, channel);

以search请求为例,这个handler不是上述生成的RequestHandler而是RequestHandlerRegistry中的handler,是一个TransportRequestHandler。而这个TransportRequestHandler是handledTransportAction中的一个变量,而handledTransportAction继承于transportAction,且是所有transport*action的父类,比如说transportBulkAction,transportSearchAction等等。 因此上述的handler.messageReceived(request, channel)调用的是handledTransportAction的子类transportSearchAction的messageReceived。至此总算找到了执行该请求的最终方法,即ransportSearchAction的messageReceived方法。但是还有个问题,reg(RequestHandlerRegistry )是从哪来的? 这个RequestHandlerRegistry 则在TCPtransport的messageReceived方法中通过transportServiceAdapter获取的:

final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);

而这个RequestHandlerRegistry是transportSearchAction构建的时候调用父类HandledTransportAction的构造方法注册的:

protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,TransportService transportService, ActionFilters actionFilters,IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
     super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
     transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, false, canTripCircuitBreaker,
            new TransportHandler());
    }

找到了RequestHandlerRegistry(TransportService中),获取reg的transportServiceAdapter(TCPtransport),二者是怎么联系起来的?

TransportService中有一个adapter,实现了TransportServiceAdapter方法。在dostart方法中设置了transport的adpter:

transport.transportServiceAdapter(adapter);

即设置了TCPtransport的adapter。那么TCPtransport调用的RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action),就是调用transportService中的adapter,从而获取到注册的RequestHandlerRegistry。 此处我们还是没回答TransportService和TCPtransport二者的关系。

networkModule中的提供了注册transport的方法,但是一直没找到TcpTransport的注册方法,后来发现tcptransport是通过netty插件注册进去的。 在Node类构造TransportService时传入:

Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());

终于找到源头了。。真的好乱。其类依赖关系如下:

image.png

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏我和PYTHON有个约会

Django来敲门~第一部分【5.1.项目配置settings.py详解】

我们创建好了一个Python项目(mysite/)之后,需要在项目中添加模块应用(polls/),在模块应用中添加处理功能逻辑,如添加模块中的视图处理函数(po...

11830
来自专栏葡萄城控件技术团队

七天学会ASP.NET MVC (三)——ASP.Net MVC 数据处理

? 第三天我们将学习Asp.Net中数据处理功能,了解数据访问层,EF,以及EF中常用的代码实现方式,创建数据访问层和数据入口,处理Post数据,以及数据验...

366100
来自专栏码农阿宇

国内开源社区巨作AspectCore-Framework入门

在软件业,AOP为Aspect Oriented Programming的缩写,意为:面向切面编程,通过预编译方式和运行期动态代理实现程序功能的统一维护的一种技...

29710
来自专栏xdecode

Java源码安全审查

最近业务需要出一份Java Web应用源码安全审查报告, 对比了市面上数种工具及其分析结果, 基于结果总结了一份规则库. 本文目录结构如下: 

85920
来自专栏Java架构沉思录

你真的懂Mybatis缓存机制吗

Mybatis的一级缓存是指Session缓存。一级缓存的作用域默认是一个SqlSession。Mybatis默认开启一级缓存。 也就是在同一个SqlSessi...

1.8K50
来自专栏Java帮帮-微信公众号-技术文章全总结

Hibernate_day01总结

? 第1章 Hibernate_day01总结 今日内容 Hibernate框架的概述 Hibernate的快速入门 Hibernate核心API的介绍 Hi...

35090
来自专栏IT笔记

JAVAWEB开发的微信公众号H5支付

一切需求都是来源于业务需要,前一阵子做了微信扫码支付,的确相对PC用户来说方便了很多。但是如果手机下单,你总不能让用户自己扫自己吧?查看了一下文档,微信还是支持...

1.8K50
来自专栏张泽旭的专栏

基于spring boot sftp文件上传

对sftp文件上传将行封装,实现连接的单例模式,完成线程安全的改进,sftp文件上传下载失败的重试。

44110
来自专栏JadePeng的技术博客

XNginx - nginx 集群可视化管理工具

之前团队的nginx管理,都是运维同学每次去修改配置文件,然后重启,非常不方便,一直想找一个可以方便管理nginx集群的工具,翻遍web,未寻到可用之物,于是自...

2.2K40
来自专栏Seebug漏洞平台

Exim Off-by-one(CVE-2018-6789)漏洞复现分析

前段时间meh又挖了一个Exim的RCE漏洞[1],而且这次RCE的漏洞的约束更少了,就算开启了PIE仍然能被利用。虽然去年我研究过Exim,但是时间过去这么久...

56370

扫码关注云+社区

领取腾讯云代金券