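Consider this: a Dubbo consumer talks to a Dubbo provider over a single long-lived connection, yet the consumer runs business operations on many threads, and the requests from all of those threads have to go through that one connection. How does Dubbo make this work?
The class structure diagram of the client side is:
The structure of com.alibaba.dubbo.remoting.exchange.ExchangeClient:
As shown, ExchangeClient has many implementations. Here we focus on the class Dubbo uses by default, com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient: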
    /**
     * DefaultMessageClient
     *
     * @author william.liangf
     * @author chao.liuc
     */
    public class HeaderExchangeClient implements ExchangeClient {

        private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);

        // Scheduled thread pool that runs the heartbeat tasks
        private static final ScheduledThreadPoolExecutor scheduled =
                new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));

        // Heartbeat timer
        private ScheduledFuture<?> heatbeatTimer;

        // Heartbeat timeout in milliseconds. Defaults to 0, in which case no heartbeat is performed.
        private int heartbeat;

        private int heartbeatTimeout;

        // Decorator pattern: the real client, which may be a NettyClient, a MinaClient, etc.
        // @see com.alibaba.dubbo.remoting.Transporters#connect(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.ChannelHandler...)
        private final Client client;

        // The channel
        private final ExchangeChannel channel;

        public ResponseFuture request(Object request) throws RemotingException {
            return channel.request(request);
        }

        // ... (rest of the class omitted)
    }
The implementation hierarchy of Channel is as follows:
The part that matters here is ExchangeChannel, whose implementation in this case is com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel:
    /**
     * ExchangeReceiver
     *
     * @author william.liangf
     */
    final class HeaderExchangeChannel implements ExchangeChannel {

        private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);

        private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";

        // Decorator pattern: this Channel is the one that actually sends the data;
        // it may be a NettyChannel or a MinaChannel.
        private final Channel channel;

        private volatile boolean closed = false;

        public void send(Object message) throws RemotingException {
            send(message, getUrl().getParameter(Constants.SENT_KEY, false));
        }

        public void send(Object message, boolean sent) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null,
                        "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
            }
            if (message instanceof Request
                    || message instanceof Response
                    || message instanceof String) {
                channel.send(message, sent);
            } else {
                Request request = new Request();
                request.setVersion("2.0.0");
                request.setTwoWay(false);
                request.setData(message);
                channel.send(request, sent);
            }
        }

        public ResponseFuture request(Object request) throws RemotingException {
            return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
        }

        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null,
                        "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion("2.0.0");
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }

        // ... (rest of the class omitted)
    }
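The request path above comes down to three steps: give each request a unique id, register a future under that id, and only then write the request to the shared channel. The following is a minimal sketch of that bookkeeping, not Dubbo's code. `PendingRequests`, `newRequest`, `futureOf` and `cancel` are hypothetical names, and `CompletableFuture` stands in for `DefaultFuture`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: tracks in-flight requests by id, in the spirit of DefaultFuture.FUTURES.
class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // Allocate a fresh id and register a future under it before anything is sent.
    long newRequest() {
        long id = nextId.getAndIncrement();
        futures.put(id, new CompletableFuture<>());
        return id;
    }

    // Look up the future that is waiting for the given request id (null if none).
    CompletableFuture<String> futureOf(long id) {
        return futures.get(id);
    }

    // Assumption of this sketch: a failed send must not leave a stale entry behind,
    // so cancelling removes the entry and cancels the future.
    void cancel(long id) {
        CompletableFuture<String> future = futures.remove(id);
        if (future != null) {
            future.cancel(false);
        }
    }

    int size() {
        return futures.size();
    }

    public static void main(String[] args) {
        PendingRequests pending = new PendingRequests();
        long a = pending.newRequest();
        long b = pending.newRequest();
        pending.cancel(a); // pretend that sending request a failed
        System.out.println("ids differ: " + (a != b) + ", in flight: " + pending.size());
    }
}
```

Because the map is keyed by request id rather than by thread, any number of caller threads can register requests concurrently without coordinating with each other.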
request() wraps every call in a DefaultFuture (com.alibaba.dubbo.remoting.exchange.support.DefaultFuture). Its key fields, its constructor and its get() methods are:

    // Maps request id to Channel
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();

    // Maps request id to its ResponseFuture
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Response response;
    private volatile ResponseCallback callback;

    public Object get() throws RemotingException {
        return get(timeout);
    }

    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                // The loop guards against spurious wakeups (see the earlier posts for background).
                while (!isDone()) {
                    // The current thread releases the lock and waits until it is signalled.
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    public boolean isDone() {
        return response != null;
    }
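The waiting logic in get() can be looked at in isolation. The sketch below is a simplification under stated assumptions, not Dubbo's implementation: `SimpleFuture` and `complete` are hypothetical names, the payload is a plain `String`, and the wait uses `Condition.awaitNanos` with the remaining time, so a spurious wakeup does not restart the full timeout. The excerpt above instead calls `done.await(timeout, ...)` and compares the elapsed time afterwards.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a blocking future guarded by a Lock and a Condition.
class SimpleFuture {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile String response;

    boolean isDone() {
        return response != null;
    }

    // Block the caller until a response arrives or timeoutMillis elapses.
    String get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // Re-check the predicate in a loop: await may return spuriously.
            while (!isDone()) {
                if (remaining <= 0) {
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                // awaitNanos releases the lock while waiting and returns the time left.
                remaining = done.awaitNanos(remaining);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    // Called by the I/O thread when the response arrives.
    void complete(String value) {
        lock.lock();
        try {
            response = value;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleFuture future = new SimpleFuture();
        new Thread(() -> future.complete("pong")).start();
        System.out.println(future.get(1000));
    }
}
```

Checking `isDone()` before every wait also covers the case where the response arrives before the caller ever reaches `get`: the loop body is skipped and the stored value is returned immediately.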
The place where this Condition is signalled is HeaderExchangeHandler, which we look at next. We start with the ChannelHandler interface:
    /**
     * ChannelHandler. (API, Prototype, ThreadSafe)
     *
     * @see com.alibaba.dubbo.remoting.Transporter#bind(com.alibaba.dubbo.common.URL, ChannelHandler)
     * @see com.alibaba.dubbo.remoting.Transporter#connect(com.alibaba.dubbo.common.URL, ChannelHandler)
     * @author qian.lei
     * @author william.liangf
     */
    @SPI
    public interface ChannelHandler {

        /**
         * on channel connected.
         *
         * @param channel channel.
         */
        void connected(Channel channel) throws RemotingException;

        /**
         * on channel disconnected.
         *
         * @param channel channel.
         */
        void disconnected(Channel channel) throws RemotingException;

        /**
         * on message sent.
         *
         * @param channel channel.
         * @param message message.
         */
        void sent(Channel channel, Object message) throws RemotingException;

        /**
         * on message received.
         *
         * @param channel channel.
         * @param message message.
         */
        void received(Channel channel, Object message) throws RemotingException;

        /**
         * on exception caught.
         *
         * @param channel channel.
         * @param exception exception.
         */
        void caught(Channel channel, Throwable exception) throws RemotingException;
    }
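ChannelHandler is an SPI extension interface (SPI will be covered in a dedicated later article). It is the message handler and covers the logic of both the consumer and the provider side; on the consumer side it mainly handles responses. Part of its class hierarchy is:
Next we focus on com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler: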
    /**
     * ExchangeReceiver
     *
     * @author william.liangf
     * @author chao.liuc
     */
    public class HeaderExchangeHandler implements ChannelHandlerDelegate {

        protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);

        public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP;

        public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;

        private final ExchangeHandler handler;

        public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // handle request.
                    Request request = (Request) message;
                    if (request.isEvent()) {
                        handlerEvent(channel, request);
                    } else {
                        if (request.isTwoWay()) {
                            Response response = handleRequest(exchangeChannel, request);
                            channel.send(response);
                        } else {
                            handler.received(exchangeChannel, request.getData());
                        }
                    }
                } else if (message instanceof Response) {
                    // Handle the response
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    // -------- (elided in the original)
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }

        static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                DefaultFuture.received(channel, response);
            }
        }

        // ... (rest of the class omitted)
    }
    // In DefaultFuture
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();

    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                                + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
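Because response.getId() is the same as the id of the corresponding request, this method uses it to remove the matching DefaultFuture from the FUTURES map, and the finally block guarantees that the corresponding entry in CHANNELS is removed as well.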
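The lookup by id is what lets many threads share one connection: responses can come back in any order and still reach the right caller. Here is a minimal sketch of that dispatch step, assuming a hypothetical `ResponseDispatcher` class with hypothetical `send` and `onResponse` methods, and using `CompletableFuture` in place of `DefaultFuture`. No real network I/O is involved.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: routes responses arriving on one shared connection back to their callers.
class ResponseDispatcher {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // Caller side: register a future and return the id that would be written to the wire.
    long send(CompletableFuture<String> future) {
        long id = ids.getAndIncrement();
        futures.put(id, future);
        return id;
    }

    // I/O side: look up the waiting future by the id echoed in the response.
    // Returns false when nobody is waiting any more (for example, the caller already timed out).
    boolean onResponse(long id, String payload) {
        CompletableFuture<String> future = futures.remove(id);
        if (future == null) {
            return false; // the excerpt above logs a warning in this branch
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        ResponseDispatcher dispatcher = new ResponseDispatcher();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        long id1 = dispatcher.send(first);
        long id2 = dispatcher.send(second);
        // Responses may come back in any order; the id keeps them apart.
        dispatcher.onResponse(id2, "reply-2");
        dispatcher.onResponse(id1, "reply-1");
        System.out.println(first.join() + ", " + second.join());
        // A response for an id that was already removed is reported as unmatched.
        System.out.println("late response matched: " + dispatcher.onResponse(id1, "dup"));
    }
}
```

Using `remove` rather than `get` means each id can be matched at most once, which is also why a response that arrives after a timeout finds nothing to complete.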
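    // In DefaultFuture
    private void doReceived(Response res) {
        lock.lock(); // acquire the lock
        try {
            response = res;
            if (done != null) {
                // Signal the waiter. If a callback was registered, no thread is actually
                // woken here; what really runs is the callback below.
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            // If a callback was set beforehand, invoke it. This is the non-blocking
            // way of obtaining the DefaultFuture result.
            invokeCallback(callback);
        }
    }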
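done is the Condition created from lock on which the calling thread waits. While that thread is waiting in await(), the lock is released, so doReceived can acquire it and signal done. After waking up, the waiting thread checks isDone() again in the while loop of get(), which is the guard against spurious wakeups described above.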
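The two ways of consuming the result, blocking in get() and registering a callback, can share a single completion path. The sketch below illustrates this under assumptions of its own: `CallbackFuture` and `setCallback` are hypothetical names, the payload is a `String`, `get` has no timeout for brevity, and both fields are guarded by the lock rather than declared volatile as in the excerpt.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch: one completion path serving both blocked callers and callbacks.
class CallbackFuture {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private String response;           // guarded by lock
    private Consumer<String> callback; // guarded by lock

    // Non-blocking style: register a callback; run it at once if the response is already here.
    void setCallback(Consumer<String> cb) {
        String ready;
        lock.lock();
        try {
            ready = response;
            if (ready == null) {
                callback = cb;
            }
        } finally {
            lock.unlock();
        }
        if (ready != null) {
            cb.accept(ready);
        }
    }

    // Blocking style: wait until the response has been stored.
    String get() throws InterruptedException {
        lock.lock();
        try {
            while (response == null) {
                done.await();
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    // Called by the I/O thread: store the result, wake waiters, then run the callback
    // outside the lock so user code cannot stall threads that need the lock.
    void doReceived(String res) {
        Consumer<String> cb;
        lock.lock();
        try {
            response = res;
            done.signalAll();
            cb = callback;
            callback = null;
        } finally {
            lock.unlock();
        }
        if (cb != null) {
            cb.accept(res);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CallbackFuture future = new CallbackFuture();
        future.setCallback(r -> System.out.println("callback got " + r));
        future.doReceived("pong");
        System.out.println("get() returned " + future.get());
    }
}
```

With a callback registered, the signal typically has no waiter to wake and the callback does the work; with a blocked caller, the signal is what releases it.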