前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【RocketMq实战第二篇】-SpringBoot集成RocketMq

【RocketMq实战第二篇】-SpringBoot集成RocketMq

作者头像
胖虎
发布2019-06-26 16:56:36
4.8K0
发布2019-06-26 16:56:36
举报
文章被收录于专栏:晏霖

前言 本文笔者是一波三折啊,很多人像我一样第一次在springboot里搞rocketmq的,遇到各种麻烦,我也是一样,就比如:

踩坑:

nameserver启动成功了,broker启动失败,然后broker日志文件没有生成,错误原因也找不到。 在java里发送消息时发送失败,幸好有报错信息[Send [3] times, still failed],就是重发三次也没发出去的意思。网上资料说是broker启动方式不对,然而并没有解决 ……

本文是简单的使用发送消息和消费消息,让大家了解一下流程。

正文 在此之前我们要启动一个nameserver和一个broker

上文我已经介绍了,链接如下:启动消息服务

第一步

创建一个maven项目,pom文件如下:

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>rocketmq_demo</groupId>
    <artifactId>rocketmq_demo</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-tools</artifactId>
            <version>4.2.0</version>
        </dependency>
    </dependencies>
</project>

第二步

创建启动类,RocketMQApp,这个启动类当作生产者

代码语言:javascript
复制
package com;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author yanlin
 * @version v1.3
 * @date 2019-01-24 3:43 PM
 * @since v8.0
 **/
@SpringBootApplication
public class RocketMQApp {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        SpringApplication.run(RocketMQApp.class, args);
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 20; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

第三步

创建消费者 RocketMQConsumer,代码如下

代码语言:javascript
复制
package com.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author yanlin
 * @version v1.3
 * @date 2019-01-24 3:24 PM
 * @since v8.0
 **/
public class RocketMQConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

最后一步

右键启动启动类-RocketMQApp,观察控制台:

主要发送消息的日志如下

代码语言:javascript
复制
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94180000, offsetMsgId=0A08406F00002A9F0000000000058B56, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=0], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D94270001, offsetMsgId=0A08406F00002A9F0000000000058C08, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=1], queueOffset=505]
SendResult [sendStatus=SEND_OK, msgId=0A08406F070318B4AAC27A2D942A0002, offsetMsgId=0A08406F00002A9F0000000000058CBA, messageQueue=MessageQueue [topic=TopicTest, brokerName=yanlin.local, queueId=2], queueOffset=505]
………………

右键启动消费者-RocketMQConsumer,观察控制台如下

Receive New Messages 后面的就是消息主体

代码语言:javascript
复制
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=505, sysFlag=0, bornTimestamp=1548321807386, bornHost=/10.8.64.111:50266, storeTimestamp=1548321807396, storeHost=/10.8.64.111:10911, msgId=0A08406F00002A9F0000000000058B56, commitLogOffset=363350, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=506, CONSUME_START_TIME=1548321807411, UNIQ_KEY=0A08406F070318B4AAC27A2D94180000, WAIT=true, TAGS=TagA}, body=16]]]
…………

我们发送了20条,然后成功消费20条,这就是rocketmq最简单的应用案例了。以后的篇幅我就会讲解更细致的内容,让大家能适用任何场景下,让大家了解各个对象的原理。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-02-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 晏霖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档