专栏首页码匠的流水账聊聊artemis的ServerConnectionLifeCycleListener

聊聊artemis的ServerConnectionLifeCycleListener

本文主要研究一下artemis的ServerConnectionLifeCycleListener

BaseConnectionLifeCycleListener

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BaseConnectionLifeCycleListener.java

public interface BaseConnectionLifeCycleListener<ProtocolClass> {

   void connectionCreated(ActiveMQComponent component, Connection connection, ProtocolClass protocol);

   void connectionDestroyed(Object connectionID);

   void connectionException(Object connectionID, ActiveMQException me);

   void connectionReadyForWrites(Object connectionID, boolean ready);
}
  • BaseConnectionLifeCycleListener接口定义了connectionCreated、connectionDestroyed、connectionException、connectionReadyForWrites方法

ServerConnectionLifeCycleListener

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ServerConnectionLifeCycleListener.java

public interface ServerConnectionLifeCycleListener extends BaseConnectionLifeCycleListener<ProtocolManager> {

}
  • ServerConnectionLifeCycleListener继承了BaseConnectionLifeCycleListener,其泛型为ProtocolManager

RemotingServiceImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java

public class RemotingServiceImpl implements RemotingService, ServerConnectionLifeCycleListener {

    //......

   @Override
   public void connectionCreated(final ActiveMQComponent component,
                                 final Connection connection,
                                 final ProtocolManager protocol) {
      if (server == null) {
         throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
      }

      ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
      try {
         if (server.hasBrokerConnectionPlugins()) {
            server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection));
         }
      } catch (ActiveMQException t) {
         logger.warn("Error executing afterCreateConnection plugin method: {}", t.getMessage(), t);
         throw new IllegalStateException(t.getMessage(), t.getCause());

      }
      if (logger.isTraceEnabled()) {
         logger.trace("Connection created " + connection);
      }

      connections.put(connection.getID(), entry);
      connectionCountLatch.countUp();
      totalConnectionCount.incrementAndGet();
   }

   @Override
   public void connectionDestroyed(final Object connectionID) {

      if (logger.isTraceEnabled()) {
         logger.trace("Connection removed " + connectionID + " from server " + this.server, new Exception("trace"));
      }

      issueFailure(connectionID, new ActiveMQRemoteDisconnectException());
   }

   private void issueFailure(Object connectionID, ActiveMQException e) {
      ConnectionEntry conn = connections.get(connectionID);

      if (conn != null && !conn.connection.isSupportReconnect()) {
         RemotingConnection removedConnection = removeConnection(connectionID);
         if (removedConnection != null) {
            try {
               if (server.hasBrokerConnectionPlugins()) {
                  server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection));
               }
            } catch (ActiveMQException t) {
               logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t);
               conn.connection.fail(t);
               return;
            }
         }
         conn.connection.fail(e);
      }
   }

   @Override
   public void connectionException(final Object connectionID, final ActiveMQException me) {
      issueFailure(connectionID, me);
   }

   @Override
   public void connectionReadyForWrites(final Object connectionID, final boolean ready) {
   }

    //......
}
  • RemotingServiceImpl实现了ServerConnectionLifeCycleListener接口,其connectionCreated在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection));其connectionDestroyed方法主要执行issueFailure,在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection))

ActiveMQServerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

public class ActiveMQServerImpl implements ActiveMQServer {

   //......

   public void registerBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
      configuration.registerBrokerPlugin(plugin);
      plugin.registered(this);
   }

   public void unRegisterBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
      configuration.unRegisterBrokerPlugin(plugin);
      plugin.unregistered(this);
   }

   public boolean hasBrokerConnectionPlugins() {
      return !getBrokerConnectionPlugins().isEmpty();
   }

   public List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins() {
      return configuration.getBrokerConnectionPlugins();
   }

   public void callBrokerConnectionPlugins(final ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException {
      callBrokerPlugins(getBrokerConnectionPlugins(), pluginRun);
   }

   private <P extends ActiveMQServerBasePlugin> void callBrokerPlugins(final List<P> plugins, final ActiveMQPluginRunnable<P> pluginRun) throws ActiveMQException {
      if (pluginRun != null) {
         for (P plugin : plugins) {
            try {
               pluginRun.run(plugin);
            } catch (Throwable e) {
               if (e instanceof ActiveMQException) {
                  logger.debug("plugin " + plugin + " is throwing ActiveMQException");
                  throw (ActiveMQException) e;
               } else {
                  logger.warn("Internal error on plugin " + pluginRun, e.getMessage(), e);
               }
            }
         }
      }
   }

   //......
}
  • callBrokerPlugins方法会遍历plugins,然后挨个执行pluginRun.run(plugin)方法

小结

BaseConnectionLifeCycleListener接口定义了connectionCreated、connectionDestroyed、connectionException、connectionReadyForWrites方法;RemotingServiceImpl实现了ServerConnectionLifeCycleListener接口,其connectionCreated在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection));其connectionDestroyed方法主要执行issueFailure,在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection))

doc

  • ServerConnectionLifeCycleListener

本文分享自微信公众号 - 码匠的流水账(geek_luandun),作者:码匠乱炖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-01-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊artemis的ServerConnectionLifeCycleListener

    本文主要研究一下artemis的ServerConnectionLifeCycleListener

    codecraft
  • 聊聊EurekaRibbonClientConfiguration

    spring-cloud-netflix-eureka-client-2.0.0.RELEASE-sources.jar!/org/springframewor...

    codecraft
  • 聊聊artemis的HAManager

    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis...

    codecraft
  • 聊聊artemis的ServerConnectionLifeCycleListener

    本文主要研究一下artemis的ServerConnectionLifeCycleListener

    codecraft
  • Python websocket服务器

    https://github.com/Pithikos/python-websocket-server

    audy
  • Django 2.1.7 Admin - 编辑页选项

    Django 2.1.7 Admin管理后台 - 注册模型、自定义显示列表字段 Django 2.1.7 Admin - 列表页选项

    Devops海洋的渔夫
  • SpringBoot入门建站全系列(九)文件上传功能与下载方式

    Spring对文件上传做了简单的封装,就是用MultipartFile这个对象去接收文件,当然有很多种写法,下面会一一介绍。

    品茗IT
  • ​SoundCloud的web播放库Maestro演进之路

    在SoundCloud,我们希望可以支持所有现代网络浏览器、移动浏览器和IE 11。我们的目标是利用浏览器提供的功能提供最佳的播放体验。

    LiveVideoStack
  • 一个完整的TDD演练案例(完)

    测试驱动开发完整案例的最后一部分,除完成了整个案例的测试驱动之外,还介绍了依赖注入以及测试驱动开发的定律与原则。

    张逸
  • 更适合孩子的冒险游戏!《Zingoshi》用AR帮助其建立信心、消除社障

    尽管市场上的游戏已经是数不胜数,但大多是充斥着力和血腥,适合于成人的战斗冒险类游戏。很显然,这对于孩子这一庞大的群体来说,略有些不太友好。是以,很多家长都在寻找...

    VRPinea

扫码关注云+社区

领取腾讯云代金券