前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >activemq是如何实现消息分组的

activemq是如何实现消息分组的

作者头像
johnhuster的分享
发布2022-03-29 14:28:01
6260
发布2022-03-29 14:28:01
举报
文章被收录于专栏:johnhuster

activemq的消息分组是一个很有用的特性,首先需要说明的是该特性是针对queue的,对topic无感!

(1)入题

activemq的消息分组实现的功能就是使得同一个消息生产者产生的消息被同一个消费者消费,这样可以保证消费消息的顺序与生产消息的顺序一致,在这个功能上,有人可能会说使用consumer的exclusive特性以及消息selector都可以实现这个功能,是的如果没有其他不同的话那这个特性也就没有存在的必要了,下面进入讲述一下这三个特性的不同点:

1.消息过滤特性selector最大的不足在于如果该消费者down掉了,那么将没有消费者来消费这些消息(只有重新启动该消费者)

2.exclusive特性也可以实现只有一个消费者来消费某个queue上的消息,但是处理细度不足,无法处理消息生产者生产多种JMSXGroupID的消息

3.最后就是消息分组特性了,这是activemq提供的一种细粒度筛选消息的方式

(2)实现原理

最后activemq消息分组是通过JMSXGroupID、JMSXGroupSeq两个消息属性来完成的,同一个JMSXGroupID的消息会被发送给同一个consumer,除非该consumer挂掉,特别需要注意的是JMSXGroupSeq为-1时将会导致broker重新进行负载均衡,具体实现原理看下org.apache.activemq.broker.region.Queue的doActualDispatch方法相关代码:

代码语言:javascript
复制
                if (!fullConsumers.contains(s)) {
                     if (!s.isFull()) {
                         if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
                             // Dispatch it.
                             s.add(node);
                             target = s;
                             break;
                         }
                     } else {
                         // no further dispatch of list to a full consumer to
                         // avoid out of order message receipt
                         fullConsumers.add(s);
                         LOG.trace("Subscription full {}", s);
                     }
                 }
    protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
         boolean result = true;
         // Keep message groups together.
         String groupId = node.getGroupID();
         int sequence = node.getGroupSequence();
         if (groupId != null) {
            MessageGroupMap messageGroupOwners = getMessageGroupOwners();
             // If we can own the first, then no-one else should own the
             // rest.
             if (sequence == 1) {
 assignGroup(subscription, messageGroupOwners, node, groupId);
             } else {
                // Make sure that the previous owner is still valid, we may
                 // need to become the new owner.
                 ConsumerId groupOwner;
                groupOwner = messageGroupOwners.get(groupId);
                 if (groupOwner == null) {
 assignGroup(subscription, messageGroupOwners, node, groupId);
                 } else {
                     if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
                         // A group sequence < 1 is an end of group signal.
                         if (sequence < 0) {
                             messageGroupOwners.removeGroup(groupId);
                             subscription.getConsumerInfo().setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);
                         }
                     } else {
                         result = false;
                     }
                 }
             }
         }
        return result;
    }

打完收工~~

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018/08/21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档