前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊artemis的FederationManager

聊聊artemis的FederationManager

原创
作者头像
code4it
修改2020-02-14 10:18:16
5040
修改2020-02-14 10:18:16
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下artemis的FederationManager

FederationManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java

代码语言:javascript
复制
public class FederationManager implements ActiveMQComponent {
​
   private final ActiveMQServer server;
​
   private Map<String, Federation> federations = new HashMap<>();
   private State state;
​
   enum State {
      STOPPED,
      STOPPING,
      /**
       * Deployed means {@link FederationManager#deploy()} was called but
       * {@link FederationManager#start()} was not called.
       * <p>
       * We need the distinction if {@link FederationManager#stop()} is called before 'start'. As
       * otherwise we would leak locators.
       */
      DEPLOYED, STARTED,
   }
​
​
   public FederationManager(final ActiveMQServer server) {
      this.server = server;
   }
​
   @Override
   public synchronized void start() throws ActiveMQException {
      if (state == State.STARTED) return;
      deploy();
      for (Federation federation : federations.values()) {
         federation.start();
      }
      state = State.STARTED;
   }
​
   @Override
   public synchronized void stop() {
      if (state == State.STOPPED) return;
      state = State.STOPPING;
​
​
      for (Federation federation : federations.values()) {
         federation.stop();
      }
      federations.clear();
      state = State.STOPPED;
   }
​
   @Override
   public boolean isStarted() {
      return state == State.STARTED;
   }
​
   public synchronized void deploy() throws ActiveMQException {
      for (FederationConfiguration federationConfiguration : server.getConfiguration().getFederationConfigurations()) {
         deploy(federationConfiguration);
      }
      if (state != State.STARTED) {
         state = State.DEPLOYED;
      }
   }
​
   public synchronized boolean undeploy(String name) {
      Federation federation = federations.remove(name);
      if (federation != null) {
         federation.stop();
      }
      return true;
   }
​
​
​
   public synchronized boolean deploy(FederationConfiguration federationConfiguration) throws ActiveMQException {
      Federation federation = federations.get(federationConfiguration.getName());
      if (federation == null) {
         federation = newFederation(federationConfiguration);
      } else if (!Objects.equals(federation.getConfig().getCredentials(), federationConfiguration.getCredentials())) {
         undeploy(federationConfiguration.getName());
         federation = newFederation(federationConfiguration);
      }
      federation.deploy();
      return true;
   }
​
   private synchronized Federation newFederation(FederationConfiguration federationConfiguration) throws ActiveMQException {
      Federation federation = new Federation(server, federationConfiguration);
      federations.put(federationConfiguration.getName(), federation);
      if (state == State.STARTED) {
         federation.start();
      }
      return federation;
   }
​
   public Federation get(String name) {
      return federations.get(name);
   }
​
   public void register(FederatedAbstract federatedAbstract) {
      server.registerBrokerPlugin(federatedAbstract);
   }
​
   public void unregister(FederatedAbstract federatedAbstract) {
      server.unRegisterBrokerPlugin(federatedAbstract);
   }
​
}
  • FederationManager实现了ActiveMQComponent接口,它提供了start()、stop()、deploy、undeploy等方法;其中start方法会先执行deploy方法,然后遍历federations.values()执行federation.start();stop方法则是遍历federations.values()执行federation.stop(),然后清空federations;deploy方法在federation为null时执行newFederation,然后执行federation.deploy(),若不为null且credentials与配置不一致则执行undeploy,重新newFederation;undeploy方法则是将federation从federations中移除然后执行federation.stop()

Federation

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java

代码语言:javascript
复制
public class Federation {
​
​
   private final ActiveMQServer server;
   private final SimpleString name;
​
   private final Map<String, FederationUpstream> upstreams = new HashMap<>();
   private final Map<String, FederationDownstream> downstreams = new HashMap<>();
   private final FederationConfiguration config;
   private FederationManager.State state;
​
   //......
​
   public synchronized void deploy() throws ActiveMQException {
      for (FederationUpstreamConfiguration upstreamConfiguration : config.getUpstreamConfigurations()) {
         deploy(upstreamConfiguration, config.getFederationPolicyMap());
      }
      for (FederationDownstreamConfiguration downstreamConfiguration : config.getDownstreamConfigurations()) {
         deploy(downstreamConfiguration, config.getFederationPolicyMap());
      }
      if (state != FederationManager.State.STARTED) {
         state = FederationManager.State.DEPLOYED;
      }
   }
​
   public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException {
      String name = upstreamConfiguration.getName();
      FederationUpstream upstream = upstreams.get(name);
​
      //If connection has changed we will need to do a full undeploy and redeploy.
      if (upstream == null) {
         undeploy(name);
         upstream = deploy(name, upstreamConfiguration);
      } else if (!upstream.getConnection().getConfig().equals(upstreamConfiguration.getConnectionConfiguration())) {
         undeploy(name);
         upstream = deploy(name, upstreamConfiguration);
      }
​
      upstream.deploy(upstreamConfiguration.getPolicyRefs(), federationPolicyMap);
      return true;
   }
​
   public synchronized boolean deploy(FederationDownstreamConfiguration downstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException {
      String name = downstreamConfiguration.getName();
      FederationDownstream downstream = downstreams.get(name);
​
      //If connection has changed we will need to do a full undeploy and redeploy.
      if (downstream == null) {
         undeploy(name);
         downstream = deploy(name, downstreamConfiguration);
      } else if (!downstream.getConnection().getConfig().equals(downstreamConfiguration.getConnectionConfiguration())) {
         undeploy(name);
         downstream = deploy(name, downstreamConfiguration);
      }
​
      downstream.deploy(config);
      return true;
   }
​
   private synchronized FederationUpstream deploy(String name, FederationUpstreamConfiguration upstreamConfiguration) {
      FederationUpstream upstream = new FederationUpstream(server, this, name, upstreamConfiguration);
      upstreams.put(name, upstream);
      if (state == FederationManager.State.STARTED) {
         upstream.start();
      }
      return upstream;
   }
​
   private synchronized FederationDownstream deploy(String name, FederationDownstreamConfiguration downstreamConfiguration) {
      //If we have a matching upstream connection already configured then use it for the initiating downstream connection
      FederationConnection connection = null;
      if (downstreamConfiguration.getConnectionConfiguration().isShareConnection()) {
         for (FederationUpstream upstream : upstreams.values()) {
            if (upstream.getConfig().getConnectionConfiguration()
                .equals(downstreamConfiguration.getConnectionConfiguration())) {
               connection = upstream.getConnection();
               connection.setSharedConnection(true);
               break;
            }
         }
      }
​
      FederationDownstream downstream = new FederationDownstream(server, this, name, downstreamConfiguration, connection);
      downstreams.put(name, downstream);
      if (state == FederationManager.State.STARTED) {
         downstream.start();
      }
      return downstream;
   }
​
   //......
​
}   
  • Federation的deploy方法先遍历config.getUpstreamConfigurations(),进行upstream的deploy,再遍历config.getDownstreamConfigurations(),进行downstream的deploy

小结

FederationManager实现了ActiveMQComponent接口,它提供了start()、stop()、deploy、undeploy等方法;其中start方法会先执行deploy方法,然后遍历federations.values()执行federation.start();stop方法则是遍历federations.values()执行federation.stop(),然后清空federations;deploy方法在federation为null时执行newFederation,然后执行federation.deploy(),若不为null且credentials与配置不一致则执行undeploy,重新newFederation;undeploy方法则是将federation从federations中移除然后执行federation.stop()

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • FederationManager
  • Federation
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档