前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot整合rocketmq实现顺序消费

springboot整合rocketmq实现顺序消费

作者头像
一缕82年的清风
发布2021-12-06 10:02:41
2.1K0
发布2021-12-06 10:02:41
举报
文章被收录于专栏:lsqingfeng

消息队列已然成为当下非常火热的中间件,而rocketmq作为阿里开源的中间件产品,历经数次超大并发的考验,已然成为中间件产品的首选。而有时候我们在使用消息队列的时候,往往需要能够保证消息的顺序消费,而rocketmq是可以支持消息的顺序消费的。rocketmq在发送消息的时候,是将消息发送到不同的队列(queue,也有人称之为分区)中,然后消费端从多个队列中读取消息进行消费,很明显,在这种全局模式下,是无法实现顺序消费的。为了实现顺序消费,我们需要把有顺序的消息按照他的顺序,将他们发送到同一个queue中,这样消费端在消费的时候,就保证了其顺序。但是顺序消费的性能肯定也相对差一些,因为只能使用一个队列。

好了,接下来我们使用springboot来看一下顺序消费是如何实现的。 官网上给出了一个顺序消费的案例,但是都是通过main方法的形式演示的(http://rocketmq.apache.org/docs/order-example/)。

一. 添加依赖:

代码语言:javascript
复制
<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.1.0</version>
</dependency>

二. 配置rocketmq地址(需要先自己搭建好rocketmq服务)

在application.yml中配置:

代码语言:javascript
复制
rocketmq:
  name-server: 192.168.1.11:9876;192.168.1.12:9876;192.168.1.13:9876
  producer:
    group: my-group1
    sendMessageTimeout: 300000

这里我使用的rocketmq的集群,如果是单机版,name-server只写一个地址即可

三. 一个简单的生产消费案例:

我们使用controller 来下一个生产者,这样当我通过浏览器发起请求是,就调用生产者来生产一条消息,同时写一个消费者,来监听对应的消息,实现消费

生产者代码:

代码语言:javascript
复制
@RestController
@RequestMapping("/mq")
@Slf4j
public class ProducerController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;


    @RequestMapping("/sync/send1")
    public String syncSendString(){
        //发送一个同步 消息,会返回值 ---发送到 stringTopic主题
        SendResult sendResult = rocketMQTemplate.syncSend("topicTest", "Hello, World!");
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", stringTopic, sendResult);
        //consumer result:------- StringConsumerNewNS received: Hello, World!
        return sendResult.toString();
    }


}

上面的案例,就是我向"topicTest" 的主题中发送一个 Hello,World 的字符串

消费者代码:

代码语言:javascript
复制
/**
 * RocketMQMessageListener
 */
@Service
@RocketMQMessageListener(nameServer = "${rocketmq.nameserver}", topic = "topicTest", consumerGroup = "string_consumer")
public class StringConsumerNewNS implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumerNewNS received: %s \n", message);
    }
}

这个消费者需要绑定nameserver, 然后监听 topicTest 这个主题。要注意,要把这个消费者放到能够被spring容器扫描到的地方。

接下来启动服务: 在浏览器访问: localhost:{your port}/mq/sync/send1 此时就会生产出一条消息,观察控制台,就会看到Hello, World打印出来,代表消息消费成功

四. 实现顺序消费

生产者: 此时要生产多条消息,方便观察顺序,我们依然写一个controller

代码语言:javascript
复制
/**************验证rocketmq顺序消费***************/
    @RequestMapping("/send/ordered")
    public String sendOrderedMsg(){
        /**
         * hashKey: 为了保证报到同一个队列中,将消息发送到orderTopic主题上
         */
        rocketMQTemplate.syncSendOrderly("orderTopic","no1","order");
        rocketMQTemplate.syncSendOrderly("orderTopic","no2","order");
        rocketMQTemplate.syncSendOrderly("orderTopic","no3","order");
        rocketMQTemplate.syncSendOrderly("orderTopic","no4","order");
        return "success";
    }

这里要注意,我是向 orderTopic主题发送4条消息,内容分别是 no1 no2 no3 no4. 第三个参数是order ,他的作用是会根据他的hash值计算发送到哪一个队列,我用的是同一个值order,那么他们的hash一样就可以保证发送到同一个队列里

消费者。要注意,消费者在消费的时候,默认是异步多线程消费的,所以无法保证顺序,我们要指定同步消费才行;先看代码:

代码语言:javascript
复制
/**
 * 监听顺序消息,保证顺序缴费
 */
@Component
@Slf4j
@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "ordered-consumer",consumeMode = ConsumeMode.ORDERLY)

public class OrderedMsqConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("consumer 顺序消费,收到消息{}",message);
    }
}

这里边指定了 consumeMode = ConsumeMode.ORDERLY, 默认值是 consumeMode = ConsumeMode.CONCURRENT

修改完毕后,启动项目;

浏览器访问:http://localhost:8888/mq/send/ordered

观察控制台日志,顺序打印: no1 no2 no3 no4

好了实现了顺序消费;相关源码已上传至github: https://github.com/lsqingfeng/action/ (springboot分支)欢迎大家关注交流

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/03/18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档