Dubbo: the "[DUBBO] disconected from" log problem

王小明_HIT
Published 2020-07-01 14:55:47
Included in the column: 程序员奇点

After a Dubbo provider service is restarted, the provider keeps printing the following abnormal log:

[INFO ] 2017-11-15 10:50:07,790--DubboServerHandler-10.255.242.97:20990-thread-517--[com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol]  [DUBBO] disconected from /10.255.242.96:11582,url:dubbo://10.255.242.97:20990/com.tms.express.service.ServiceA?anyhost=true&application=tms-express-service&channel.readonly.sent=true&codec=dubbo&default.accepts=4000&default.buffer=8192&default.connections=20&default.exporter.listener=apiMonitorProviderExporterListener&default.loadbalance=random&default.payload=88388608&default.queues=0&default.retries=0&default.service.filter=apiMonitorProviderFilter,,&default.threadpool=fixed&default.threads=600&default.weight=100&dubbo=2.8.3.2&generic=false&heartbeat=60000&interface=com.tms.express.service.ServiceA&logger=slf4j&methods=handlePassbackDataSf,orderWeightSchedule&owner=nobody&pid=6694&revision=1.0-SNAPSHOT&side=provider&timestamp=1510563828892,dubbo version: 2.8.3.2, current host: 10.255.242.97

The fix is simply to restart the calling side (the consumer).

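When Dubbo creates a client connection to the server, it also creates a scheduled heartbeat task. The task runs once per heartbeat interval (heartbeat=60000 ms in the provider URL logged above) and sends a heartbeat when the channel has been idle for longer than that interval. If the server goes down, the heartbeat times out and the client reconnects.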

Where the log originates on the consumer side

HeaderExchangeClient#startHeatbeatTimer

When the consumer creates a connection, it also creates the scheduled heartbeat task.

private void startHeatbeatTimer() {
        stopHeartbeatTimer();
        if ( heartbeat > 0 ) {
            heatbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            return Collections.<Channel>singletonList( HeaderExchangeClient.this );
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS );
        }
    }

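Core logic of the scheduled task. Two checks matter:

  1. If the current timestamp minus the timestamp of the last read or write exceeds the heartbeat interval, a heartbeat is sent.
  2. If the current timestamp minus the timestamp of the last read exceeds not only the heartbeat interval but also the heartbeat timeout, the channel is considered closed and a reconnect is needed.

The run() method of HeartBeatTask:
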
public void run() {
        try {
            long now = System.currentTimeMillis();
            for ( Channel channel : channelProvider.getChannels() ) {
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    Long lastRead = ( Long ) channel.getAttribute(
                            HeaderExchangeHandler.KEY_READ_TIMESTAMP );
                    Long lastWrite = ( Long ) channel.getAttribute(
                            HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
                    // Send a heartbeat if the last read or write is older than the heartbeat interval
                    if ( ( lastRead != null && now - lastRead > heartbeat )
                            || ( lastWrite != null && now - lastWrite > heartbeat ) ) {
                        Request req = new Request();
                        req.setVersion( "2.0.0" );
                        req.setTwoWay( true );
                        req.setEvent( Request.HEARTBEAT_EVENT );
                        // Send the heartbeat
                        channel.send( req );
                        if ( logger.isDebugEnabled() ) {
                            logger.debug( "Send heartbeat to remote channel " + channel.getRemoteAddress()
                                                  + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms" );
                        }
                    }
                    // The last read is older than the heartbeat timeout as well:
                    // treat the channel as closed and try to reconnect
                    if ( lastRead != null && now - lastRead > heartbeatTimeout ) {
                        logger.warn( "Close channel " + channel
                                             + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms" );
                        if (channel instanceof Client) {
                            try {
                                // On a read-idle timeout, the client reconnects
                                ((Client)channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        } else {
                            channel.close();
                        }
                    }
                } catch ( Throwable t ) {
                    logger.warn( "Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t );
                }
            }
        } catch ( Throwable t ) {
            logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t );
        }
    }
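
The two checks in run() can be isolated into pure functions, which makes the thresholds easier to reason about. The following is a minimal sketch, not Dubbo code: the class HeartbeatChecks and its method names are hypothetical, and a timeout of three heartbeat periods is an assumption made for the example.

```java
// Hypothetical helper, not part of Dubbo: it mirrors the two timestamp
// comparisons made inside HeartBeatTask#run.
final class HeartbeatChecks {
    private HeartbeatChecks() {}

    /** True when the last read or the last write is older than the heartbeat interval. */
    static boolean shouldSendHeartbeat(Long lastRead, Long lastWrite, long now, long heartbeat) {
        return (lastRead != null && now - lastRead > heartbeat)
                || (lastWrite != null && now - lastWrite > heartbeat);
    }

    /** True when nothing has been read for longer than the heartbeat timeout. */
    static boolean isReadIdleTimedOut(Long lastRead, long now, long heartbeatTimeout) {
        return lastRead != null && now - lastRead > heartbeatTimeout;
    }

    public static void main(String[] args) {
        long heartbeat = 60_000L;               // heartbeat=60000 as in the logged URL
        long heartbeatTimeout = 3 * heartbeat;  // assumption: three heartbeat periods
        long now = System.currentTimeMillis();

        Long lastRead = now - 61_000L;          // last read 61 s ago
        Long lastWrite = now - 1_000L;          // last write 1 s ago

        System.out.println("send heartbeat: "
                + shouldSendHeartbeat(lastRead, lastWrite, now, heartbeat));
        System.out.println("reconnect: "
                + isReadIdleTimedOut(lastRead, now, heartbeatTimeout));
    }
}
```

With these values the channel receives a heartbeat because the last read is older than one interval, but it is not reconnected, because the read-idle time is still below the timeout.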

AbstractClient#reconnect: reconnecting after a timeout

A reconnect first disconnects and then connects to the server (the provider) again.

   public void reconnect() throws RemotingException {
        disconnect();
        connect();
    }

AbstractClient#connect

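Creating the connection:

  • Step 1: initConnectStatusCheckCommand initializes the reconnect task. It creates a scheduled task that calls connect() every two seconds to try to reconnect to the server.
  • Step 2: doConnect() performs the actual connection to the server.
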
 protected void connect() throws RemotingException {
        connectLock.lock();
        try {
            if (isConnected()) {
                return;
            }
            // 1. Initialize the reconnect task
            initConnectStatusCheckCommand();
            // 2. Create the connection
            doConnect();
            if (! isConnected()) {
                throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                                            + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
            } else {
             if (logger.isInfoEnabled()){
              logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                                            + ", channel is " + this.getChannel());
             }
            }
            reconnect_count.set(0);
            reconnect_error_log_flag.set(false);
        } catch (RemotingException e) {
            throw e;
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                                        + ", cause: " + e.getMessage(), e);
        } finally {
            connectLock.unlock();
        }
    }

AbstractClient#initConnectStatusCheckCommand

private synchronized void initConnectStatusCheckCommand(){
        //reconnect=false to close reconnect 
        int reconnect = getReconnectParam(getUrl());
        if(reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())){
            Runnable connectStatusCheckCommand =  new Runnable() {
                public void run() {
                    try {
                        if (! isConnected()) {
                            // The scheduled task calls connect(), which calls
                            // initConnectStatusCheckCommand() again and may re-create reconnectExecutorFuture
                            connect();
                        } else {
                            lastConnectedTime = System.currentTimeMillis();
                        }
                    } catch (Throwable t) { 
                        String errorMsg = "client reconnect to "+getUrl().getAddress()+" find error . url: "+ getUrl();
                        // wait registry sync provider list
                        if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout){
                            if (!reconnect_error_log_flag.get()){
                                reconnect_error_log_flag.set(true);
                                logger.error(errorMsg, t);
                                return ;
                            }
                        }
                        if ( reconnect_count.getAndIncrement() % reconnect_warning_period == 0){
                            logger.warn(errorMsg, t);
                        }
                    }
                }
            };
            // Try to reconnect at a fixed delay of `reconnect` ms (every 2 seconds here)
            reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
        }
    }
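
The catch block above keeps the log from flooding: a warning is written only when reconnect_count is a multiple of reconnect_warning_period. A minimal sketch of that counter pattern follows. The class name ReconnectWarnThrottle and the period of 3 are illustrative assumptions, not Dubbo's names or values.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: logs the first failure and then every N-th one,
// like the reconnect_count % reconnect_warning_period check.
final class ReconnectWarnThrottle {
    private final AtomicInteger failures = new AtomicInteger(0);
    private final int warningPeriod;

    ReconnectWarnThrottle(int warningPeriod) {
        if (warningPeriod <= 0) {
            throw new IllegalArgumentException("warningPeriod must be positive");
        }
        this.warningPeriod = warningPeriod;
    }

    /** Counts one failure and reports whether this one should be logged. */
    boolean shouldWarn() {
        return failures.getAndIncrement() % warningPeriod == 0;
    }

    /** Called after a successful connect, like reconnect_count.set(0). */
    void reset() {
        failures.set(0);
    }

    public static void main(String[] args) {
        ReconnectWarnThrottle throttle = new ReconnectWarnThrottle(3); // illustrative period
        for (int attempt = 1; attempt <= 7; attempt++) {
            System.out.println("attempt " + attempt + " warn=" + throttle.shouldWarn());
        }
    }
}
```

With a period of 3, attempts 1, 4 and 7 are logged and the others are silent, so a long outage produces a steady but bounded stream of warnings.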
        

NettyClient#doConnect

protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        try{
            boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
            
            if (ret && future.isSuccess()) {
                Channel newChannel = future.getChannel();
                newChannel.setInterestOps(Channel.OP_READ_WRITE);
                try {
                    // Close the old channel
                    Channel oldChannel = NettyClient.this.channel; // copy reference
                    if (oldChannel != null) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                            }
                            oldChannel.close();
                        } finally {
                            NettyChannel.removeChannelIfDisconnected(oldChannel);
                        }
                    }
                } finally {
                    // If the Netty client is already closed, close the newly created channel
                    if (NettyClient.this.isClosed()) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                            }
                            newChannel.close();
                        } finally {
                            NettyClient.this.channel = null;
                            NettyChannel.removeChannelIfDisconnected(newChannel);
                        }
                    } else {
                        NettyClient.this.channel = newChannel;
                    }
                }
            } else if (future.getCause() != null) {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
            } else {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + " client-side timeout "
                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
            }
        }finally{
            if (! isConnected()) {
                future.cancel();
            }
        }
    }

AbstractClient#disconnect

On disconnect, destroyConnectStatusCheckCommand runs first. Its main job is to cancel reconnectExecutorFuture, the reconnect task that was created when connect() ran.

    public void disconnect() {
        connectLock.lock();
        try {
            destroyConnectStatusCheckCommand();
            try {
                Channel channel = getChannel();
                if (channel != null) {
                    channel.close();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                doDisConnect();
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        } finally {
            connectLock.unlock();
        }
    }
    private synchronized void destroyConnectStatusCheckCommand(){
        try {
            if (reconnectExecutorFuture != null && ! reconnectExecutorFuture.isDone()){
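                /*
                 * Cancel the reconnect task so that no further reconnects happen.
                 * Bug: this only holds if the reconnect thread has not yet reached connect()
                 * when reconnectExecutorFuture.cancel(true) runs. Otherwise connect() creates
                 * a new scheduled task. Because zookeeper sends the notification that triggers
                 * this cancellation only once, the new task is never cancelled and the client
                 * keeps reconnecting.
                 */
                reconnectExecutorFuture.cancel(true);
                // Remove the cancelled task from the executor's work queue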
                reconnectExecutorService.purge();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }    

What can go wrong while the consumer is calling the service?

The key is the following code:

    public void disconnect() {
        connectLock.lock();
        try {
            destroyConnectStatusCheckCommand();
            try {
                Channel channel = getChannel();
                if (channel != null) {
                    channel.close();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                doDisConnect();
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        } finally {
            connectLock.unlock();
        }
    }
   private synchronized void destroyConnectStatusCheckCommand(){
        try {
            if (reconnectExecutorFuture != null && ! reconnectExecutorFuture.isDone()){
                reconnectExecutorFuture.cancel(true);
                reconnectExecutorService.purge();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }    

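The call chain of disconnect shows that it is invoked both when a connection is closed and when a reconnect happens. This creates a problem. Suppose thread A is reconnecting while thread B is closing the connection. Thread A has reached the call to connect() but has not executed it yet, and thread B executes reconnectExecutorFuture.cancel(true), which cancels the reconnect task. When thread A then enters connect() and reaches initConnectStatusCheckCommand, it evaluates the following check: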

if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled() ))

Because thread B has already cancelled reconnectExecutorFuture, the check evaluates to true and the following code runs:

  reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);

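In other words, a reconnect task is started after all. Because zookeeper notifies a node change only once, destroyConnectStatusCheckCommand() in disconnect is never executed again, so this scheduled reconnect task keeps running indefinitely.

  • Because the reconnect task never goes away, every run creates a new channel, and at that point the consumer can connect to the provider.
  • When zookeeper sends the node-change notification, the NettyClient that lost its connection is closed (after the provider restarts, a new NettyClient is created to connect to it) and its closed flag is set to true. doConnect() therefore closes the channel it has just created. Once the client side closes the channel, the provider loses that connection and logs "disconected from xxx".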
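
The interleaving described above can be replayed deterministically on a single thread. The sketch below is a simplified model, not Dubbo code: ReconnectRaceSketch, initCheck, destroyCheck and replay are hypothetical names, the scheduled task body is empty, and only the two guards around the future are reproduced.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the cancel-then-reschedule sequence.
final class ReconnectRaceSketch {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    private ScheduledFuture<?> reconnectFuture;

    /** Same guard as initConnectStatusCheckCommand: schedule if absent or cancelled. */
    synchronized void initCheck() {
        if (reconnectFuture == null || reconnectFuture.isCancelled()) {
            reconnectFuture = executor.scheduleWithFixedDelay(
                    () -> { /* a real task would call connect() here */ },
                    2000, 2000, TimeUnit.MILLISECONDS);
        }
    }

    /** Same guard as destroyConnectStatusCheckCommand: cancel if still pending. */
    synchronized void destroyCheck() {
        if (reconnectFuture != null && !reconnectFuture.isDone()) {
            reconnectFuture.cancel(true);
        }
    }

    synchronized boolean reconnectTaskActive() {
        return reconnectFuture != null && !reconnectFuture.isDone();
    }

    /** Replays the interleaving; returns true if a reconnect task outlives the close. */
    static boolean replay() {
        ReconnectRaceSketch client = new ReconnectRaceSketch();
        try {
            client.initCheck();     // the first connect() created the task
            // Thread A (reconnect) is about to call connect() ...
            client.destroyCheck();  // thread B (close): disconnect() cancels the task
            client.initCheck();     // thread A: connect() sees isCancelled() and reschedules
            return client.reconnectTaskActive();
        } finally {
            client.executor.shutdownNow(); // stop the demo executor
        }
    }

    public static void main(String[] args) {
        System.out.println("reconnect task still active after close: " + replay());
    }
}
```

The program prints `reconnect task still active after close: true`: nothing cancels the task created by the second initCheck(), which matches the behaviour described above.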

Where the log originates on the provider side

The DubboProtocol class

public class DubboProtocol extends AbstractProtocol {
   // ... some code omitted
   private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
       public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
           if (message instanceof Invocation) {
                Invocation inv = (Invocation)message;
                Invoker<?> invoker =getInvoker(channel, inv);
                 // For a callback, handle the case of a newer version calling an older one
                if(Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr =invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null ||methodsStr.indexOf(",") == -1) {
                        hasMethod =inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods =methodsStr.split(",");
                        for (String method :methods) {
                            if(inv.getMethodName().equals(method)) {
                                hasMethod =true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() +" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + ",invocation is :" + inv);
                        return null;
                   }
                }
               RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
           }
           throw new RemotingException(channel, "Unsupported request: " +message == null ? null : (message.getClass().getName() + ": " +message) + ", channel: consumer: " + channel.getRemoteAddress() +" --> provider: " + channel.getLocalAddress());
       }
 
       @Override
       public void received(Channel channel, Object message) throws RemotingException {
           if (message instanceof Invocation) {
                reply((ExchangeChannel)channel, message);
           } else {
                super.received(channel,message);
           }
       }
 
       @Override
       public void connected(Channel channel) throws RemotingException {
           invoke(channel, Constants.ON_CONNECT_KEY);
       }
 
       @Override
       public void disconnected(Channel channel) throws RemotingException {
           if (logger.isInfoEnabled()) {
                logger.info("disconected from " + channel.getRemoteAddress() + ",url:" +channel.getUrl());
           }
           invoke(channel, Constants.ON_DISCONNECT_KEY);
       }
 
       private void invoke(Channel channel, String methodKey) {
           Invocation invocation = createInvocation(channel, channel.getUrl(),methodKey);
           if (invocation != null) {
                try {
                    received(channel,invocation);
               } catch (Throwable t) {
                     logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause:" + t.getMessage(), t);
                }
           }
       }
 
       private Invocation createInvocation(Channel channel, URL url, String methodKey) {
           String method = url.getParameter(methodKey);
           if (method == null || method.length() == 0) {
                return null;
           }
           RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
           invocation.setAttachment(Constants.PATH_KEY, url.getPath());
           invocation.setAttachment(Constants.GROUP_KEY,url.getParameter(Constants.GROUP_KEY));
           invocation.setAttachment(Constants.INTERFACE_KEY,url.getParameter(Constants.INTERFACE_KEY));
           invocation.setAttachment(Constants.VERSION_KEY,url.getParameter(Constants.VERSION_KEY));
           if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
               invocation.setAttachment(Constants.STUB_EVENT_KEY,Boolean.TRUE.toString());
           }
           return invocation;
       }
    }
 
   // ... some code omitted
 
   private void openServer(URL url) {
       // find server.
        String key = url.getAddress();
       // A client can also export a service that only the server can invoke.
       boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
       if (isServer) {
           ExchangeServer server = serverMap.get(key);
           if (server == null) {
                serverMap.put(key,createServer(url));
           } else {
                 // The server supports reset, used together with the override feature
                server.reset(url);
           }
       }
    }
   private ExchangeServer createServer(URL url) {
        // By default, send a readonly event when the server shuts down
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,Boolean.TRUE.toString());
        // Heartbeat is enabled by default
       url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT));
       String str = url.getParameter(Constants.SERVER_KEY,Constants.DEFAULT_REMOTING_SERVER);
 
       if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
           throw new RpcException("Unsupported server type: " + str +", url: " + url);
 
       url = url.addParameter(Constants.CODEC_KEY,Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
       ExchangeServer server;
       try {
           server = Exchangers.bind(url, requestHandler);
       } catch (RemotingException e) {
           throw new RpcException("Fail to start server(url: " + url +") " + e.getMessage(), e);
       }
       str = url.getParameter(Constants.CLIENT_KEY);
       if (str != null && str.length() > 0) {
           Set<String> supportedTypes =ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                 throw new RpcException("Unsupported client type: " + str);
           }
       }
       return server;
    }
}

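requestHandler is bound to the provider's URL (openServer in DubboProtocol calls createServer, which passes it to Exchangers.bind) and responds to Dubbo connect, disconnect and invoke events. When the connection from a consumer to this provider is dropped, the disconnected method of requestHandler writes the "disconected from" log line.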
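
To see why a reconnect loop on the consumer turns into repeated log lines on the provider, the callback pattern can be modelled in isolation. This is a simplified sketch with hypothetical names (ConnectionListener, LoggingListener, simulate) and plain string addresses. It does not use Dubbo's ExchangeHandler API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one log entry per dropped connection.
final class DisconnectLogSketch {
    private DisconnectLogSketch() {}

    /** Stand-in for the connect and disconnect callbacks of a channel handler. */
    interface ConnectionListener {
        void connected(String remoteAddress);
        void disconnected(String remoteAddress);
    }

    /** Records a message on every disconnect, like requestHandler#disconnected. */
    static final class LoggingListener implements ConnectionListener {
        final List<String> log = new ArrayList<>();

        @Override
        public void connected(String remoteAddress) {
            // nothing is logged on connect in this model
        }

        @Override
        public void disconnected(String remoteAddress) {
            // the spelling follows the message in the provider log
            log.add("disconected from " + remoteAddress);
        }
    }

    /** Simulates a consumer that connects and drops the connection repeatedly. */
    static List<String> simulate(int reconnectCycles, String remoteAddress) {
        LoggingListener listener = new LoggingListener();
        for (int i = 0; i < reconnectCycles; i++) {
            listener.connected(remoteAddress);
            listener.disconnected(remoteAddress);
        }
        return listener.log;
    }

    public static void main(String[] args) {
        for (String line : simulate(3, "/10.255.242.96:11582")) {
            System.out.println(line);
        }
    }
}
```

Each connect and close cycle on the consumer adds exactly one entry, so a consumer stuck in a reconnect loop makes the provider log grow without bound.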

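Summary

The root cause is that the consumer keeps reconnecting (disconnect, then connect), so channels are closed and re-created over and over. Because the provider reacts to every dropped connection, it keeps printing the "disconected from xxx" log.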

References
  • https://blog.csdn.net/qq_38975553/article/details/104494713
  • https://blog.csdn.net/lkforce/article/details/78543990
Originally published: 2020-06-29. In case of infringement, contact cloudcommunity@tencent.com for removal.

This article was shared from the WeChat official account 程序员奇点.
