前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo源码学习之服务端接收消息 负载均衡(四)

dubbo源码学习之服务端接收消息 负载均衡(四)

作者头像
周杰伦本人
发布2022-10-25 16:35:53
2100
发布2022-10-25 16:35:53
举报
文章被收录于专栏:同步文章

服务端接收消息处理过程

NettyHandler. messageReceived

接收消息的时候,通过NettyHandler.messageReceived作为入口。

代码语言:javascript
复制
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        
        handler.received(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

handler.received

这个handler是什么呢?还记得在服务发布的时候,组装了一系列的handler吗?代码如下

HeaderExchanger.bind

代码语言:javascript
复制
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

NettyServer

接着又在Nettyserver中,wrap了多个handler

代码语言:javascript
复制
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
代码语言:javascript
复制
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                    .getAdaptiveExtension().dispatch(handler, url)));
}

所以服务端的handler处理链为MultiMessageHandler(HeartbeatHandler(AllChannelHandler(DecodeHandler)))

MultiMessageHandler: 复合消息处理

HeartbeatHandler:心跳消息处理,接收心跳并发送心跳响应

AllChannelHandler:业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任务给线程池处理

DecodeHandler:业务解码处理器

HeaderExchangeHandler.received

交互层请求响应处理,有三种处理方式

  1. handlerRequest,双向请求
  2. handler.received 单向请求
  3. handleResponse 响应消息
代码语言:javascript
复制
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    //这里 ================
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

handleRequest

处理请求并返回response

代码语言:javascript
复制
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        Object data = req.getData();

        String msg;
        if (data == null) msg = null;
        else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
        else msg = data.toString();
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);

        return res;
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
        // handle data.这里===================
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

ExchangeHandlerAdaptive.replay(DubboProtocol)

调用DubboProtocol中定义的ExchangeHandlerAdaptive.replay方法处理消息

代码语言:javascript
复制
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            //如果是callback 需要处理高版本调用低版本的问题
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1){
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods){
                        if (inv.getMethodName().equals(method)){
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod){
                    logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            //这里==========================
            return invoker.invoke(inv);
        }

那接下来invoker.invoke会调用哪个类中的方法呢?还记得在RegistryDirectory中发布本地方法的时候,对invoker做的包装吗?通过InvokerDelegete对原本的invoker做了一层包装,而原本的invoker是什么呢?是一个JavassistProxyFactory生成的动态代理吧。所以此处的invoker应该是

JavassistProxyFactory

代码语言:javascript
复制
wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

Directory

集群目录服务Directory, 代表多个Invoker, 可以看成List,它的值可能是动态变化的比如注册中心推送变更。集群选择调用服务时通过目录服务找到所有服务

StaticDirectory: 静态目录服务, 它的所有Invoker通过构造函数传入, 服务消费方引用服务的时候, 服务对多注册中心的引用,将Invokers集合直接传入 StaticDirectory构造器,再由Cluster伪装成一个Invoker;StaticDirectory的list方法直接返回所有invoker集合;

RegistryDirectory: 注册目录服务, 它的Invoker集合是从注册中心获取的, 它实现了NotifyListener接口实现了回调接口notify(List)

Directory目录服务的更新过程

RegistryProtocol.doRefer方法,也就是消费端在初始化的时候,这里涉及到了RegistryDirectory这个类。然后执行cluster.join(directory)方法。

cluster.join其实就是将Directory中的多个Invoker伪装成一个Invoker, 对上层透明,包含集群的容错机制

directory.subscribe

订阅节点的变化,

  1. 当zookeeper上指定节点发生变化以后,会通知到RegistryDirectory的notify方法
  2. 将url转化为invoker对象

调用过程中invokers的使用

再调用过程中,AbstractClusterInvoker.invoke方法中,

list方法

从directory中获得invokers

负载均衡LoadBalance

LoadBalance负载均衡, 负责从多个 Invokers中选出具体的一个Invoker用于本次调用,调用过程中包含了负载均衡的算法。

负载均衡代码访问入口

在AbstractClusterInvoker.invoke中代码如下,通过名称获得指定的扩展点。RandomLoadBalance

AbstractClusterInvoker.doselect

调用LoadBalance.select方法,讲invokers按照指定算法进行负载

默认情况下,LoadBalance使用的是Random算法,但是这个随机和我们理解上的随机还是不一样的,因为他还有个概念叫weight(权重)

RandomLoadBalance

假设有四个集群节点A,B,C,D,对应的权重分别是1,2,3,4,那么请求到A节点的概率就为1/(1+2+3+4) = 10%.B,C,D节点依次类推为20%,30%,40%.

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-07-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 服务端接收消息处理过程
    • NettyHandler. messageReceived
      • handler.received
        • HeaderExchangeHandler.received
          • handleRequest
            • ExchangeHandlerAdaptive.replay(DubboProtocol)
            • Directory
              • Directory目录服务的更新过程
                • directory.subscribe
                  • 调用过程中invokers的使用
                    • list方法
                    • 负载均衡LoadBalance
                      • 负载均衡代码访问入口
                        • AbstractClusterInvoker.doselect
                          • RandomLoadBalance
                          相关产品与服务
                          负载均衡
                          负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档