Preface: This article takes a look at RocketMQTemplate. RocketMQTemplate rocketmq-spring-boot-2.0.3-sources.jar!.../org/apache/rocketmq/spring/core/RocketMQTemplate.java public class RocketMQTemplate extends AbstractMessageSendingTemplate.../org/apache/rocketmq/spring/core/RocketMQTemplate.java /** * Same to {@link #syncSend(String.../org/apache/rocketmq/spring/core/RocketMQTemplate.java /** * Same to {@link #syncSendOrderly.../org/apache/rocketmq/spring/core/RocketMQTemplate.java /** * Same to {@link #asyncSend(String
127.0.0.1:9876 # pure consumers do not need the configuration below producer: group: test-group Message producer: obtain the client template @Autowired private final RocketMQTemplate...rocketMQTemplate; Sending messages // Sends synchronously by default, but the send receipt is not available; for the source, see org.apache.rocketmq.spring.core.RocketMQTemplate.doSent... below rocketMQTemplate.convertAndSend("test-topic", entity); rocketMQTemplate.send("test-topic", MessageBuilder.withPayload...(entity).build()); // with a tag rocketMQTemplate.convertAndSend("test-topic:tag1", entity); rocketMQTemplate.send...("test-topic:tag2", MessageBuilder.withPayload(entity).build()); Sending one-way messages (the send result is ignored) rocketMQTemplate.sendOneWay
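The snippet above passes destinations such as "test-topic:tag1", where the part after the colon is the tag. As a minimal plain-Java sketch of that convention (the class `DestinationSketch` and its `split` method are hypothetical names used only for illustration, and this is not the rocketmq-spring source), the string can be split at the first colon like this:

```java
import java.util.Arrays;

/** Plain-Java illustration only; not the rocketmq-spring implementation. */
final class DestinationSketch {

    /**
     * Splits a "topic:tags" destination at the first colon.
     * Returns a two-element array {topic, tags}; tags is "" when no colon is present.
     */
    static String[] split(String destination) {
        // A limit of 2 keeps any further colons inside the tags part.
        String[] parts = destination.split(":", 2);
        String topic = parts[0];
        String tags = parts.length > 1 ? parts[1] : "";
        return new String[] {topic, tags};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(split("test-topic")));
        System.out.println(Arrays.toString(split("test-topic:tag1")));
    }
}
```

With this sketch, a destination without a colon yields an empty tag string, which corresponds to sending to the topic with no tag.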
rocketMQTemplate(DefaultMQProducer mqProducer, RocketMQMessageConverter rocketMQMessageConverter...) { RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); rocketMQTemplate.setProducer...(mqProducer); rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter...()); return rocketMQTemplate; } } @Configuration marks this as a configuration class; each method in the class annotated with @Bean defines a Spring bean..., for example rocketMQTemplate.
= consumeridcs; } } SelectMessageQueueByMachineRoom implements the MessageQueueSelector interface; its select method currently returns null RocketMQTemplate.../org/apache/rocketmq/spring/core/RocketMQTemplate.java public class RocketMQTemplate extends AbstractMessageSendingTemplate...implements InitializingBean, DisposableBean { private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class...messageQueueSelector) { this.messageQueueSelector = messageQueueSelector; } //...... } RocketMQTemplate...MessageQueue; it has several implementations: SelectMessageQueueByHash, SelectMessageQueueByRandom, and SelectMessageQueueByMachineRoom; RocketMQTemplate
consumer: group: boot_group_1 # maximum number of messages pulled per batch pull-batch-size: 5 4. Configuration class The configuration class mainly defines how two beans are loaded, namely RocketMQTemplate...rocketMqTemplate(){ RocketMQTemplate rocketMqTemplate = new RocketMQTemplate(); rocketMqTemplate.setProducer...(defaultMqProducer()); return rocketMqTemplate; } @Bean public DefaultMQProducer...this.retryTimesWhenSendAsyncFailed); return producer; } } IV. Basic usage 1. Producing messages Write a producer endpoint class that uses RocketMQTemplate...rocketMqTemplate; @GetMapping("/send/msg1") public String sendMsg1 (){ try {
MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must work with RocketMQTemplate...TransactionHandlerRegistry.java public class TransactionHandlerRegistry implements DisposableBean { private RocketMQTemplate...rocketMQTemplate; private final Set listenerContainers = new ConcurrentSet(); ...public TransactionHandlerRegistry(RocketMQTemplate template) { this.rocketMQTemplate = template..., its clear method simply empties listenerContainers; the registerTransactionHandler method adds the handler's name to listenerContainers and then calls rocketMQTemplate.createAndStartTransactionMQProducer
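The registry behaviour described in this snippet (record the handler's name, then start a producer for it, and empty the set on clear) can be sketched in plain Java. Everything here is a hypothetical simplification: `HandlerRegistrySketch`, `register`, `size`, and the `producerStarter` callback are illustrative names that stand in for TransactionHandlerRegistry and the call to createAndStartTransactionMQProducer, and a JDK concurrent set replaces the ConcurrentSet used in the source.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Simplified stand-in for the registry described above; not the library class. */
final class HandlerRegistrySketch {

    // Thread-safe set of registered handler names (assumption: names are plain strings).
    private final Set<String> names = ConcurrentHashMap.newKeySet();

    // Hypothetical callback standing in for starting the transactional producer.
    private final Consumer<String> producerStarter;

    HandlerRegistrySketch(Consumer<String> producerStarter) {
        this.producerStarter = producerStarter;
    }

    /** Records the handler name, then starts a producer for it. */
    void register(String handlerName) {
        names.add(handlerName);
        producerStarter.accept(handlerName);
    }

    /** Empties the set of registered names. */
    void clear() {
        names.clear();
    }

    int size() {
        return names.size();
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry =
                new HandlerRegistrySketch(name -> System.out.println("starting producer for " + name));
        registry.register("txGroupA");
        System.out.println("registered: " + registry.size());
        registry.clear();
        System.out.println("after clear: " + registry.size());
    }
}
```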
RocketMQTemplate.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException...(clazz + " is not instance of " + RocketMQTemplate.class.getName()); } ExtRocketMQTemplateConfiguration...throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate...{}", beanName), e); } RocketMQTemplate rocketMQTemplate = (RocketMQTemplate...) bean; rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setObjectMapper(objectMapper
send-message-timeout: 30000 3. Testing message production @SpringBootTest class ScriptApplicationTests { @Resource private RocketMQTemplate...rocketMQTemplate; @Test void contextLoads() { rocketMQTemplate.send("test_topic", MessageBuilder.withPayload...("This is a message").build()); rocketMQTemplate.send("test_topic", MessageBuilder.withPayload(MapUtil.of
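Inside RocketMQTemplate, a RocketMQ DefaultMQProducer named producer is created, so the various message-sending methods that RocketMQTemplate offers afterwards,...At the same time, the class needs to extend RocketMQTemplate so that we can inject the RocketMQTemplate bean directly with the @Autowired or @Resource annotation. 4....RocketMQTemplate provides a method for sending messages in batches....When sending ordered messages, RocketMQTemplate uses the SelectMessageQueueByHash strategy by default....Before this method is called, RocketMQTemplate has already used the Producer to send a transactional message.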
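To illustrate why a hash-based strategy preserves ordering, here is a minimal plain-Java sketch of the idea: the hash key's hash code is reduced modulo the number of queues, so messages sharing a key always land in the same queue. The class `HashSelectSketch` and its `select` method are hypothetical names, queues are represented by plain strings rather than MessageQueue objects, and the use of `Math.floorMod` is this sketch's choice, not necessarily how SelectMessageQueueByHash is written.

```java
import java.util.Arrays;
import java.util.List;

/** Illustration of hash-based queue selection; not the RocketMQ source. */
final class HashSelectSketch {

    /** Picks a queue by reducing the key's hash code to a valid list index. */
    static <Q> Q select(List<Q> queues, Object hashKey) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(hashKey.hashCode(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        // The same key is mapped to the same queue on every call.
        System.out.println(select(queues, "order"));
        System.out.println(select(queues, "order"));
    }
}
```

Because selection depends only on the key and the queue count, changing the number of queues changes the mapping, which is one reason ordered topics usually keep a fixed queue count.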
Background: Today, after migrating a Spring Boot project's configuration parameters from the original .yml file to Apollo, startup failed with the error "Bean method 'rocketMQTemplate' in 'RocketMQAutoConfiguration...********************** APPLICATION FAILED TO START *************************** Description: Field rocketMQTemplate...in net.yourpackage.myServiceBImpl required a bean of type 'org.apache.rocketmq.spring.starter.core.RocketMQTemplate...- Bean method 'rocketMQTemplate' in 'RocketMQAutoConfiguration' not loaded because @ConditionalOnBean...revisiting the conditions above or defining a bean of type 'org.apache.rocketmq.spring.starter.core.RocketMQTemplate
RestController @RequestMapping("/mq") @Slf4j public class ProducerController { @Resource private RocketMQTemplate...rocketMQTemplate; @RequestMapping("/sync/send1") public String syncSendString(){ /.../ Send a synchronous message, which returns a result; sent to the stringTopic topic SendResult sendResult = rocketMQTemplate.syncSend("topicTest...("orderTopic","no1","order"); rocketMQTemplate.syncSendOrderly("orderTopic","no2","order");...rocketMQTemplate.syncSendOrderly("orderTopic","no3","order"); rocketMQTemplate.syncSendOrderly
) // when no RocketMQTemplate bean exists public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer..., ObjectMapper rocketMQMessageObjectMapper) { // create the RocketMQTemplate object RocketMQTemplate rocketMQTemplate...= new RocketMQTemplate(); // set its properties rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setObjectMapper...(rocketMQMessageObjectMapper); return rocketMQTemplate; } For a detailed analysis of the RocketMQTemplate class, see "9.4 RocketMQTemplate...For the subsequent logic and a detailed analysis of the RocketMQTemplate class, see "9.4 RocketMQTemplate".
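2. Inject RocketMQTemplate into the class that needs to send messages @Autowired private RocketMQTemplate rocketMQTemplate; @Value("${rocketmq.topic...}") private String smsTopic; 3. Send the message; the message body can be a custom object or a Message object. The RocketMQTemplate class contains several methods for sending messages: synchronous send...topic : topic + ":" + tags; SendResult sendResult = rocketMQTemplate.syncSend( destination...into the RocketMQProperties object, then uses RocketMQ's native API to create the producer bean and the pull-consumer bean, and sets both beans on the RocketMQTemplate...The RocketMQTemplate class wraps the pull consumer's receive method for developers' convenience.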
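The fragment `topic : topic + ":" + tags` above is the tail of a conditional that builds the destination string. The condition itself is elided in the snippet, so the sketch below assumes it checks for a missing or empty tag; `DestinationBuilderSketch` and `build` are hypothetical names.

```java
/** Illustration only; the empty-tag check is an assumption, since the source elides the condition. */
final class DestinationBuilderSketch {

    /** Returns the bare topic when there is no tag, otherwise "topic:tags". */
    static String build(String topic, String tags) {
        return (tags == null || tags.isEmpty()) ? topic : topic + ":" + tags;
    }

    public static void main(String[] args) {
        System.out.println(build("sms-topic", null));
        System.out.println(build("sms-topic", "verify"));
    }
}
```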
Creating a producer: producers generally send messages through rocketMQTemplate. This works because, once the starter package is on the classpath, the auto-configuration class RocketMQAutoConfiguration defines several beans...defaultMQProducer defaultLitePullConsumer rocketMQTemplate If you do not override these beans, their default definitions are used....@RestController public class RocketController { @Autowired private RocketMQTemplate rocketMQTemplate...Send a synchronous message */ @GetMapping("/rocket") public void rocket() { SendResult sendResult = rocketMQTemplate.syncSend.../** * Asynchronous message */ @GetMapping("/rocket/tag1") public void rocketTag2() { rocketMQTemplate.asyncSend
messages.getBody())); } } Sending synchronous messages: producer @Test void sendMsg() { /** * Send a synchronous message * destination: the destination topic * payload: the message */ rocketMQTemplate.syncSend...messages.getBody())); } } Sending asynchronous messages: producer @Test void asyncTest() { /** * Send an asynchronous message * destination: the destination topic * payload: the message */ rocketMQTemplate.asyncSend...timestamp: connection timeout * delayLevel: delay level */ Message msg = MessageBuilder.withPayload("delayed message").build(); rocketMQTemplate.syncSend...("TagMQ:tagA","message with tagA"); rocketMQTemplate.syncSend("TagMQ:tagB","message with tagB"); } Consumer @Component @RocketMQMessageListener...MessageBuilder .withPayload("message with key"). setHeader(RocketMQHeaders.KEYS, Key) .build(); /** * Message with a key */ rocketMQTemplate.syncSend