前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Springboot2整合RocketMQ消费端普通开发

Springboot2整合RocketMQ消费端普通开发

作者头像
用户5640963
发布2020-11-25 11:55:07
6440
发布2020-11-25 11:55:07
举报
文章被收录于专栏:卯金刀GG卯金刀GG

1、创建springboot项目,略;

2、POM.XML配置文件

代码语言:javascript
复制
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

3、java代码,实现并发处理

代码语言:javascript
复制
/**
 * @Author: Liu Yue
 * @Descripition:
 * @Date; Create in 2020/10/30 17:08
 **/
@Component
@Configuration
@Slf4j
@Data
public class BaseMsgConsumer {



    private DefaultMQPushConsumer consumer;
    private static final String CONSUMER_GROUP = "base_group";
    private final static String NAMESRV_ADDR = "192.168.27.16:9876";
    private final static String TOPIC = "base_topic";
    private final static String TAGS = "base_tags";


    public void consumer_3() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP + "_syncMsg");
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, TAGS);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(
                    List< MessageExt > msgs, ConsumeConcurrentlyContext context) {

                log.info(Thread.currentThread().getName()
                        + " Receive New Messages: " + msgs.size());
                try {
                    for (MessageExt msg : msgs) {
                        // 业务实现
                        log.info(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    log.info("==========RECONSUME_LATER===========");
                    log.error(e.getMessage(),e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        consumer.start();

        System.out.println("Consumer1 Started.");

    }

4、实现排序处理

代码语言:javascript
复制
public void consumer() throws MQClientException {
        log.info("初始化成功");
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP + "_syncMsg");
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.subscribe(TOPIC, TAGS);
        // 分散消费 MessageModel.CLUSTERING 同一个 Consumer ID 所标识的所有 Consumer 分散消费消息。
        // 广播消费 MessageModel.BROADCASTING 同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 并发消费模式 (MessageListenerConcurrently)
        // 有序消费模式 (MessageListenerOrderly)
        consumer.registerMessageListener((MessageListenerOrderly) (list, consumeConcurrentlyContext) -> {
            System.out.println(Thread.currentThread().getName()
                    + " Receive New Messages: " + list.size());
            synchronized (hkEquipmentAnalyselist) {
                list.forEach(msg -> {
                    // 业务实现
                    log.info(Thread.currentThread().getName() + "\t" + msg.getKeys() + "组1消费:" + new String(msg.getBody()));
                    
                });
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        consumer.start();
    }

RocketMQ消息处理有两种,一种是多线程的并发模式,使用MessageListenerConcurrently;一种是有序消费模式,使用MessageListenerOrderly。根据系统的特别选择。

每天提高一点点!!!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档