前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >activemq持久订阅工作原理

activemq持久订阅工作原理

作者头像
johnhuster的分享
发布2022-03-29 14:27:11
4870
发布2022-03-29 14:27:11
举报
文章被收录于专栏:johnhuster

对activemq消息订阅模式来说有两种:持久订阅/非持久订阅。

非持久订阅consumer只能消费在该consumer激活状态时传送给对应topic的消息才能被该consumer消费,一旦该consumer 挂掉到下次启动期间发布到该topic的消息不能被该consumer重新恢复时使用!!!

持久订阅:订阅之后,无论消息是否是在该consumer激活或者down掉期间发送的,最终都会被该consumer接收到,直到被显示取消持久订阅(session.unscribe(“topic名字”))!!!

那么持久订阅到底是如何实现的呢,笔者在这里将展现其中的奥秘:

先来看下TopicRegion的addConsumer方法

代码语言:javascript
复制
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         if (info.isDurable()) {
        //看该消息是否是持久化订阅        
          ActiveMQDestination destination = info.getDestination();
             if (!destination.isPattern()) {
                 // Make sure the destination is created.
                 lookup(context, destination,true);
             }
            String clientId = context.getClientId();
             String subscriptionName = info.getSubscriptionName();
             SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
             DurableTopicSubscription sub = durableSubscriptions.get(key);
             if (sub != null) {
                 if (sub.isActive()) {
                     throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
                 }
                 // 看下该订阅者的消息筛选项是否变化
                 if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
                     // 如果变化了那么首先移除该订阅者对应的DurableTopicSubscription,然后再追加最新创建的DurableTopicSubscription 
                     durableSubscriptions.remove(key);
                     destinationsLock.readLock().lock();
                     try {
                         for (Destination dest : destinations.values()) {
                             //Account for virtual destinations
                             if (dest instanceof Topic){
                                 Topic topic = (Topic)dest;
 topic.deleteSubscription(context, key);
                             }
                         }
                     } finally {
                         destinationsLock.readLock().unlock();
                     }
                    super.removeConsumer(context, sub.getConsumerInfo());
                     super.addConsumer(context, info);
                     sub = durableSubscriptions.get(key);
                 } else {
                     // 如果消息筛选项没有变化,那么直接将刚恢复连接的订阅者id与之前的DurableTopicSubscription 关联起来
                     if (sub.getConsumerInfo().getConsumerId() != null) {
                         subscriptions.remove(sub.getConsumerInfo().getConsumerId());
                     }
                     subscriptions.put(info.getConsumerId(), sub);
                 }
             } else {
                 super.addConsumer(context, info);
                 sub = durableSubscriptions.get(key);
                 if (sub == null) {
                     throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
                                            + " subscriberName: " + key.getSubscriptionName());
                 }
             }
             sub.activate(usageManager, context, info, broker);
             return sub;
         } else {
             return super.addConsumer(context, info);
         }
     }

上面代码是订阅者连接到消息提供者时的处理代码,下面看下更核心的持久订阅与消息提供者断开连接时的处理:

代码语言:javascript
复制
    @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         if (info.isDurable()) {
            SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
             DurableTopicSubscription sub = durableSubscriptions.get(key);
             if (sub != null) {
                 sub.deactivate(keepDurableSubsActive);
             }
        } else {
 super.removeConsumer(context, info);
         }
     }

从上面代码可以看到,针对持久订阅者来说,当其与消息提供者断开连接时,provider并没有将该连接移除,仅仅是将断开连接者对应的DurableTopicSubscription状态设置为非激活状态,改状态不影响provider将发送到该topic的消息保存下来,非持久订阅者则在与provider失去连接这段期间无法接收该时间段发送的消息!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018/08/16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档