RocketMQ整合Spring的一个项目在apache中可以看到是rocketmq-spring。其提供了整合spring的方便使用方式。
下面我们来看看拉取和监听两种方式的消费模式的相关配置。
在Rocketmq整合Spring的过程中,我之前配置过这样的配置:
rocketmq:
name-server: 192.168.0.103:8080; 192.168.0.102:8080
tag: TAG_TEST
consumer:
access-key: test_uat
secret-key: 123456
tag: TAG_TEST
topic: TEST_TOPIC
group: GID_TEST_GROUP
enable-msg-trace: true
access-channel: CLOUD
pull-consumer:
group: GID_TEST_GROUP
topic: TEST_TOPIC
这个配置其实是配置了两种模式的配置,可以在消息诊断可以看到会存在多个消费者线程的情况,虽然诊断的结果是没有问题的,但是消费会出现消费不到,产生消息丢失的情况。为什么这么说呢?
从RocketMQ-Spring中,我们可以看到我们如果使用监听的方式进行消费的话,其实其会有一个配置是支持我们去做消费的,那就是RocketMQMessageListener这个类中的配置:org.apache.rocketmq.spring.annotation.RocketMQMessageListener
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.push-consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
String consumerGroup();
String topic();
SelectorType selectorType() default SelectorType.TAG;
String selectorExpression() default "*";
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
MessageModel messageModel() default MessageModel.CLUSTERING;
@Deprecated
int consumeThreadMax() default 64;
int consumeThreadNumber() default 20;
int maxReconsumeTimes() default -1;
long consumeTimeout() default 15L;
int replyTimeout() default 3000;
String accessKey() default ACCESS_KEY_PLACEHOLDER;
String secretKey() default SECRET_KEY_PLACEHOLDER;
boolean enableMsgTrace() default false;
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
String nameServer() default NAME_SERVER_PLACEHOLDER;
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
String tlsEnable() default "false";
String namespace() default "";
/**
* Message consume retry strategy in concurrently mode.
*
* -1,no retry,put into DLQ directly
* 0,broker control retry frequency
* >0,client control retry frequency
*/
int delayLevelWhenNextConsume() default 0;
int suspendCurrentQueueTimeMillis() default 1000;
int awaitTerminationMillisWhenShutdown() default 1000;
String instanceName() default "DEFAULT";
从这个配置中,我们可以看到其存在相关的配置:其中可以看到最大的消费线程数已经被废弃,取而代之的是consumeThreadNumber,默认是20个线程执行拉取消费操作。消费的模式采用并行消费:ConsumeMode.CONCURRENTLY。消息模式是采用的集群模式。同时根据占位符可以看到基本覆盖了我们配置的yml配置。
我们知道只有注册到Spring中的bean才能被Spring处理。因此我们可以看到:
@Override
public void afterSingletonsInstantiated() {
//拿到带RocketMQMessageListener的注解,同时将其进行注册
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerContainer);
}
注册到容器中:
private void registerContainer(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
//拿到监听,获取消费者组、主题
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());
//获取监听,是否开启了监听
boolean listenerEnabled =
(boolean) rocketMQProperties.getPushConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
//容器bean名称
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
//启动容器
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
启动容器的本质是启动DefaultMQPushConsumer。
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
//启动默认推模式的消费方式
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
//链路追踪
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
那我们再来看看拉取的方式,其默认的使用litePull的方式RocketMQAutoConfiguration:这个自动注入的条件是
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "pull-consumer.group", "pull-consumer.topic"})
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
throws MQClientException {
RocketMQProperties.PullConsumer consumerConfig = rocketMQProperties.getPullConsumer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null");
Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
String selectorExpression = consumerConfig.getSelectorExpression();
String ak = consumerConfig.getAccessKey();
String sk = consumerConfig.getSecretKey();
int pullBatchSize = consumerConfig.getPullBatchSize();
boolean useTLS = consumerConfig.isTlsEnable();
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
litePullConsumer.setNamespace(consumerConfig.getNamespace());
litePullConsumer.setInstanceName(consumerConfig.getInstanceName());
log.info(String.format("a pull consumer(%s sub %s) init on namesrv %s", groupName, topicName,nameServer));
return litePullConsumer;
}
从ISSUE中可以看到其是为了支持litepull消费消息的,[ISSUE #306] Support real LitePullMessage in RocketMQ-Spring (#307) 。因此,如果配置了rocketmq为前缀,同时采用pull-consumer的方式的话,此时就可以采用这种方式注入。同时其出现在ExtRocketMQConsumerConfiguration配置中。通常这种方式 适合于实现扩展类,比如下面这种:
@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer", tlsEnable = "${demo.ext.consumer.tlsEnable}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
因此,如果使用了pull,就不要使用push。否则就会产生冲突,在一个项目中。可以看到启动的是DefaultLitePullConsumer。
private void registerTemplate(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
// 获取扩展配置消费者的配置
ExtRocketMQConsumerConfiguration annotation = clazz.getAnnotation(ExtRocketMQConsumerConfiguration.class);
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
DefaultLitePullConsumer consumer = null;
try {
//创建消费者
consumer = createConsumer(annotation);
//启动消费者DefaultLitePullConsumer
consumer.start();
} catch (Exception e) {
log.error("Failed to startup PullConsumer for RocketMQTemplate {}", beanName, e);
}
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
rocketMQTemplate.setConsumer(consumer);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
log.info("Set real consumer to :{} {}", beanName, annotation.value());
}
也即此时验证我们看到诊断信息出现的两个消费者的情况。通过 实践可以知道两者目前不可以共存。否则会出现竞争的情况。
同时可以看到基于RocketMQ模板方式的生产者和消费者分别为:
private DefaultMQProducer producer;
private DefaultLitePullConsumer consumer;
也即模板中的生产者是基于默认MQ生产,而消费者则是采用默认的litepull消费。
litepull模式下只是通过poll方法批量拉取消息,缺乏重试机制,而push方式可以进行重试,并且失败后,会重试16次。那如果使用了litepull的方式,如果出现了失败,需要如何补偿呢?此时需要在业务系统中,获取offset,然后重置offset,进行重新消费,同时需要进行幂等处理。