前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的ServerConnectionLifeCycleListener

聊聊artemis的ServerConnectionLifeCycleListener

作者头像
code4it
发布2020-02-24 09:36:41
3070
发布2020-02-24 09:36:41
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下artemis的ServerConnectionLifeCycleListener

BaseConnectionLifeCycleListener

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BaseConnectionLifeCycleListener.java

代码语言:javascript
复制
public interface BaseConnectionLifeCycleListener<ProtocolClass> {

   void connectionCreated(ActiveMQComponent component, Connection connection, ProtocolClass protocol);

   void connectionDestroyed(Object connectionID);

   void connectionException(Object connectionID, ActiveMQException me);

   void connectionReadyForWrites(Object connectionID, boolean ready);
}
  • BaseConnectionLifeCycleListener接口定义了connectionCreated、connectionDestroyed、connectionException、connectionReadyForWrites方法

ServerConnectionLifeCycleListener

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ServerConnectionLifeCycleListener.java

代码语言:javascript
复制
public interface ServerConnectionLifeCycleListener extends BaseConnectionLifeCycleListener<ProtocolManager> {

}
  • ServerConnectionLifeCycleListener继承了BaseConnectionLifeCycleListener,其泛型为ProtocolManager

RemotingServiceImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java

代码语言:javascript
复制
public class RemotingServiceImpl implements RemotingService, ServerConnectionLifeCycleListener {

    //......

   @Override
   public void connectionCreated(final ActiveMQComponent component,
                                 final Connection connection,
                                 final ProtocolManager protocol) {
      if (server == null) {
         throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
      }

      ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
      try {
         if (server.hasBrokerConnectionPlugins()) {
            server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection));
         }
      } catch (ActiveMQException t) {
         logger.warn("Error executing afterCreateConnection plugin method: {}", t.getMessage(), t);
         throw new IllegalStateException(t.getMessage(), t.getCause());

      }
      if (logger.isTraceEnabled()) {
         logger.trace("Connection created " + connection);
      }

      connections.put(connection.getID(), entry);
      connectionCountLatch.countUp();
      totalConnectionCount.incrementAndGet();
   }

   @Override
   public void connectionDestroyed(final Object connectionID) {

      if (logger.isTraceEnabled()) {
         logger.trace("Connection removed " + connectionID + " from server " + this.server, new Exception("trace"));
      }

      issueFailure(connectionID, new ActiveMQRemoteDisconnectException());
   }

   private void issueFailure(Object connectionID, ActiveMQException e) {
      ConnectionEntry conn = connections.get(connectionID);

      if (conn != null && !conn.connection.isSupportReconnect()) {
         RemotingConnection removedConnection = removeConnection(connectionID);
         if (removedConnection != null) {
            try {
               if (server.hasBrokerConnectionPlugins()) {
                  server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection));
               }
            } catch (ActiveMQException t) {
               logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t);
               conn.connection.fail(t);
               return;
            }
         }
         conn.connection.fail(e);
      }
   }

   @Override
   public void connectionException(final Object connectionID, final ActiveMQException me) {
      issueFailure(connectionID, me);
   }

   @Override
   public void connectionReadyForWrites(final Object connectionID, final boolean ready) {
   }

    //......
}
  • RemotingServiceImpl实现了ServerConnectionLifeCycleListener接口,其connectionCreated在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection));其connectionDestroyed方法主要执行issueFailure,在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection))

ActiveMQServerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

代码语言:javascript
复制
public class ActiveMQServerImpl implements ActiveMQServer {

   //......

   public void registerBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
      configuration.registerBrokerPlugin(plugin);
      plugin.registered(this);
   }

   public void unRegisterBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
      configuration.unRegisterBrokerPlugin(plugin);
      plugin.unregistered(this);
   }

   public boolean hasBrokerConnectionPlugins() {
      return !getBrokerConnectionPlugins().isEmpty();
   }

   public List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins() {
      return configuration.getBrokerConnectionPlugins();
   }

   public void callBrokerConnectionPlugins(final ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException {
      callBrokerPlugins(getBrokerConnectionPlugins(), pluginRun);
   }

   private <P extends ActiveMQServerBasePlugin> void callBrokerPlugins(final List<P> plugins, final ActiveMQPluginRunnable<P> pluginRun) throws ActiveMQException {
      if (pluginRun != null) {
         for (P plugin : plugins) {
            try {
               pluginRun.run(plugin);
            } catch (Throwable e) {
               if (e instanceof ActiveMQException) {
                  logger.debug("plugin " + plugin + " is throwing ActiveMQException");
                  throw (ActiveMQException) e;
               } else {
                  logger.warn("Internal error on plugin " + pluginRun, e.getMessage(), e);
               }
            }
         }
      }
   }

   //......
}
  • callBrokerPlugins方法会遍历plugins,然后挨个执行pluginRun.run(plugin)方法

小结

BaseConnectionLifeCycleListener接口定义了connectionCreated、connectionDestroyed、connectionException、connectionReadyForWrites方法;RemotingServiceImpl实现了ServerConnectionLifeCycleListener接口,其connectionCreated在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection));其connectionDestroyed方法主要执行issueFailure,在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection))

doc

  • ServerConnectionLifeCycleListener
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-01-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • BaseConnectionLifeCycleListener
  • ServerConnectionLifeCycleListener
  • RemotingServiceImpl
  • ActiveMQServerImpl
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档