前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot优雅整合RocketMQ

SpringBoot优雅整合RocketMQ

作者头像
RRT冻羊
发布2022-11-03 14:12:06
1.4K0
发布2022-11-03 14:12:06
举报
文章被收录于专栏:冻羊技术思考冻羊技术思考

SpringBoot优雅整合RocketMQ

本篇文章默认你已经有RocketMQ的基础:

  • Producer启动过程,消息发送过程
  • Consumer启动过程,消息拉取消息消费过程
  • NameServer,Broker,Topic,Queue等相关概念

本篇内容默认你已经有SpringBoot的基础:

  • @Component ,@Service
  • @PostConstruct
  • @PreDestory
  • ApplicationEventPublisher

具备模板方法设计模式的概念

你只有简单具备上述知识,就可以继续往下阅读文章

Step1 : Maven引入相关依赖

代码语言:javascript
复制
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.47</version>
</dependency>

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.1.0-incubating</version>
</dependency>

引入fastjson及rocketmq-client依赖,这两个都是必须的。版本号根据自己实际需求可更改

Step2:生产者

思想:利用@Compoent注解让生产者实例受Spring容器管理,并且利用@PostConstruct实现生产者启动以及@PreDestory实现生产者关闭 注意事项:

  1. 下面的生产者,会伴随SpringBoot启动时调用start()启动生产者,在开发者需要使用的时候利用SpringIoc依赖注入即可。在项目运行过程中,可以随时调用shutdown()方法以关闭生产者。
  2. 如果你认为,生产者长时间闲置不好,亦可以根据自己的需求,变更逻辑。可以是每次由Spring容器返回一个新的实例,但是要记得,你这样做,每次使用完都要手动shutdown()
  3. 实际开发中,应该用Log日志方式代替System.out.println()输出
代码语言:javascript
复制
/**
 * 长连接producer抽象
 */
public abstract class AbstractMqProducer {
    protected DefaultMQProducer producer;

	@PostConstruct
    public abstract void start() throws MQClientException;

    @PreDestroy
    public void shutdown() {
        System.err.println("AbstractMqProducer @PreDestroy调用");
        producer.shutdown();
    }
}

生产者的抽象类,定义了start()和shutdown()方法。其中start()方法需要开发者进行重写。开发者需要在start()方法中,为producer进行初始化和启动工作。

代码语言:javascript
复制
/**
 * 生产者示例1
 *
 * 利用SpringBoot的特性,首先将其注解Component,让Spring容器接管这个实例
 * 利用PostConstruct来让实例化后的Bean进行后置处理
 */
@Component
public class TestProducer1 extends AbstractMqProducer{

    @Value("${dy.rocketmq.producer.producerGroup}")
    private String producerGroup;

    @Value("${dy.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @Value("${dy.rocketmq.producer.instanceName}")
    private String instanceName;

   
    @Override
    public void start() throws MQClientException {
        if (null == producer) {
            producer = new DefaultMQProducer(producerGroup);
            producer.setNamesrvAddr(namesrvAddr);
            producer.setInstanceName(instanceName);
        }
        producer.start();
        System.out.println(namesrvAddr);
        System.err.println("rocketmq producer is starting...");
    }

    public boolean send(String topic, String tag, String key, TestMqMessageDto msg) {
        try {
            Message message = new Message(
                    topic, tag, key,
                    JSONObject.toJSONString(msg).getBytes("utf-8")
            );

            SendResult sendResult = producer.send(message);
            System.err.println("消息生产结果:" + sendResult);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}
重写start()方法,@Value注解大家应该不陌生,他会从application.yml中获取对应的属性值,此处你也可以直接手动设置值。
为你的producer定义send()方法,可根据你的需求写多个不同的send方法以达到生产消息的目的
如果你想每一次消息发送的时候才启动producer,并且在发送成功后shutdown(),那么你可以修改AbstractProducer类的逻辑,并且在子类send()中,每次都调用start(),然后在发送结束后shutdown()
代码语言:javascript
复制
// 使用方式:在Spring接管的Bean中,直接使用@Autowired来获取producer实例

@Autowired
private TestProducer1 testProducer1;

{
	// 直接使用 发送消息
	testProducer1.send("topic", "tag", "key", 内容);
}

Step3: 消费者

代码语言:javascript
复制
/**
 * MQ消费者抽象类
 *
 * 定义消费消息的逻辑
 */
public abstract class AbstractMqConsumer {

    protected DefaultMQPushConsumer consumer;
    // 是否允许顺序消费
    protected boolean isOrderConsumer = false;

    @Autowired
    private ApplicationEventPublisher publisher;

    /**
     * 初始化consumer,由开发者控制
     *
     * 例如:
     * try {
     *      consumer = new DefaultMQPushConsumer(consumerGroup);
     *      consumer.setNamesrvAddr(namesrvAddr);
     *      consumer.setMessageModel(MessageModel.CLUSTERING);
     *      consumer.setConsumeMessageBatchMaxSize(1);
     *      consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
     *      consumer.subscribe("TopicTest", "*");
     *  } catch (MQClientException e) {
     *      e.printStackTrace();
     *  }
     */
    abstract void start0();

    @PostConstruct
    private void start() {
        if (null == consumer) {
            start0();
        }

        if (isOrderConsumer) {
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                    try {
                        consumeOrderlyContext.setAutoCommit(true);
                        if (null == msgs || msgs.size() == 0) {
                            return ConsumeOrderlyStatus.SUCCESS;
                        }
                        publisher.publishEvent(new MqMessageEvent(consumer, msgs));
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
        }
        else {
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    try {
                        if (null == msgs || msgs.size() == 0) {
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        publisher.publishEvent(new MqMessageEvent(consumer, msgs));
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        }

        new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5000L);

                    consumer.start();
                    System.err.println("rocketmq consumer server is starting...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    @PreDestroy
    public void shutdown() {
        consumer.shutdown();
    }
}

抽象了消费者,所有消费者继承这个类,然后实现start0()方法。

start0方法:由开发者去编写,目的是初始化consumer,但无需调用consumer.start();

start()方法: 该方法定义了ApplicationEventPublisher发布消息事件的逻辑,他会根据你的consumer类型(顺序消费,并发消费)来注册不同的MessageListener

PS:

  1. 如果你不懂ApplicationEventPublisher,请自行百度。大概来说,它是Spring实现的一种“观察者模式”。由ApplicationEventPublisher.publish()来通知对应的订阅者处理事件。
代码语言:javascript
复制
/**
 * 消费者1示例
 */
@Component
public class TestConsumer1 extends AbstractMqConsumer {

    @Value("${dy.rocketmq.consumer.consumerGroup}")
    private String consumerGroup;

    @Value("${dy.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @Override
    void start0() {
        try {
            consumer = new DefaultMQPushConsumer(consumerGroup);
            // 设置namesrv地址
            consumer.setNamesrvAddr(namesrvAddr);
            // 设置集群消费
            consumer.setMessageModel(MessageModel.CLUSTERING);
            // 设置每次消费消息的数量,官方一般建议1条,除非你有批量处理的需求
            consumer.setConsumeMessageBatchMaxSize(1);
            // 设置消费策略
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            // 设置订阅的topic和tags
            consumer.subscribe("TopicTest", "*");
            // ... 根据自己的需求设置consumer其他参数
            
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }
}
代码语言:javascript
复制
/**
 * 监听rocketMQ消费消息的spring event
 */
public class MqMessageEvent extends ApplicationEvent {
    private DefaultMQPushConsumer consumer;
    private List<MessageExt> msgs;

    public MqMessageEvent(DefaultMQPushConsumer consumer, List<MessageExt> msgs) {
        super(msgs);
        this.consumer = consumer;
        this.msgs = msgs;
    }

    public DefaultMQPushConsumer getConsumer() {
        return consumer;
    }

    public void setConsumer(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
    }

    public List<MessageExt> getMsgs() {
        return msgs;
    }

    public void setMsgs(List<MessageExt> msgs) {
        this.msgs = msgs;
    }
}

最后定义消费消息的Service:

代码语言:javascript
复制
/**
 * 用于监听MqMessageEvent的服务
 * 消费MQ消息
 * 
 * 一般两种方式:
 * (1)第一种:这个类的作用就是监听SpringEvent事件,然后再根据消息分发给其他Service进行处理,所以这里一般不会包含业务逻辑代码
 * (2)第二种:这个类的作用就是具体的消费消息类
 */
@Service
public class TestConsumerService {

    /**
     * 消费TopicTest下的TagA
     * @param event
     */
    @EventListener(condition = "#event.msgs[0].topic=='TopicTest' && #event.msgs[0].tags=='TagA'")
    public void testConsumer(MqMessageEvent event) {
        // 由于mq消费者设置了batch=1,所以每次都只会消费一条
        MessageExt msg = event.getMsgs().get(0);
        if (null != msg) {
            // 具体的消费MessageExt的逻辑
        }
    }
    
    /**
     * 消费TopicTestB
     * @param event
     */
    @EventListener(condition = "#event.msgs[0].topic=='TopicTestB' ")
    public void testConsumer(MqMessageEvent event) {
        // 由于mq消费者设置了batch=1,所以每次都只会消费一条
        MessageExt msg = event.getMsgs().get(0);
        if (null != msg) {
            // 具体的消费MessageExt的逻辑
            if (msg.getTags() == "TagA") {
            	//消费TagA的消息
            }
            else if (msg.getTags() == 'TagB'){
				//消费TagB的消息
			}
        }
    }
}
综上,你已经整合好了RocketMQ的生产者和消费者。
可能你的项目会需要多个不同生产者,多个不同的消费者,你只需要按上面的方式,新建多个不同的producer,consumer继承AbstractProducer, AbstractConsumer即可。
GitHub参考项目地址,找到src/main目录下的rocketmq包即可查看相关源码:

GitHub示例项目

最后简单脚注一些rocketmq的注意点

这些注意点都是在学习源码的过程中总结的,希望对很多还未深入源码了解rocketmq的程序员,在使用mq的过程中有所注意:

  1. 在集群消费模式下,同一个消费组(consumerGroup名相同)的所有消费者。他们所订阅的Topic,Tags都务必需要一致。 具体分析可以参考 参考 。此处涉及两个源码知识点,消息过滤和消息拉取。
  2. 生产消息过程中,消息是org.apache.rocketmq.common.message.Message . 该消息的构造方法有个参数虽然叫tags,但是它并不支持多个tag标签。一条Message仅能对应有且只有一个tag。所以不要被tags这个复数被误导了。如果你有去关注,你会发现consumer启动前配置的subscribe()订阅topic,tags时,它的参数是叫subExpression,所以在这里是支持表达式配置多个Tags。
  3. consumer属性consumeMessageBatchMaxSize默认为1,不建议去改动这个参数。这个参数表示你每次获得的List<MessageExt> msgs的消息个数。如果这里设置为1,表示每次你都只获得一个消息,也就是msgs.get(0)就可以取出这条消息。 它的工作原理大概是,配合pullBatchSize=32 。首先consumer会从它负责的queue中每隔一段时间一次拉取最多32(pullBatchSize)条消息(如果有这么多的话),然后再将这32条消息再根据tags进行一个消息过滤(因为“表达式过滤"模式,有可能会拉到其他非订阅的消息),最后将符合当前consumer订阅的消息内容,一次传递1(consumeMessageBatchMaxSize)条给开发者进行消息消费。 因此出于几点考虑: (1)一次拉一条进行消费,消费成功就返回SUCCESS,出问题就按照逻辑是记录下载,还是直接稍后重试。一般来说,都是一条消息对应一次业务处理。如果你拉N条消息,那么其中某一条失败了,你需要稍后重试。那么就会导致N-1条本来应该返回SUCESS消费成功的消息,而被迫全部失败。这样一来是不同的业务放在一起处理相互影响,另一方面如果失败了是很大的成本开销。 (2)暂时想不到
  4. 重复消费问题的产生原因: (1)consumeMessage()方法里没有用try-catch包住消费逻辑,导致一些意外的抛出异常而导致消费重试。 (2)消费消息超时时间默认15min,也就是说如果你消费消息过程中,超过15min没有返回CONSUME_SUCCESS或者RECONSUME_LATER ,即使之后返回了,也属于TIME_OUT。默认会重发这条消息给你并且无限重试,因此需要注意消息消费的时间不能超过consumeTimeout属性设置的值。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-08-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SpringBoot优雅整合RocketMQ
    • Step1 : Maven引入相关依赖
      • Step2:生产者
        • Step3: 消费者
          • GitHub参考项目地址,找到src/main目录下的rocketmq包即可查看相关源码:
        • GitHub示例项目
          • 最后简单脚注一些rocketmq的注意点
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档