前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Nacos2# 服务注册与发现客户端示例与源码解析(二)

Nacos2# 服务注册与发现客户端示例与源码解析(二)

作者头像
瓜农老梁
发布2021-06-10 17:01:33
3K0
发布2021-06-10 17:01:33
举报
文章被收录于专栏:瓜农老梁瓜农老梁

引言

引言

上一篇客户端初始化没有撸完,这篇继续。Nacos从2.0以后增加了对grpc的支持,代码中HTTP的代理初始化还有保留,我们注册发现通常为临时节点,这部分已由gRPC接管。可以对比下新旧逻辑的实现差异。

一、内容提要

HTTP代理初始化

HTTP心跳检测器

  • HTTP心跳检测只适用于注册的节点持久节点,临时节点会使用grpc代理(HTTP的心跳检测默认废弃由grpc替代)
  • 在初始化时客户端注册代理NamingClientProxy时,初始化了一个HTTP心跳器用于向Nacos Server发起心跳
  • 在注册节点时通过向心跳执行器添加心跳任务addBeatInfo触发
  • 心跳执行器通过每隔五秒中向Nacos Server发起HTTP请求
  • 如果返回的server not found会向Nacos Server发起注册请求重新注册

UDP接受服务端推送

  • Client通过UDP接受到nacos server推动的消息
  • 如果服务端推送的为服务信息通过processServiceInfo处理逻辑见上篇,主要实例变更时的通知机制
  • 如果dump类型,则客户端发送服务信息serviceInfoMap的ack信息到服务端

gRPC代理初始化

gRPC初始化逻辑概览

  • gRPC 客户端代理的初始化主要逻辑为创建gRPC Client并启动
  • 并注册ServerRequestHandler用于处理Nacos Server推送的NotifySubscriberRequest请求
  • 注册ConnectionListener用于处理gRPC建立和断开连接事件
  • 请求超时时间可以通过「namingRequestTimeout」设置,默认为3秒

gRPC Client启动逻辑

  • gRPC Client启动逻辑主要在于建立与nacos server的grpc连接,其中两个守护线程一直在运行
  • 守护线程1用于处理grpc连接的建立和关闭事件
  • 守护线程2用于与nacos server的心跳保鲜,并负责异步建立grpc连接
  • 守护线程2同时负责当nacos server的地址信息发生变更时重新与新server建立连接
  • nacos server的地址变更通过grpc通道由server推送ConnectResetRequest到client
  • grpc client只与nacos server集群中一台建立grpc连接

二、源码分析

public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
            InstancesChangeNotifier changeNotifier) throws NacosException {
        this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
                changeNotifier);
      this.serverListManager = new ServerListManager(properties);
      this.serviceInfoHolder = serviceInfoHolder;
        this.securityProxy = new SecurityProxy(properties, NamingHttpClientManager.getInstance().getNacosRestTemplate());
        initSecurityProxy();
      // @注解7.4
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties,serviceInfoHolder);
     // @注解7.5
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,serviceInfoHolder);
}

三、HTTP代理初始化

@注解7.4 Http代理的初始化,该代理主要在nacos 2.0以前版本使用,2.0之后通过grpc与nacos server通信。

public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,
            Properties properties, ServiceInfoHolder serviceInfoHolder) {
        super(securityProxy, properties);
        this.serverListManager = serverListManager;
        this.setServerPort(DEFAULT_SERVER_PORT);
        this.namespaceId = namespaceId;
     // @注解7.4.1
        this.beatReactor = new BeatReactor(this, properties);
     // @注解7.4.2
        this.pushReceiver = new PushReceiver(serviceInfoHolder);
     // @注解7.4.3
        this.maxRetry = ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_REQUEST_DOMAIN_RETRY_COUNT,
                String.valueOf(UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT)));
 }

HTTP心跳检测器

@注解7.4.1 初始化BeatReactor,用于向nacos server发送心跳

public BeatReactor(NamingHttpClientProxy serverProxy, Properties properties) {
        this.serverProxy = serverProxy;
        // 心跳线程池大小,默认为核数的二分之一,最小为1,可通过properties参数「namingClientBeatThreadCount」设置
        int threadCount = initClientBeatThreadCount(properties);
     // 初始化线程执行器
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
}

接着一下这个执行器再做什么事情。

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        // 默认延迟5秒
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

当通过addBeatInfo增加一个心跳信息BeatInfo时,执行器会创建BeatTask(Runnable)延迟5秒运行。

class BeatTask implements Runnable {
     BeatInfo beatInfo;
     public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }
     @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
               // 向nacos server「/nacos/v1/ns/instance/beat」发送心跳
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
               // 如果nacos server返回NOT FOUND则重新发起注册请求
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

            }
           // 默认为5秒,可以通过PreservedMetadataKeys.HEART_BEAT_INTERVAL设置
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
}

addBeatInfo调用时机,当节点在注册时如果实例为临时节点,则会创建心跳任务发起

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
           // 添加心跳任务
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, groupedServiceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
        
}

再跟踪下注册入口,判读使用哪个ClientProxy

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}

private NamingClientProxy getExecuteClientProxy(Instance instance) {
        // 是否为临时节点,临时节点使用grpc,持久节点使用http;默认为true,也就是默认使用grpcClientProxy
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}

小结: HTTP心跳检测只适用于注册的节点持久节点,临时节点会使用grpc代理,即HTTP的心跳检测默认废弃由grpc替代;在初始化时客户端注册代理NamingClientProxy时,初始化了一个HTTP心跳器用于向Nacos Server发起心跳;在注册节点时通过向心跳执行器添加心跳任务addBeatInfo触发;心跳执行器通过每隔五秒中向Nacos Server发起HTTP请求,如果返回的server not found会向Nacos Server发起注册请求重新注册;

UDP接受服务端推送

@注解7.4.2 初始化PushReceiver用于接受nacos server信息推送,使用UDP协议。

 public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
        try {
            this.serviceInfoHolder = serviceInfoHolder;
            this.udpSocket = new DatagramSocket();
            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            
            this.executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
}

备注: PushReceiver实现Runnable接口,在构造方法中通过守护线程运行。

public void run() {
  while (!closed) {
    try {
      // byte[] is initialized with 0 full filled by default
      byte[] buffer = new byte[UDP_MSS];
      DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
      // 接受nacos server推送
      udpSocket.receive(packet);
      // 将推送内容转换为json字符串
      String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
      NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
      PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
      String ack;
      // 推送类型服务信息(例如订阅实例的变更)会通知订阅者逻辑已在上篇分析
      if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
        serviceInfoHolder.processServiceInfo(pushPacket.data);
        // send ack to server
        ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
          + "\"\"}";
      } else if ("dump".equals(pushPacket.type)) {
        // dump data to server
        ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
          + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
          + "\"}";
      } else {
        // do nothing send ack only
        ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
          + "\", \"data\":" + "\"\"}";
      }
   // 向Server发送ack消息
      udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                                        packet.getSocketAddress()));
    } catch (Exception e) {
      if (closed) {
        return;
      }
      NAMING_LOGGER.error("[NA] error while receiving push data", e);
    }
  }
}

小结: Client通过UDP接受到nacos server推动的消息:@1如果推送的为服务信息通过processServiceInfo处理,逻辑见上篇;@2 如果dump类型,则客户端发送服务信息serviceInfoMap的ack信息到服务端。

HTTP重试次数

@注解7.4.3 client通过HTTP向Nacos Server请求的重试次数,默认为3次。可以通过「namingRequestDomainMaxRetryCount」指定

 public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
            String method) throws NacosException {
   // ...
   if (serverListManager.isDomain()) {
            String nacosDomain = serverListManager.getNacosDomain();
            for (int i = 0; i < maxRetry; i++) { // 请求发送异常最大重试次数
                try {
                    return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            }
   }
   //....
 }

四、gRPC代理初始化

gRPC初始化逻辑概览

@注解7.5 下面接着gRPC 客户端代理的初始化逻辑

public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
            Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        super(securityProxy, properties);
        this.namespaceId = namespaceId;
        this.uuid = UUID.randomUUID().toString();
        // 设置请求超时时间,默认为3秒。可以通过参数「namingRequestTimeout」设置
        this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
        Map<String, String> labels = new HashMap<String, String>();
        // 设置source=sdk,module=naming
        labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
        labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
        // 创建gRPC Client:clientName=uuid,ConnectionType=GRPC
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        // 创建ConnectionEventListener用于建立和断开gRPC连接时的事件响应
        this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this);
      // 启动grpc client
        start(serverListFactory, serviceInfoHolder);
}
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
     // @注解7.5.1 gRPC Client启动
        rpcClient.start();
     // 注册registerServerRequestHandler用于处理从Nacos Push到Client的请求
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
     // 注册连接事件Listener,当连接建立和断开时处理事件
        rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
}

小结: gRPC 客户端代理的初始化主要逻辑为创建gRPC Client并启动;并注册ServerRequestHandler用于处理Nacos Server推送的NotifySubscriberRequest请求;注册ConnectionListener用于处理gRPC建立和断开连接事件;另外,请求超时时间可以通过「namingRequestTimeout」设置,默认为3秒

gRPC Client启动逻辑

@注解7.5.1 gRPC Client启动逻辑

public final void start() throws NacosException {
  // 将Client状态由INITIALIZED变更为STARTING
  boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
  if (!success) {
    return;
  }
  // -------------------------@1 satart---------------------------------------------
  // 守护线程执行器
  clientEventExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setName("com.alibaba.nacos.client.remote.worker");
      t.setDaemon(true);
      return t;
    }
  });
 
  // 从BlockingQueue中不断获取连接Event,根据事件类型回调onConnected()/onDisConnect()事件
  clientEventExecutor.submit(new Runnable() {
    @Override
    public void run() {
      while (true) {
        ConnectionEvent take = null;
        try {
          take = eventLinkedBlockingQueue.take();
          if (take.isConnected()) {
            notifyConnected();
          } else if (take.isDisConnected()) {
            notifyDisConnected();
          }
        } catch (Throwable e) {
          //Do nothing
        }
      }
    }
  });
 // -------------------------@1 end---------------------------------------------
  
  
  // -------------------------@2 start---------------------------------------------
  clientEventExecutor.submit(new Runnable() {
    @Override
    public void run() {
      while (true) {
        try {
          // 获取重定向连接上下文,指重新连接到其他server节点
          ReconnectContext reconnectContext = reconnectionSignal
            .poll(keepAliveTime, TimeUnit.MILLISECONDS);
          if (reconnectContext == null) {
            // check alive time.
            // client活动时间超过5秒钟,向Nacos Server发起健康检测
            if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
              // 发送健康检查
              boolean isHealthy = healthCheck();
              // 非健康节点
              if (!isHealthy) {
                if (currentConnection == null) {
                  continue;
                }
                LoggerUtils.printIfInfoEnabled(LOGGER,
                                               "[{}]Server healthy check fail,currentConnection={}", name,
                                               currentConnection.getConnectionId());
                // 标记客户端状态为unhealthy
                rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
                // 重置ReconnectContext移除serverInfo
                reconnectContext = new ReconnectContext(null, false);

              } else {
                // 健康连接更新时间戳
                lastActiveTimeStamp = System.currentTimeMillis();
                continue;
              }
            } else {
              // 心跳保鲜未过期,跳过本次检测
              continue;
            }

          }

          if (reconnectContext.serverInfo != null) {
            // clear recommend server if server is not in server list.
            boolean serverExist = false;
            // 判断连接上下文的reconnectContext.serverInfo是否在我们推荐设置的列表中
            for (String server : getServerListFactory().getServerList()) {
              ServerInfo serverInfo = resolveServerInfo(server);
              if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                serverExist = true;
                reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                break;
              }
            }
            // 不再推荐的列表中则移除,改为随机选择
            if (!serverExist) {
              LoggerUtils.printIfInfoEnabled(LOGGER,
                                             "[{}] Recommend server is not in server list ,ignore recommend server {}", name,
                                             reconnectContext.serverInfo.getAddress());

              reconnectContext.serverInfo = null;

            }
          }
          // 发起重新连接
          reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
        } catch (Throwable throwable) {
          //Do nothing
        }
      }
    }
  });
  // -------------------------@2 end---------------------------------------------
  
  
  // -------------------------@3 start---------------------------------------------
  // 异步连接nacos server失败,改为同步连接
  //connect to server ,try to connect to server sync once, async starting if fail.
  Connection connectToServer = null;
  rpcClientStatus.set(RpcClientStatus.STARTING);

  int startUpRetryTimes = RETRY_TIMES;
  while (startUpRetryTimes > 0 && connectToServer == null) {
    try {
      startUpRetryTimes--;
      ServerInfo serverInfo = nextRpcServer();

      LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name,
                                     serverInfo);

      connectToServer = connectToServer(serverInfo);
    } catch (Throwable e) {
      LoggerUtils.printIfWarnEnabled(LOGGER,
                                     "[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}",
                                     name, e.getMessage(), startUpRetryTimes);
    }

  }

  // -------------------------@3 end---------------------------------------------
  
  
 // -------------------------@4 start---------------------------------------------
  
  if (connectToServer != null) {
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up,connectionId={}",name, connectToServer.serverInfo.getAddress(),connectToServer.getConnectionId());
      this.currentConnection = connectToServer;
      rpcClientStatus.set(RpcClientStatus.RUNNING);
      // 连接成功添加ConnectionEvent
      eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
    } else {
     // 未成功建立连接重新发起异步建立连接需求
      switchServerAsync();
    }
  
 // 注册ConnectResetRequestHandler用于处理nacos server推送的重置连接请求
  registerServerRequestHandler(new ConnectResetRequestHandler());

  //register client detection request.
  registerServerRequestHandler(new ServerRequestHandler() {
    @Override
    public Response requestReply(Request request) {
      if (request instanceof ClientDetectionRequest) {
        return new ClientDetectionResponse();
      }

      return null;
    }
  });
  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      try {
        RpcClient.this.shutdown();
      } catch (NacosException e) {
        LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]RpcClient shutdown exception, errorMessage ={}", name,
                                        e.getMessage());
      }
    }
  });
  // -------------------------@4 end---------------------------------------------
}

备注: grpc client启动时的逻辑:逻辑块@1 守护线程不断从阻塞队列eventLinkedBlockingQueue获取grpc连接/断开事件,并调用上文中注册的namingGrpcConnectionEventListener回调其onConnected/onDisConnect方法。其中事件添加时机为:

grpc连接建立时,添加连接事件:

// 连接成功添加ConnectionEvent
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));

grpc连接关闭时,添加关闭事件:

private void closeConnection(Connection connection) {
        if (connection != null) {
            connection.close();
            // 断开连接添加DISCONNECTED事件
            eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
        }
}

逻辑块@2 守护线程不断从阻塞队列reconnectionSignal获取重新连接事件(ReconnectContext)也就是更换nacos server的连接grpc通道:

阻塞队列没有重新连接事件:则做心跳保鲜检测,心跳频率为5秒。当超过5秒时会向Nacos Server发起健康检查,当返回不健康时,将grpc client标记为unhealthy;返回健康则刷新心跳时间lastActiveTimeStamp

阻塞队列有重新连接事件:重连事件上下文reconnectContext的的server ip在我们设置的nacos server 列表则使用,否则改为随机选择nacos server ip地址,并与新server建立连接

protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {

  try {

    AtomicReference<ServerInfo> recommendServer = new AtomicReference<ServerInfo>(recommendServerInfo);
    // onRequestFail=true表示当健康检查失败grpcClient被设置为unhealthy,重连时重新发起健康检查,如果检查通过则不再执行重连
    if (onRequestFail && healthCheck()) {
      LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success,currentServer is{} ", name,
                                     currentConnection.serverInfo.getAddress());
      rpcClientStatus.set(RpcClientStatus.RUNNING);
      return;
    }

    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] try to re connect to a new server ,server is {}", name,
                                   recommendServerInfo == null ? " not appointed,will choose a random server."
                                   : (recommendServerInfo.getAddress() + ", will try it once."));

    // loop until start client success.
    boolean switchSuccess = false;

    int reConnectTimes = 0;
    int retryTurns = 0;
    Exception lastException = null;
    // 切换nacos server没有成功则会一直重试
    while (!switchSuccess && !isShutdown()) {

      //1.get a new server
      ServerInfo serverInfo = null;
      try {
        // 获取需要重新连接的server地址
        serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
        //2.create a new channel to new server
        // 与新的server建立grpc连接,如果连接失败返回null
        Connection connectionNew = connectToServer(serverInfo);
        // 关闭缓存的当前连接并重定向到新的连接
        if (connectionNew != null) {
          LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] success to connect a server  [{}],connectionId={}",
                                         name, serverInfo.getAddress(), connectionNew.getConnectionId());
          //successfully create a new connect.
          if (currentConnection != null) {
            LoggerUtils.printIfInfoEnabled(LOGGER,"[{}] Abandon prev connection ,server is  {}, connectionId is {}", name,currentConnection.serverInfo.getAddress(), currentConnection.getConnectionId());
            //set current connection to enable connection event.
            currentConnection.setAbandon(true);
            closeConnection(currentConnection);
          }
          currentConnection = connectionNew;
          rpcClientStatus.set(RpcClientStatus.RUNNING);
          switchSuccess = true;
          // 添加连接成功时间到阻塞队列
          boolean s = eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
          return;
        }

        //close connection if client is already shutdown.
        if (isShutdown()) {
          closeConnection(currentConnection);
        }

        lastException = null;

      } catch (Exception e) {
        lastException = e;
      } finally {
        // 清理本次重连请求
        recommendServer.set(null);
      }

      // 执行到这里表示上面没有成功建立连接,打印重试次数日志
      if (reConnectTimes > 0
          && reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) {
        LoggerUtils.printIfInfoEnabled(LOGGER,"[{}] fail to connect server,after trying {} times, last try server is {},error={}", name,reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException);
        if (Integer.MAX_VALUE == retryTurns) {
          retryTurns = 50;
        } else {
          retryTurns++;
        }
      }

      reConnectTimes++;
   // 重试时等待特定的时间
      try {
        //sleep x milliseconds to switch next server.
        if (!isRunning()) {
          // first round ,try servers at a delay 100ms;second round ,200ms; max delays 5s. to be reconsidered.
          Thread.sleep(Math.min(retryTurns + 1, 50) * 100L);
        }
      } catch (InterruptedException e) {
        // Do  nothing.
      }
    }

    if (isShutdown()) {
      LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Client is shutdown ,stop reconnect to server", name);
    }

  } catch (Exception e) {
    LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to  re connect to server ,error is {}", name, e);
  }
}

备注: 重新切换连接server逻辑:@1当检查失败grpc client会被标记为unhealthy这类型onRequestFail为true,重连时重新发起健康检查,如果检查成功,则退出本次重连。@2 获取重连的server地址和端口,并建立grpc连接,关闭当前缓存的旧连接并重定向到新连接,同时添加连接成功时间到阻塞队列。@3 一直重试直到连接建立成功,每次重试等待一些时间(100ms,200ms...最大为5秒)

逻辑块@3 当异步与nacos server建立失败时,改为尝试同步建立连接。

逻辑块@4 如果连接建立成功添加连接事件到阻塞队列;连接建立失败发起异步建立连接请求;注册ConnectResetRequestHandler用于处理nacos server推送的重置连接请求;jvm退出时通过hook关闭grpc client。

小结: gRPC Client启动逻辑主要在于建立与nacos server的grpc连接,其中两个守护线程一直在运行。一个用于处理grpc连接的建立和关闭事件;一个用于与nacos server的心跳保鲜,并负责异步建立grpc连接,当nacos server的地址信息发生变更时负责重新与新server建立连接;grpc client只与nacos server集群中一台建立grpc连接


本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-05-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
    • HTTP代理初始化
      • gRPC代理初始化
        • HTTP心跳检测器
          • UDP接受服务端推送
            • HTTP重试次数
              • gRPC初始化逻辑概览
                • gRPC Client启动逻辑
                相关产品与服务
                对象存储
                对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档