前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo源码之单一长连接与客户端多线程并发请求是如何协调的

dubbo源码之单一长连接与客户端多线程并发请求是如何协调的

作者头像
山行AI
发布2019-07-12 15:20:49
2K0
发布2019-07-12 15:20:49
举报
文章被收录于专栏:山行AI山行AI

试想一下,dubbo的consumer与dubbo的provider端之间是通过一个长连接来进行通信的,但是dubbo的consumer还要处理很多线程的业务操作,会有很多线程的请求需要通过这个长连接来进行处理,那么它是怎么做到的呢?

dubbo consumer端的client端代码分析:

1. Client

client端的类结构图为:

com.alibaba.dubbo.remoting.exchange.ExchangeClient的结构:

可见ExchangeClient有很多实现类,现在我们着重分析dubbo默认使用的类com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient:

代码语言:javascript
复制
/** * DefaultMessageClient * * @author william.liangf * @author chao.liuc */public class HeaderExchangeClient implements ExchangeClient {    private static final Logger logger = LoggerFactory.getLogger( HeaderExchangeClient.class );    //处理心跳定时线程    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));    // 心跳定时器    private ScheduledFuture<?> heatbeatTimer;    // 心跳超时,毫秒。缺省0,不会执行心跳。    private int heartbeat;    private int heartbeatTimeout;    //装饰器模式,真正的client,可能是nettyClient、minaClient等    //@see com.alibaba.dubbo.remoting.Transporters#connect(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.ChannelHandler...)    private final Client client;    //通道    private final ExchangeChannel channel;    public ResponseFuture request(Object request) throws RemotingException {        return channel.request(request);    }

2. Channel

Channel的类实现关系如下:

这里主要来看下ExchangeChannel,这里对应的实现是com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel:

代码语言:javascript
复制
/** * ExchangeReceiver * * @author william.liangf */final class HeaderExchangeChannel implements ExchangeChannel {    private static final Logger logger      = LoggerFactory.getLogger(HeaderExchangeChannel.class);    private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";    //装饰器模式,这个Channel是真正发送数据用到的Channel,可能是nettyChannel也可能是minaChannel    private final Channel       channel;    private volatile boolean    closed      = false;     public void send(Object message) throws RemotingException {        send(message, getUrl().getParameter(Constants.SENT_KEY, false));    }    public void send(Object message, boolean sent) throws RemotingException {        if (closed) {            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");        }        if (message instanceof Request                || message instanceof Response                || message instanceof String) {            channel.send(message, sent);        } else {            Request request = new Request();            request.setVersion("2.0.0");            request.setTwoWay(false);            request.setData(message);            channel.send(request, sent);        }    }    public ResponseFuture request(Object request) throws RemotingException {        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));    }    public ResponseFuture request(Object request, int timeout) throws RemotingException {        if (closed) {            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");        }        // create request.        Request req = new Request();        req.setVersion("2.0.0");        req.setTwoWay(true);        req.setData(request);        DefaultFuture future = new DefaultFuture(channel, req, timeout);        try{            channel.send(req);        }catch (RemotingException e) {            future.cancel();            throw e;        }        return future;    }
  • 可以看到这里用的是装饰器模式,另外方法调用通过外层的HeaderExchangeClient的request方法调到channel的request方法,然后调用的是channel的send方法。
  • ResponseFuture的类继承图为:
  • 这个Future是一个包装,为了进行非阻塞调用,它的真正结果是通过提前设置的回调方法异步获取或者手动调用get方法来同步获取的。它的get方法如下:
代码语言:javascript
复制
 //维护requestid与Channel的map private static final Map<Long, Channel>       CHANNELS   = new ConcurrentHashMap<Long, Channel>();//维护requestId与ResponseFuture之间关系的map private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();     public DefaultFuture(Channel channel, Request request, int timeout){        this.channel = channel;        this.request = request;        this.id = request.getId();        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);        // put into waiting map.        FUTURES.put(id, this);        CHANNELS.put(id, channel);    }private final Lock                            lock = new ReentrantLock();private final Condition                       done = lock.newCondition(); private volatile Response                     response; private volatile ResponseCallback             callback;public Object get() throws RemotingException {    return get(timeout);}public Object get(int timeout) throws RemotingException {    if (timeout <= 0) {        timeout = Constants.DEFAULT_TIMEOUT;    }    if (! isDone()) {        long start = System.currentTimeMillis();        lock.lock();        try {            while (! isDone()) {//这里是为了防止虚假唤醒,关于虚假唤醒的部分,有不明白的可以翻一下之前的推文                done.await(timeout, TimeUnit.MILLISECONDS);//当前线程进入等待状态,释放锁的占有权,等待被唤醒                if (isDone() || System.currentTimeMillis() - start > timeout) {                    break;                }            }        } catch (InterruptedException e) {            throw new RuntimeException(e);        } finally {            lock.unlock();        }        if (! isDone()) {            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));        }    }    return returnFromResponse();}public boolean isDone() {    return response != null;}
  • done.await(timeout, TimeUnit.MILLISECONDS);在response为null时会持续等待。
  • 在创建DefaultFuture时会向两个静态的map:FUTURES和CHANNELS中以requestId为key,以DefaultFuture和Channel为value存入map中。
  • 最终该线程会阻塞在这个对象的下面这个方法调用上

这个Condition被唤醒的地方,就是我们接下来要看的HeaderExchangeHandler。

3. ChannelHandler代码分析

代码语言:javascript
复制
/** * ChannelHandler. (API, Prototype, ThreadSafe) * * @see com.alibaba.dubbo.remoting.Transporter#bind(com.alibaba.dubbo.common.URL, ChannelHandler) * @see com.alibaba.dubbo.remoting.Transporter#connect(com.alibaba.dubbo.common.URL, ChannelHandler) * @author qian.lei * @author william.liangf */@SPIpublic interface ChannelHandler {    /**     * on channel connected.     *     * @param channel channel.     */    void connected(Channel channel) throws RemotingException;    /**     * on channel disconnected.     *     * @param channel channel.     */    void disconnected(Channel channel) throws RemotingException;    /**     * on message sent.     *     * @param channel channel.     * @param message message.     */    void sent(Channel channel, Object message) throws RemotingException;    /**     * on message received.     *     * @param channel channel.     * @param message message.     */    void received(Channel channel, Object message) throws RemotingException;    /**     * on exception caught.     *     * @param channel channel.     * @param exception exception.     */    void caught(Channel channel, Throwable exception) throws RemotingException;}

是一个spi拓展接口,(关于spi的部分,之后会有专门的文章介绍),是一个信息处理器,是将consumer和provider端的逻辑都包括在内的,在consumer端主要处理response的逻辑,它的部分类继承图为:

下面我们着重分析下com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler:

代码语言:javascript
复制
/** * ExchangeReceiver * * @author william.liangf * @author chao.liuc */public class HeaderExchangeHandler implements ChannelHandlerDelegate {    protected static final Logger logger              = LoggerFactory.getLogger(HeaderExchangeHandler.class);    public static String          KEY_READ_TIMESTAMP  = HeartbeatHandler.KEY_READ_TIMESTAMP;    public static String          KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;    private final ExchangeHandler handler;     public void received(Channel channel, Object message) throws RemotingException {            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);            try {                if (message instanceof Request) {                    // handle request.                    Request request = (Request) message;                    if (request.isEvent()) {                        handlerEvent(channel, request);                    } else {                        if (request.isTwoWay()) {                            Response response = handleRequest(exchangeChannel, request);                            channel.send(response);                        } else {                            handler.received(exchangeChannel, request.getData());                        }                    }                } else if (message instanceof Response) {                    //处理响应信息                    handleResponse(channel, (Response) message);                } else if (message instanceof String) {                  --------                } else {                    handler.received(exchangeChannel, message);                }            } finally {                HeaderExchangeChannel.removeChannelIfDisconnected(channel);            }        }     static void handleResponse(Channel channel, Response response) throws RemotingException {             if (response != null && !response.isHeartbeat()) {                 DefaultFuture.received(channel, response);             }         }   
  • 一看很明显的一点是使用了装饰器模式,对传入的ExchangeHandler进行了一层装饰,添加了些东西,然后进行方法调用.
  • handleResponse方法中进行的操作是DefaultFuture.received(channel, response);这个DefaultFuture就是上面Channel中返回的DefaultFuture对象。
  • com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received:
代码语言:javascript
复制
 private static final Map<Long, Channel>       CHANNELS   = new ConcurrentHashMap<Long, Channel>();    private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();public static void received(Channel channel, Response response) {        try {            DefaultFuture future = FUTURES.remove(response.getId());            if (future != null) {                future.doReceived(response);            } else {                logger.warn("The timeout response finally returned at "                             + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))                             + ", response " + response                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()                                 + " -> " + channel.getRemoteAddress()));            }        } finally {            CHANNELS.remove(response.getId());        }    }

这里根据response.getId()与对应的request的id相同,删除FUTURES这个map中对应的DefaultFuture,并保证删除CHANNELS对应的kv信息。

  • future.doReceived方法:
代码语言:javascript
复制
private void doReceived(Response res) {        lock.lock();//获取锁        try {            response = res;            if (done != null) {                done.signal();//唤醒,如果传入了回调方法,这里是没有实际唤醒的,真正调用的是下面的回调方法            }        } finally {            lock.unlock();        }        if (callback != null) {            invokeCallback(callback);//如果事先设定的回调方法不为null,则执行回调,就是上面说的非阻塞拿到DefaultFuture结果的方式。        }    }

done是在lock上处于等待状态的condition,这时候lock处于被释放的状态,所以doReceived可以在这时候获取到锁的占有权并尝试唤醒done,这时也需要在上面防止虚假唤醒的操作。

4. 总结

  • dubbo的Client、Channel、Handler都使用了装饰器模式,真正工作的是传入的对象,外层对象是对传入的对象的工作进行了一定的装饰或增强。
  • Client包裹着Channel(NettyChannel,这个NettyChannel中也包括传入的Client的引用)和传入的真正的Client,如NettyClient和MinaClient。这里以NettyClient为例,NettyClient中包裹着netty原生的channel,这个channel是长连接的那个channel,也是最终真正工作的那个。
  • consumer端多线程的请求进入Client后会先调用request方法,非阻塞地返回DefaultFuture对象,然后从future对象中获取响应结果,获取结果的方式有两种,一种是通过get方法阻塞获取,还有一种是通过传入回调方法,在响应的时候进行回调。
  • DefaultFuture中维护着Map CHANNELS和Map FUTURES,这两个static map都以requestId作为key,其中每个response中也维护着和它对应的request相同的id,在响应时是通过这个id来寻找client端返回的那个DefaultFuture然后进行响应信息的获取。
  • 这个相当于用DefaultFuture中的两个静态map维护着等待响应的请求信息,然后一个长连接作为worker来处理(在handler中进行),每有一个响应过来,静态map中对应的kv被移除,get方法阻塞的部分被唤醒。这样就完成了一个长连接,多个并发请求都能正常工作的效果。
  • 这一节请参考下之前的关于http2的推文协助理解这么设计的思路——Request-Response通讯模式的优化(share connection、pipline、asynchrous)
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • dubbo consumer端的client端代码分析:
    • 1. Client
      • 2. Channel
        • 3. ChannelHandler代码分析
          • 4. 总结
          相关产品与服务
          腾讯云代码分析
          腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档