前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot RocketMQ 整合使用和监控

SpringBoot RocketMQ 整合使用和监控

作者头像
botkenni
发布2022-05-06 15:58:06
7420
发布2022-05-06 15:58:06
举报
文章被收录于专栏:IT码农

创建项目

在 IDEA 创建一个 SpringBoot 项目,项目结构如下:

pom 文件

引入 RocketMQ 的一些相关依赖,最后的 pom 文件如下:

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.zhisheng</groupId>
	<artifactId>rocketmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>rocketmq</name>
	<description>Demo project for Spring Boot RocketMQ</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.9.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-common</artifactId>
			<version>4.2.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.2.0</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>
配置文件

application.properties 中如下:

代码语言:javascript
复制
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=localhost:9876
生产者
代码语言:javascript
复制
package com.zhisheng.rocketmq.client;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

import javax.annotation.PostConstruct;

/**
 * Created by zhisheng_tian on 2018/2/6
 */
@Component
public class RocketMQClient {
    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);

        try {
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();

          	 //创建一个消息实例,包含 topic、tag 和 消息体
             //如下:topic 为 "TopicTest",tag 为 "push"
            Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));

            StopWatch stop = new StopWatch();
            stop.start();

            for (int i = 0; i < 10000; i++) {
                SendResult result = producer.send(message);
                System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
            }
            stop.stop();
            System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}
消费者
代码语言:javascript
复制
package com.zhisheng.rocketmq.server;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * Created by zhisheng_tian on 2018/2/6
 */
@Component
public class RocketMQServer {
    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("TopicTest", "push");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        System.out.println("messageExt: " + messageExt);//输出消息内容

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
启动类
代码语言:javascript
复制
package com.zhisheng.rocketmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketmqApplication {

	public static void main(String[] args) {
		SpringApplication.run(RocketmqApplication.class, args);
	}
}

RocketMQ

代码已经都写好了,接下来我们需要将与 RocketMQ 有关的启动起来。

启动 Name Server

进入到目录 :

代码语言:javascript
复制
cd distribution/target/apache-rocketmq

启动:

代码语言:javascript
复制
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log //通过日志查看是否启动成功
启动 Broker
代码语言:javascript
复制
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log    //通过日志查看是否启动成功

然后运行启动类,运行效果如下:

监控

RocketMQ有一个对其扩展的开源项目 ocketmq-console ,如今也提交给了 Apache ,

地址在:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

官方也给出了其支持的功能的中文文档:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

那么该如何安装?

Docker 安装

1、获取 Docker 镜像

代码语言:javascript
复制
docker pull styletang/rocketmq-console-ng

2、运行,注意将你自己的 NameServer 地址替换下面的 127.0.0.1

代码语言:javascript
复制
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
非 Docker 安装

我们 git clone 一份代码到本地:

代码语言:javascript
复制
git clone https://github.com/apache/rocketmq-externals.git

cd rocketmq-externals/rocketmq-console/

需要 jdk 1.7 以上。 执行以下命令:

代码语言:javascript
复制
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

注意:

1、如果你下载依赖缓慢,你可以重新设置 maven 的 mirror 的镜像

代码语言:javascript
复制
<mirrors>
    <mirror>
          <id>alimaven</id>
          <name>aliyun maven</name>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <mirrorOf>central</mirrorOf>        
    </mirror>
</mirrors>

2、如果你使用的 RocketMQ 版本小于 3.5.8,如果您使用 rocketmq < 3.5.8,请在启动 rocketmq-console-ng 时添加 -Dcom.rocketmq.sendMessageWithVIPChannel = false(或者您可以在 ops 页面中更改它)

3、更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者可以在ops页面中更改它)

错误解决方法

1、Docker 启动项目报错

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed

将 Docker 启动命令改成如下以后:

代码语言:javascript
复制
docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

报错信息改变了,新的报错信息如下:

代码语言:javascript
复制
ERROR op=global_exception_handler_print_error
org.apache.rocketmq.console.exception.ServiceException: This date have't data!
 

看到网上有人也遇到这个问题,他们都通过自己的方式解决了,但是方法我都试了,不适合我。不得不说,阿里,你能再用心点吗?既然把 RocketMQ 捐给 Apache 了,这些文档啥的都必须更新啊,不要还滞后着呢,不然少不了被吐槽!

搞了很久这种方法没成功,暂时放弃!mmp

2、非 Docker 安装,只好把源码编译打包了。

1) 注意需要修改如下图中的配置:

rocketmq.config.namesrvAddr=localhost:9876        //注意替换你自己的ip

#如果你 rocketmq 版本小于 3.5.8 才需设置 `rocketmq.config.isVIPChannel` 为 false,默认是 true, 这个可以在源码中可以看到的 rocketmq.config.isVIPChannel=

2) 执行以下命令:

代码语言:javascript
复制
mvn clean package -Dmaven.test.skip=true

编译成功:

可以看到已经打好了 jar 包:

运行:

代码语言:javascript
复制
java -jar rocketmq-console-ng-1.0.0.jar

成功,不报错了,开心😄,访问 http://localhost:8080/

整个监控大概就是这些了。

然后我运行之前的 SpringBoot 整合项目,查看监控信息如下:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-05-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 创建项目
    • pom 文件
      • 配置文件
        • 生产者
          • 消费者
            • 启动类
            • RocketMQ
              • 启动 Name Server
                • 启动 Broker
                • 监控
                  • Docker 安装
                    • 非 Docker 安装
                    • 错误解决方法
                    相关产品与服务
                    容器镜像服务
                    容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档