前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rocketmq整合Spring中的推消费和litepull消费

Rocketmq整合Spring中的推消费和litepull消费

作者头像
路行的亚洲
发布2023-08-31 13:44:39
6740
发布2023-08-31 13:44:39
举报
文章被收录于专栏:后端技术学习

RocketMQ整合Spring的一个项目在apache中可以看到是rocketmq-spring。其提供了整合spring的方便使用方式。

一、rocketmq-spring中推拉模式的配置

下面我们来看看拉取和监听两种方式的消费模式的相关配置。

在Rocketmq整合Spring的过程中,我之前配置过这样的配置:

代码语言:javascript
复制
rocketmq:
  name-server: 192.168.0.103:8080; 192.168.0.102:8080
  tag: TAG_TEST
  consumer:
    access-key: test_uat
    secret-key: 123456
    tag: TAG_TEST
    topic: TEST_TOPIC
    group: GID_TEST_GROUP
    enable-msg-trace: true
    access-channel: CLOUD
  pull-consumer:  
    group: GID_TEST_GROUP
    topic: TEST_TOPIC

这个配置其实是配置了两种模式的配置,可以在消息诊断可以看到会存在多个消费者线程的情况,虽然诊断的结果是没有问题的,但是消费会出现消费不到,产生消息丢失的情况。为什么这么说呢?

二、监听下的DefaultMQPushConsumer

从RocketMQ-Spring中,我们可以看到我们如果使用监听的方式进行消费的话,其实其会有一个配置是支持我们去做消费的,那就是RocketMQMessageListener这个类中的配置:org.apache.rocketmq.spring.annotation.RocketMQMessageListener

代码语言:javascript
复制
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.push-consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
    String consumerGroup();
    String topic();

    SelectorType selectorType() default SelectorType.TAG;

    String selectorExpression() default "*";


    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;


    MessageModel messageModel() default MessageModel.CLUSTERING;

    @Deprecated
    int consumeThreadMax() default 64;

    int consumeThreadNumber() default 20;


    int maxReconsumeTimes() default -1;


    long consumeTimeout() default 15L;


    int replyTimeout() default 3000;

    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    String secretKey() default SECRET_KEY_PLACEHOLDER;


    boolean enableMsgTrace() default false;


    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    String nameServer() default NAME_SERVER_PLACEHOLDER;


    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;

    String tlsEnable() default "false";


    String namespace() default "";

    /**
     * Message consume retry strategy in concurrently mode.
     *
     * -1,no retry,put into DLQ directly
     * 0,broker control retry frequency
     * >0,client control retry frequency
     */
    int delayLevelWhenNextConsume() default 0;


    int suspendCurrentQueueTimeMillis() default 1000;


    int awaitTerminationMillisWhenShutdown() default 1000;

    String instanceName() default "DEFAULT";

从这个配置中,我们可以看到其存在相关的配置:其中可以看到最大的消费线程数已经被废弃,取而代之的是consumeThreadNumber,默认是20个线程执行拉取消费操作。消费的模式采用并行消费:ConsumeMode.CONCURRENTLY。消息模式是采用的集群模式。同时根据占位符可以看到基本覆盖了我们配置的yml配置。

我们知道只有注册到Spring中的bean才能被Spring处理。因此我们可以看到:

代码语言:javascript
复制
 @Override
    public void afterSingletonsInstantiated() {
        //拿到带RocketMQMessageListener的注解,同时将其进行注册
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
            .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        beans.forEach(this::registerContainer);
    }

注册到容器中:

代码语言:javascript
复制
   private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        //拿到监听,获取消费者组、主题
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);

        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());

        //获取监听,是否开启了监听
        boolean listenerEnabled =
            (boolean) rocketMQProperties.getPushConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);

       //容器bean名称
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                //启动容器
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

启动容器的本质是启动DefaultMQPushConsumer。

代码语言:javascript
复制
    @Override
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        //启动默认推模式的消费方式
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                //链路追踪
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

三、litePull方式的消费DefaultLitePullConsumer

那我们再来看看拉取的方式,其默认的使用litePull的方式RocketMQAutoConfiguration:这个自动注入的条件是

代码语言:javascript
复制
    @Bean(CONSUMER_BEAN_NAME)
    @ConditionalOnMissingBean(DefaultLitePullConsumer.class)
    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "pull-consumer.group", "pull-consumer.topic"})
    public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
            throws MQClientException {
        RocketMQProperties.PullConsumer consumerConfig = rocketMQProperties.getPullConsumer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = consumerConfig.getGroup();
        String topicName = consumerConfig.getTopic();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null");
        Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null");

        String accessChannel = rocketMQProperties.getAccessChannel();
        MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
        SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
        String selectorExpression = consumerConfig.getSelectorExpression();
        String ak = consumerConfig.getAccessKey();
        String sk = consumerConfig.getSecretKey();
        int pullBatchSize = consumerConfig.getPullBatchSize();
        boolean useTLS = consumerConfig.isTlsEnable();

        DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
                groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
        litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
        litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
        litePullConsumer.setNamespace(consumerConfig.getNamespace());
        litePullConsumer.setInstanceName(consumerConfig.getInstanceName());
        log.info(String.format("a pull consumer(%s sub %s) init on namesrv %s",  groupName, topicName,nameServer));
        return litePullConsumer;
    }

从ISSUE中可以看到其是为了支持litepull消费消息的,[ISSUE #306] Support real LitePullMessage in RocketMQ-Spring (#307) 。因此,如果配置了rocketmq为前缀,同时采用pull-consumer的方式的话,此时就可以采用这种方式注入。同时其出现在ExtRocketMQConsumerConfiguration配置中。通常这种方式 适合于实现扩展类,比如下面这种:

代码语言:javascript
复制
@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer", tlsEnable = "${demo.ext.consumer.tlsEnable}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

因此,如果使用了pull,就不要使用push。否则就会产生冲突,在一个项目中。可以看到启动的是DefaultLitePullConsumer。

代码语言:javascript
复制
  private void registerTemplate(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        // 获取扩展配置消费者的配置
        ExtRocketMQConsumerConfiguration annotation = clazz.getAnnotation(ExtRocketMQConsumerConfiguration.class);
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        DefaultLitePullConsumer consumer = null;
        try {
            //创建消费者
            consumer = createConsumer(annotation);
            //启动消费者DefaultLitePullConsumer
            consumer.start();
        } catch (Exception e) {
            log.error("Failed to startup PullConsumer for RocketMQTemplate {}", beanName, e);
        }
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
        rocketMQTemplate.setConsumer(consumer);
        rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        log.info("Set real consumer to :{} {}", beanName, annotation.value());
    }

也即此时验证我们看到诊断信息出现的两个消费者的情况。通过 实践可以知道两者目前不可以共存。否则会出现竞争的情况。

同时可以看到基于RocketMQ模板方式的生产者和消费者分别为:

代码语言:javascript
复制
private DefaultMQProducer producer;
private DefaultLitePullConsumer consumer;

也即模板中的生产者是基于默认MQ生产,而消费者则是采用默认的litepull消费。

四、两者有什么区别

litepull模式下只是通过poll方法批量拉取消息,缺乏重试机制,而push方式可以进行重试,并且失败后,会重试16次。那如果使用了litepull的方式,如果出现了失败,需要如何补偿呢?此时需要在业务系统中,获取offset,然后重置offset,进行重新消费,同时需要进行幂等处理。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-05-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、rocketmq-spring中推拉模式的配置
  • 二、监听下的DefaultMQPushConsumer
  • 三、litePull方式的消费DefaultLitePullConsumer
  • 四、两者有什么区别
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档