A Look at Artemis's DiscoveryGroup

code4it
Published 2020-02-24 09:46:17
Collected in the column: 码匠的流水账

This article takes a look at DiscoveryGroup in Artemis.

DiscoveryGroup

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java

public final class DiscoveryGroup implements ActiveMQComponent {

   private static final Logger logger = Logger.getLogger(DiscoveryGroup.class);

   private final List<DiscoveryListener> listeners = new ArrayList<>();

   private final String name;

   private Thread thread;

   private boolean received;

   private final Object waitLock = new Object();

   private final Map<String, DiscoveryEntry> connectors = new ConcurrentHashMap<>();

   private final long timeout;

   private volatile boolean started;

   private final String nodeID;

   private final Map<String, String> uniqueIDMap = new HashMap<>();

   private final BroadcastEndpoint endpoint;

   private final NotificationService notificationService;

   /**
    * This is the main constructor, intended to be used
    *
    * @param nodeID
    * @param name
    * @param timeout
    * @param endpointFactory
    * @param service
    * @throws Exception
    */
   public DiscoveryGroup(final String nodeID,
                         final String name,
                         final long timeout,
                         BroadcastEndpointFactory endpointFactory,
                         NotificationService service) throws Exception {
      this.nodeID = nodeID;
      this.name = name;
      this.timeout = timeout;
      this.endpoint = endpointFactory.createBroadcastEndpoint();
      this.notificationService = service;
   }

   @Override
   public synchronized void start() throws Exception {
      if (started) {
         return;
      }

      endpoint.openClient();

      started = true;

      thread = new Thread(new DiscoveryRunnable(), "activemq-discovery-group-thread-" + name);

      thread.setDaemon(true);

      thread.start();

      if (notificationService != null) {
         TypedProperties props = new TypedProperties();

         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));

         Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STARTED, props);

         notificationService.sendNotification(notification);
      }
   }

   /**
    * This will start the DiscoveryRunnable and run it directly.
    * This is useful for a test process where we need this execution blocking a thread.
    */
   public void internalRunning() throws Exception {
      endpoint.openClient();
      started = true;
      DiscoveryRunnable runnable = new DiscoveryRunnable();
      runnable.run();
   }

   @Override
   public void stop() {
      synchronized (this) {
         if (!started) {
            return;
         }

         started = false;
      }

      synchronized (waitLock) {
         waitLock.notifyAll();
      }

      try {
         endpoint.close(false);
      } catch (Exception e1) {
         ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1);
      }

      try {
         if (thread != null) {
            thread.interrupt();
            thread.join(10000);
            if (thread.isAlive()) {
               ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();
            }
         }
      } catch (InterruptedException e) {
         throw new ActiveMQInterruptedException(e);
      }

      thread = null;

      if (notificationService != null) {
         TypedProperties props = new TypedProperties();
         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
         Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props);
         try {
            notificationService.sendNotification(notification);
         } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e);
         }
      }
   }

   @Override
   public boolean isStarted() {
      return started;
   }

   public String getName() {
      return name;
   }

   public synchronized List<DiscoveryEntry> getDiscoveryEntries() {
      return new ArrayList<>(connectors.values());
   }

   //......
}
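  • DiscoveryGroup's constructor creates the endpoint via endpointFactory.createBroadcastEndpoint(). Its start method calls endpoint.openClient(), then creates a DiscoveryRunnable and runs it on a daemon thread; if a notificationService is present, it also sends a DISCOVERY_GROUP_STARTED notification.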
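The start/stop handling above follows a common lifecycle pattern: an idempotent start, a volatile flag that the worker loop polls, and a bounded join on stop. The following is a minimal sketch of that pattern only. LifecycleSketch and its sleep-based loop are hypothetical stand-ins (not Artemis classes), and the locking is slightly simplified compared with the original.

```java
// Hypothetical stand-in for DiscoveryGroup's lifecycle; not Artemis code.
final class LifecycleSketch {

   private volatile boolean started;
   private Thread thread;

   synchronized void start() {
      if (started) {
         return; // a second start() is a no-op
      }
      started = true;
      thread = new Thread(this::loop, "sketch-discovery-thread");
      thread.setDaemon(true); // a daemon thread does not keep the JVM alive
      thread.start();
   }

   void stop() {
      Thread worker;
      synchronized (this) {
         if (!started) {
            return; // never started, or already stopped
         }
         started = false; // flip the flag first so the loop can exit
         worker = thread;
         thread = null;
      }
      worker.interrupt(); // wake the worker if it is blocked
      try {
         worker.join(10_000); // bounded wait, like thread.join(10000)
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt(); // preserve the caller's interrupt
      }
   }

   boolean isStarted() {
      return started;
   }

   // Stands in for the loop that blocks on endpoint.receiveBroadcast().
   private void loop() {
      while (started) {
         try {
            Thread.sleep(50);
         } catch (InterruptedException e) {
            // fall through and re-check the started flag
         }
      }
   }

   public static void main(String[] args) {
      LifecycleSketch group = new LifecycleSketch();
      group.start();
      group.start(); // ignored because the group is already started
      System.out.println("started=" + group.isStarted());
      group.stop();
      System.out.println("started=" + group.isStarted());
   }
}
```

Setting the flag before interrupting matters: the worker re-checks started after every wake-up, so it exits instead of looping again.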

DiscoveryRunnable

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java

   class DiscoveryRunnable implements Runnable {

      @Override
      public void run() {
         byte[] data = null;

         while (started) {
            try {
               try {

                  data = endpoint.receiveBroadcast();
                  if (data == null) {
                     if (started) {
                        ActiveMQClientLogger.LOGGER.unexpectedNullDataReceived();
                     }
                     break;
                  }
               } catch (Exception e) {
                  if (!started) {
                     return;
                  } else {
                     ActiveMQClientLogger.LOGGER.errorReceivingPacketInDiscovery(e);
                  }
               }

               ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(data);

               String originatingNodeID = buffer.readString();

               String uniqueID = buffer.readString();

               checkUniqueID(originatingNodeID, uniqueID);

               if (nodeID.equals(originatingNodeID)) {
                  if (checkExpiration()) {
                     callListeners();
                  }
                  // Ignore traffic from own node
                  continue;
               }

               int size = buffer.readInt();

               boolean changed = false;

               DiscoveryEntry[] entriesRead = new DiscoveryEntry[size];
               // Will first decode all the elements outside of any lock
               for (int i = 0; i < size; i++) {
                  TransportConfiguration connector = new TransportConfiguration();

                  connector.decode(buffer);

                  entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
               }

               synchronized (DiscoveryGroup.this) {
                  for (DiscoveryEntry entry : entriesRead) {
                     if (connectors.put(originatingNodeID, entry) == null) {
                        changed = true;
                     }
                  }

                  changed = changed || checkExpiration();
               }
               //only call the listeners if we have changed
               //also make sure that we aren't stopping to avoid deadlock
               if (changed && started) {
                  if (logger.isTraceEnabled()) {
                     logger.trace("Connectors changed on Discovery:");
                     for (DiscoveryEntry connector : connectors.values()) {
                        logger.trace(connector);
                     }
                  }
                  callListeners();
               }

               synchronized (waitLock) {
                  received = true;

                  waitLock.notifyAll();
               }
            } catch (Throwable e) {
               ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e);
            }
         }
      }

   }
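  • DiscoveryRunnable implements the Runnable interface. Its run method receives data via endpoint.receiveBroadcast(), reads the originating node ID and unique ID, decodes each entry into a DiscoveryEntry, and puts it into connectors; broadcasts from the local node are skipped. When changed is true and the group is still started, it calls callListeners, which triggers the DiscoveryListener.connectorsChanged callback.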
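The changed flag is the interesting part of this loop: it becomes true when connectors.put returns null (a node seen for the first time) or when checkExpiration removes a stale entry. The sketch below models just that bookkeeping. ConnectorTableSketch is a hypothetical class, and the expiry rule (lastSeen + timeout <= now) is an assumption, since checkExpiration lies in the elided part of the excerpt.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the connectors bookkeeping; not Artemis code.
final class ConnectorTableSketch {

   // nodeID -> time of the last broadcast seen from that node
   private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();
   private final long timeoutMillis;

   ConnectorTableSketch(long timeoutMillis) {
      this.timeoutMillis = timeoutMillis;
   }

   /** Records a broadcast; returns true when listeners should be notified. */
   synchronized boolean onBroadcast(String nodeId, long now) {
      // put() returns null only for a node that had no entry yet
      boolean changed = lastSeen.put(nodeId, now) == null;
      // Like the excerpt, || short-circuits: the sweep is skipped when a node was just added
      return changed || expire(now);
   }

   /** Assumed expiry rule: drop entries not refreshed within the timeout. */
   private boolean expire(long now) {
      boolean removed = false;
      Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
      while (it.hasNext()) {
         if (it.next().getValue() + timeoutMillis <= now) {
            it.remove();
            removed = true;
         }
      }
      return removed;
   }

   int size() {
      return lastSeen.size();
   }

   public static void main(String[] args) {
      ConnectorTableSketch table = new ConnectorTableSketch(1000);
      System.out.println(table.onBroadcast("node-a", 0));    // first sighting of node-a
      System.out.println(table.onBroadcast("node-a", 500));  // refresh only
      System.out.println(table.onBroadcast("node-b", 600));  // first sighting of node-b
      System.out.println(table.onBroadcast("node-b", 1600)); // node-a is now stale
      System.out.println(table.size());
   }
}
```

Timestamps are passed in explicitly so the behaviour is deterministic; the original reads System.currentTimeMillis() instead.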

JGroupsBroadcastEndpoint

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java

public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {

   private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class);

   private final String channelName;

   private boolean clientOpened;

   private boolean broadcastOpened;

   private JChannelWrapper channel;

   private JGroupsReceiver receiver;

   private JChannelManager manager;

   public JGroupsBroadcastEndpoint(JChannelManager manager, String channelName) {
      this.manager = manager;
      this.channelName = channelName;
   }

   @Override
   public void broadcast(final byte[] data) throws Exception {
      if (logger.isTraceEnabled())
         logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
      if (broadcastOpened) {
         org.jgroups.Message msg = new org.jgroups.Message();

         msg.setBuffer(data);

         channel.send(msg);
      }
   }

   @Override
   public byte[] receiveBroadcast() throws Exception {
      if (logger.isTraceEnabled())
         logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
      if (clientOpened) {
         return receiver.receiveBroadcast();
      } else {
         return null;
      }
   }

   @Override
   public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
      if (logger.isTraceEnabled())
         logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
      if (clientOpened) {
         return receiver.receiveBroadcast(time, unit);
      } else {
         return null;
      }
   }

   @Override
   public synchronized void openClient() throws Exception {
      if (clientOpened) {
         return;
      }
      internalOpen();
      receiver = new JGroupsReceiver();
      channel.addReceiver(receiver);
      clientOpened = true;
   }

   @Override
   public synchronized void openBroadcaster() throws Exception {
      if (broadcastOpened)
         return;
      internalOpen();
      broadcastOpened = true;
   }

   public abstract JChannel createChannel() throws Exception;

   public JGroupsBroadcastEndpoint initChannel() throws Exception {
      this.channel = manager.getJChannel(channelName, this);
      return this;
   }

   protected void internalOpen() throws Exception {
      channel.connect();
   }

   @Override
   public synchronized void close(boolean isBroadcast) throws Exception {
      if (isBroadcast) {
         broadcastOpened = false;
      } else {
         channel.removeReceiver(receiver);
         clientOpened = false;
      }
      internalCloseChannel(channel);
   }

   /**
    * Closes the channel used in this JGroups Broadcast.
    * Can be overridden by implementations that use an externally managed channel.
    *
    * @param channel
    */
   protected synchronized void internalCloseChannel(JChannelWrapper channel) {
      channel.close(true);
   }

}
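  • JGroupsBroadcastEndpoint is an abstract class that implements the BroadcastEndpoint interface. Its broadcast method creates an org.jgroups.Message and sends it through JChannelWrapper, but only when broadcastOpened is true. Its receiveBroadcast method delegates to JGroupsReceiver and returns null when the client has not been opened. openClient creates the JGroupsReceiver and adds it to the channel; internalOpen calls channel.connect().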
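To make the open/closed gating concrete, here is a minimal in-memory sketch of an endpoint with the same two flags. InMemoryEndpointSketch is hypothetical and uses a BlockingQueue in place of a JGroups channel and receiver; it does not implement the real BroadcastEndpoint interface, and unlike the excerpt it marks the flags volatile and does not close any channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory endpoint; stands in for JGroups, not Artemis code.
final class InMemoryEndpointSketch {

   private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
   private volatile boolean clientOpened;
   private volatile boolean broadcastOpened;

   synchronized void openClient() {
      clientOpened = true;
   }

   synchronized void openBroadcaster() {
      broadcastOpened = true;
   }

   // Mirrors close(boolean): the argument selects which side is shut down.
   synchronized void close(boolean isBroadcast) {
      if (isBroadcast) {
         broadcastOpened = false;
      } else {
         clientOpened = false;
      }
   }

   // Data is silently dropped unless the broadcaster side is open.
   void broadcast(byte[] data) {
      if (broadcastOpened) {
         queue.add(data);
      }
   }

   // Returns null when the client side is closed or nothing arrives in time.
   byte[] receiveBroadcast(long time, TimeUnit unit) {
      if (!clientOpened) {
         return null;
      }
      try {
         return queue.poll(time, unit);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt(); // preserve the interrupt for the caller
         return null;
      }
   }

   public static void main(String[] args) {
      InMemoryEndpointSketch endpoint = new InMemoryEndpointSketch();
      endpoint.openClient();
      endpoint.openBroadcaster();
      endpoint.broadcast(new byte[] {1, 2, 3});
      byte[] received = endpoint.receiveBroadcast(100, TimeUnit.MILLISECONDS);
      System.out.println(received == null ? "nothing" : received.length + " bytes");
   }
}
```

The null return is what DiscoveryRunnable reacts to: it logs unexpectedNullDataReceived if the group is still started and then leaves the loop.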

BroadcastGroupImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BroadcastGroupImpl.java

public class BroadcastGroupImpl implements BroadcastGroup, Runnable {

   //......

   public synchronized void broadcastConnectors() throws Exception {
      ActiveMQBuffer buff = ActiveMQBuffers.dynamicBuffer(4096);

      buff.writeString(nodeManager.getNodeId().toString());

      buff.writeString(uniqueID);

      buff.writeInt(connectors.size());

      for (TransportConfiguration tcConfig : connectors) {
         tcConfig.encode(buff);
      }

      // Only send as many bytes as we need.
      byte[] data = new byte[buff.readableBytes()];
      buff.getBytes(buff.readerIndex(), data);

      endpoint.broadcast(data);
   }

   public void run() {
      if (!started) {
         return;
      }

      try {
         broadcastConnectors();
         loggedBroadcastException = false;
      } catch (Exception e) {
         // only log the exception at ERROR level once, even if it fails multiple times in a row - HORNETQ-919
         if (!loggedBroadcastException) {
            ActiveMQServerLogger.LOGGER.errorBroadcastingConnectorConfigs(e);
            loggedBroadcastException = true;
         } else {
            logger.debug("Failed to broadcast connector configs...again", e);
         }
      }
   }

   //......
}  
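  • BroadcastGroupImpl implements the BroadcastGroup and Runnable interfaces. Its run method calls broadcastConnectors, which writes the node ID, uniqueID, and connector count into buff, encodes each TransportConfiguration in connectors after them, and then broadcasts the resulting bytes via endpoint.broadcast(data).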
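The packet written by broadcastConnectors and read back by DiscoveryRunnable has a simple layout: node ID, unique ID, entry count, then the entries. The sketch below round-trips that layout with java.io streams. BroadcastPacketSketch is hypothetical, plain strings stand in for TransportConfiguration, and writeUTF is not the byte encoding that ActiveMQBuffer uses, so only the field order carries over.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the broadcast packet layout; not Artemis code.
final class BroadcastPacketSketch {

   static final class Packet {
      final String nodeId;
      final String uniqueId;
      final List<String> connectors;

      Packet(String nodeId, String uniqueId, List<String> connectors) {
         this.nodeId = nodeId;
         this.uniqueId = uniqueId;
         this.connectors = connectors;
      }
   }

   // Sender side: same field order as broadcastConnectors().
   static byte[] encode(String nodeId, String uniqueId, List<String> connectors) {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
         out.writeUTF(nodeId);
         out.writeUTF(uniqueId);
         out.writeInt(connectors.size());
         for (String connector : connectors) {
            out.writeUTF(connector); // strings stand in for TransportConfiguration
         }
      } catch (IOException e) {
         throw new UncheckedIOException(e);
      }
      return bytes.toByteArray();
   }

   // Receiver side: reads the fields back in the order they were written.
   static Packet decode(byte[] data) {
      try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
         String nodeId = in.readUTF();
         String uniqueId = in.readUTF();
         int size = in.readInt();
         List<String> connectors = new ArrayList<>(size);
         for (int i = 0; i < size; i++) {
            connectors.add(in.readUTF());
         }
         return new Packet(nodeId, uniqueId, connectors);
      } catch (IOException e) {
         throw new UncheckedIOException(e);
      }
   }

   public static void main(String[] args) {
      byte[] data = encode("node-1", "uid-1", Arrays.asList("tcp://a:61616", "tcp://b:61616"));
      Packet packet = decode(data);
      System.out.println(packet.nodeId + " " + packet.uniqueId + " " + packet.connectors);
   }
}
```

Because the count precedes the entries, the receiver can size its array up front, which is what DiscoveryRunnable does with new DiscoveryEntry[size].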

Summary

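DiscoveryGroup's constructor creates the endpoint via endpointFactory.createBroadcastEndpoint(). Its start method calls endpoint.openClient(), then creates a DiscoveryRunnable and runs it on a daemon thread. DiscoveryRunnable implements the Runnable interface; its run method receives data through endpoint.receiveBroadcast(), decodes it into DiscoveryEntry objects, and updates connectors. When changed is true it calls callListeners, which triggers the DiscoveryListener.connectorsChanged callback.

JGroupsBroadcastEndpoint is an abstract class that implements the BroadcastEndpoint interface. Its broadcast method creates an org.jgroups.Message and sends it through JChannelWrapper; its receiveBroadcast method delegates to JGroupsReceiver; openClient creates the JGroupsReceiver; and internalOpen calls channel.connect(). On the sending side, BroadcastGroupImpl's run method calls broadcastConnectors, which encodes the connectors and hands the bytes to endpoint.broadcast(data).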

Originally published on 2020-02-11 on the WeChat official account 码匠的流水账.
