前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spring整合中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)

spring整合中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)

作者头像
逍遥壮士
发布2021-03-23 14:47:31
1.1K0
发布2021-03-23 14:47:31
举报
文章被收录于专栏:技术趋势技术趋势

文代码:https://gitee.com/hong99/spring/issues/I1N1DF


注:由于本文比较长,建议还是下载源码学习。


中间件是什么?

中间件是介于应用系统和系统软件之间的一类软件,它使用系统软件所提供的基础服务(功能),衔接网络上应用系统的各个部分或不同的应用,能够达到资源共享、功能共享的目的。---百度百科

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。

参考地址:https://baike.baidu.com/item/%E4%B8%AD%E9%97%B4%E4%BB%B6/452240?fr=aladdin

中间件解决了什么问题?

使高度耦合的系统解耦;

瞬时高峰的削峰处理;

保证数据安全性和最终一致;

原来系统间的交互是通过JMS或http协议调用,在效率、安全、可靠上面都不是很高,而中间件可以让原有的复杂系统解耦,分为消息端、生产方,队列,并且还可以让数据保存到中间件持久下来,也解决了数据丢失问题,数据也可以异步化,在一些平台大触或高峰时期的时候可以起到削峰作用,避免同步产生的一系列问题。

中间件有哪些?

RocketMQ

Rocket Mqj是阿里巴巴开源,是一个分布式消息和流数据平台,具有低 延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ是2012年阿里巴巴开源的第三代分布式消息中间件,2016年11月21日,阿里巴巴向Apache软件基金会捐赠了RocketMQ;第二年2月20日,Apache软件基金会宣布Apache RocketMQ成为顶级项目。

维基百科:https://zh.wikipedia.org/wiki/Apache_RocketMQ

官网:https://rocketmq.apache.org/

kafka

Kafka最初由领英开发开源,由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,[3]这使它作为企业级基础设施来处理流式数据非常有价值。此外,Kafka可以通过Kafka Connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams——一个Java流式处理库。---维基百科

官网:http://kafka.apache.org/

RabbitMQ

Rabbit科技有限公司开发了RabbitMQ,RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

官网:https://www.rabbitmq.com/

ActiveMQ

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

官网:http://activemq.apache.org/index.html

ZeroMQ

ZeroMQ(也称为ØMQ,0MQ或ZMQ)是一种高性能的异步消息传递库,旨在用于分布式或并发应用程序中。它提供了一个消息队列,但是与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。

官网:https://zeromq.org/

TubeMQ

apache TubeMQ是腾讯开源万亿级分布式消息中间件,专注于海量数据下的数据传输和存储,与许多开源MQ项目相比,TubeMQ在稳定性、性能和低成本方面具有独特的优势。

官网:https://tubemq.apache.org/zh-cn/

NSQ

NSQ 是无中心设计、节点自动注册和发现的开源消息系统。可作为内部通讯框架的基础,易于配置和发布。

官网:https://nsq.io/

....

消息中间件少说也有十几个多的话直的也太多了...不建议每个都深入了解,找一两个合适的就OK...

最后

代码实现

本文代码:https://gitee.com/hong99/spring/issues/I1N1DF

RocketMQ

基本概念

1 消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

2 消息生产者(Producer) 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

3 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

4 主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

5 代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

6 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过 名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

7 拉取式消费(Pull Consumer)

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

8 推动式消费(Push Consumer)

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

9 生产者组(Producer Group)

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

10 消费者组(Consumer Group)

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

11 集群消费(Clustering)

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

12 广播消费(Broadcasting)

广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

13 普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

14 严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

15 消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

16 标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

特性(features)

订阅与发布:消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。

消息顺序:消息有序指的是一类消息消费时,能按照发送的顺序来消费。

消息过滤:RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。

消息可靠性:RocketMQ支持消息的高可靠。

至少一次:至少一次(At least Once)指每个消息必须投递一次。

回溯消费:回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。

事务消息:RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。

定时消息:定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。

消息重试:Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。

消息重投:生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。

流量控制:生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

死信队列:死信队列用于处理无法被正常消费的消息。

架构设计

NameServer 是注册中心,类似于zk,这个是rocketmq自研的注册中心;

Broker是节点服务器,类似于主机

producer是生产者;

Consumer是消费者;

流程:Broker每次启动的时候都会向NameServer注册,而NameServer每隔30秒会自动检测Broker是否存活若Broker死亡则从NameServer中移除,

详细可以参考:https://github.com/apache/rocketmq/blob/master/docs/cn/architecture.md

基于windows 10安装rocketmq

环境要求:

64位 windows10;

64位JDK 1.8+;

Maven 3.2.x;

Git;

足够的4G磁盘空间

下载reocketmq:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip

配置环境变量

代码语言:javascript
复制
ROCKETMQ_HOME="你的目标"
NAMESRV_ADDR="localhost:9876"

运行

遇到问题

代码语言:javascript
复制
Please set the ROCKETMQ_HOME variable in your environment!

解决方法

powershell配置环境变量

代码语言:javascript
复制
$Env:ROCKETMQ_HOME="你的地址"
$Env:NAMESRV_ADDR="localhost:9876"

如下:

发现The Name Server boot success. 就证明配置是OK的。

启动broker

代码语言:javascript
复制
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

注意:启动 mqbroker.cmd会报找不到主类,需要修改 mqbroker下的%CLASSPATH%添加双引号~

部署管理后台

下载代码:https://github.com/apache/rocketmq-externals

解压后进入:..\rocketmq-externals-master\rocketmq-console\src\main\resources 打开application.properties

\rocketmq-externals-master\rocketmq-console 修改pom.xml 版本

进入\rocketmq-externals-master\rocketmq-console根目录进行打包:

代码语言:javascript
复制
mvn clean package -Dmaven.test.skip=true

进入生成的target目录进得运下命令如下:

代码语言:javascript
复制
java -jar rocketmq-console-ng-2.0.0.jar

然后访问管理台 127.0.0.1:端口 ,这里我改的是8081所以就是127.0.0.1:8081

java实现rocketmq

搭好单节点的rocketmq和管理平台,接下来通过maven纯java进行学习测试。

相关参数:https://github.com/apache/rocketmq/blob/master/docs/cn/client/java/API_Reference_DefaultMQProducer.md

官方案例https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

目录结构

代码语言:javascript
复制
│ pom.xml
│
└─src
    ├─main
    │ ├─java
    │ │ └─com
    │ │ └─hong
    │ │ └─rocketmq
    │ │ ├─consumer
    │ │ │ Consumer.java
    │ │ │ ScheduledMessageConsumer.java
    │ │ │
    │ │ └─producer
    │ │ AsyncProducer.java
    │ │ BroadcastProducer.java
    │ │ OnewayProducer.java
    │ │ ScheduledMessageProducer.java
    │ │ SyncProducer.java
    │ │
    │ └─resources
    └─test
        └─java

spring_mq/java_rocketmq/pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>java_rocketmq</artifactId>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
    </dependencies>

</project>

com.hong.rocketmq.producer.SyncProducer

代码语言:javascript
复制
package com.hong.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @author: csh
 * @Date: 2021/3/12 09:50
 * @Description:同步发送消息
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ 我是同步发送 " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

com.hong.rocketmq.consumer.Consumer

代码语言:javascript
复制
package com.hong.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
/**
 *
 * 功能描述: 消费端
 *
 * @param: 
 * @return: 
 * @auther: csh
 * @date: 2021/3/12 11:02
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
         
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);


                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

可以把消息打开放到那里,然后再运行生产端,结果如下:

生产发送数据

接收端(consumer)

到你的本地ip:8081 中Message 查看如下:

写了几个java的这里不一一贴出来,建议下载代码学习。

遇到过的问题

代码语言:javascript
复制
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 1.00 CQ: 1.00 INDEX: 1.00], messages are put to the slave, message store has been shut down, etc. BROKER: 10.3.6.59:10911

解决

将 c盘用户下面的store下面全部干掉!

代码语言:javascript
复制
C:\Users\你的用户\store

spring整合rocketmq

生产者 通过controller添加用户,然后发送给消费者进行添加。

生产者

项目结构

代码语言:javascript
复制
│ pom.xml
│ spring_rocketmq_producer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ Producer.java
│ │ │ │ TopicAll.java
│ │ │ │
│ │ │ └─controller
│ │ │ │ UserController.java
│ │ │ │
│ │ │ └─ao
│ │ │ UserSaveAO.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ log4j2.xml
│ │ logging.properties
│ │ rocketmq.properties
│ │ rocketmq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml

com.hong.spring.config.Producer

代码语言:javascript
复制
package com.hong.spring.config;

import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

/**
 * @author: csh
 * @Date: 2021/3/16 09:43
 * @Description:rocketmq生产者
 */
@Log4j2
public class Producer{
    //生产实例
    private DefaultMQProducer defaultMQProducer;
    //生产组
    private String producerGroup;
    //地址
    private String namesrvAddr;

    /**
     * 初始化
     */
    public void init() throws MQClientException {
        // 参数信息
        log.info("DefaultMQProducer 初始化");
        log.info(producerGroup);
        log.info(namesrvAddr);
        // 初始化
        defaultMQProducer = new DefaultMQProducer(producerGroup);
        defaultMQProducer.setNamesrvAddr(namesrvAddr);
        defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        defaultMQProducer.start();
        log.info("rocketmq start success!");
    }


    public void destroy() {
        defaultMQProducer.shutdown();
    }

    public DefaultMQProducer getDefaultMQProducer() {
        return defaultMQProducer;
    }

    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

}

com.hong.spring.config.TopicAll

代码语言:javascript
复制
package com.hong.spring.config;

/**
 * @author: csh
 * @Date: 2021/3/16 14:15
 * @Description:存放所有的topic
 */
public class TopicAll {
    //用户topic
    public static final String HONG_TOPIC="user_topic";
}

com.hong.spring.controller.ao.UserSaveAO

代码语言:javascript
复制
package com.hong.spring.config;

/**
 * @author: csh
 * @Date: 2021/3/16 14:15
 * @Description:存放所有的topic
 */
public class TopicAll {
    //用户topic
    public static final String HONG_TOPIC="user_topic";
}

com.hong.spring.controller.UserController

代码语言:javascript
复制
package com.hong.spring.controller;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.config.Producer;
import com.hong.spring.config.TopicAll;
import com.hong.spring.controller.ao.UserSaveAO;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {

    @Autowired
    private Producer hongProducer;


    @RequestMapping("save")
    public DataResponse<Boolean> save(UserSaveAO ao){
        log.info("添加用户入参{}",JSONObject.toJSONString(ao));
        if(null==ao){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
       try {
           User user =new User();
           BeanUtils.copyProperties(ao,user);
           //在正式的时候 可以将通tags再正式区分不同的业务 比如:更新 、新增 等
           Message message = new Message(TopicAll.HONG_TOPIC,"saveTags", RemotingSerializable.encode(user));
           //发送同步消息
           log.info("最终发出去的消息{}", JSONObject.toJSONString(message));
           SendResult send = hongProducer.getDefaultMQProducer().send(message);
           if(null==send || send.getSendStatus()!= SendStatus.SEND_OK){
               return DataResponse.BuildFailResponse("添加用户失败!");
           }
           return DataResponse.BuildFailResponse("添加用户成功!");
       }catch (Exception e){
           log.error("添加出错{}",e);
           return DataResponse.BuildFailResponse("添加出错请重试!");
       }
    }
}

application.properties

代码语言:javascript
复制
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

applicationContext.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- 配置组件扫描 -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!--加载配置文件-->
   <context:property-placeholder location="classpath:rocketmq.properties"/>

   <!-- 开启注解 -->
   <context:annotation-config />

   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- 前缀 -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- 后缀 -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>

   </bean>

   <!--开启mvc注解事务-->
   <!-- 定义注解驱动 -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- 设置支持中文 -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>
</beans>

log4j2.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

logging.properties

代码语言:javascript
复制
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

rocketmq.properties

代码语言:javascript
复制
rocketmq.producerGroup=hong_group
rocketmq.namesrvAddr=127.0.0.1:9876

rocketmq.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

   <bean id="hongProducer"  name="hongProducer" class="com.hong.spring.config.Producer" init-method="init" destroy-method="destroy">
      <property name="producerGroup" value="${rocketmq.producerGroup}" />
      <property name="namesrvAddr" value="${rocketmq.namesrvAddr}" />
   </bean>
</beans>

spring_mq/spring_rocketmq_producer/pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring_rocketmq_producer</artifactId>



    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>4.8.0</version>
            <type>pom</type>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.apache.logging.log4j</groupId>-->
            <!--<artifactId>log4j-core</artifactId>-->
            <!--<version>2.9.0</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.apache.logging.log4j</groupId>-->
            <!--<artifactId>log4j-api</artifactId>-->
            <!--<version>2.5</version>-->
        <!--</dependency>-->
    </dependencies>

    <!--静态资源导出问题-->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>

web.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_rocketmq_producer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:rocketmq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_rocketmq_producer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>
代码语言:javascript
复制
[2021-03-16 03:21:09,364] Artifact spring_rocketmq_producer:war exploded: Artifact is deployed successfully
[2021-03-16 03:21:09,364] Artifact spring_rocketmq_producer:war exploded: Deploy took 10,004 milliseconds
15:22:17.505 [http-nio-8080-exec-5] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1000,"username":"hong"}
15:22:17.518 [http-nio-8080-exec-5] INFO com.hong.spring.controller.UserController - 最终发出去的消息{"body":"eyJhZ2UiOjEwMDAsInVzZXJuYW1lIjoiaG9uZyJ9","delayTimeLevel":0,"flag":0,"properties":{"WAIT":"true","TAGS":"saveTags"},"tags":"saveTags","topic":"user_topic","waitStoreMsgOK":true}

消息状态

消息者

代码语言:javascript
复制
│ pom.xml
│ spring_rocketmq_consumer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ Consumer.java
│ │ │ │
│ │ │ ├─dao
│ │ │ │ UserMapper.java
│ │ │ │
│ │ │ ├─listener
│ │ │ │ UserListener.java
│ │ │ │
│ │ │ ├─mapper
│ │ │ │ UserMapper.xml
│ │ │ │
│ │ │ └─provider
│ │ │ UserServiceImpl.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ jdbc.properties
│ │ log4j2.xml
│ │ logging.properties
│ │ mybatis.xml
│ │ rocketmq.properties
│ │ rocketmq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml

com.hong.spring.config.Consumer

代码语言:javascript
复制
package com.hong.spring.config;

import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

/**
 * @author: csh
 * @Date: 2021/3/16 09:43
 * @Description:rocketmq生产者
 */
@Log4j2
public class Consumer {
    //生产实例
    private DefaultMQPushConsumer defaultMQConsumer;
    //生产组
    private String producerGroup;
    //地址
    private String namesrvAddr;
    //topic
    private String topic;
    //tag
    private String tag="*";
    //监听
    private MessageListener messageListener;

    /**
     * 初始化
     */
    public void init() throws MQClientException {
        // 参数信息
        log.info("DefaultMQProducer 初始化");
        log.info(producerGroup);
        log.info(namesrvAddr);
        // 初始化
        defaultMQConsumer = new DefaultMQPushConsumer(producerGroup);
        defaultMQConsumer.setNamesrvAddr(namesrvAddr);
        defaultMQConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        defaultMQConsumer.subscribe(topic,tag);
        // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
        // 如果非第一次启动,那么按照上次消费的位置继续消费
        defaultMQConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 设置为集群消费(区别于广播消费) CLUSTERING:集群消费 BROADCASTING:广播消费
        defaultMQConsumer.setMessageModel(MessageModel.CLUSTERING);
        defaultMQConsumer.setMessageListener(messageListener);
        defaultMQConsumer.start();
        log.info("rocketmqConsumer start success!");
    }


    public void destroy() {
        defaultMQConsumer.shutdown();
    }


    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public void setDefaultMQConsumer(DefaultMQPushConsumer defaultMQConsumer) {
        this.defaultMQConsumer = defaultMQConsumer;
    }

    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }
}

com.hong.spring.dao.UserMapper

代码语言:javascript
复制
package com.hong.spring.dao;

import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description:用户dao层
 */

public interface UserMapper {

    /**
     *
     * 功能描述:查询总条数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:31
     */
    List<User> findAllUserList();
    /**
     *
     * 功能描述:获取总数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int findAllTotal();
    /**
     *
     * 功能描述:更新
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int update(User user);
    /**
     *
     * 功能描述:添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    int save(User user);
    /**
     *
     * 功能描述:批量添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 15:46
     */
    int insertBatch(@Param("list") List <User> list);
    /**
     *
     * 功能描述:通过id查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    User findById(int id);
    /**
     *
     * 功能描述:通过分页查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 16:05
     */
    List<User> findByPage(UserAO ao);
}

com.hong.spring.listener.UserListener

代码语言:javascript
复制
package com.hong.spring.listener;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description:用户监听
 */
@Log4j2
public class UserListener implements MessageListenerConcurrently {

    @Autowired
    private IUserService userService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

            try {
                for (MessageExt msg : msgs) {
                User user = RemotingSerializable.decode(msg.getBody(), User.class);
                    log.info("获取的用户信息{}", JSONObject.toJSONString(user));
                    DataResponse <Boolean> save = userService.save(user);
                    if(save==null || save.getData()==null || !save.getData()){
                        log.info("添加失败,原因{}",JSONObject.toJSONString(save));
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }catch (Exception e){
                log.error("添加用户异常{}",e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
    }
}

com/hong/spring/mapper/UserMapper.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hong.spring.dao.UserMapper">
    <resultMap type="com.hong.spring.entity.User" id="user">
        <id column="id" property="id" />
        <result column="user_name" property="username" />
        <result column="age" property="age" />
    </resultMap>

    <select id="findById" resultType="com.hong.spring.entity.User">
      SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
    </select>

    <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
        select * from user where 1=1 limit #{page},#{pageSize}
    </select>

    <select id="findAllUserList" resultMap="user">
      SELECT * FROM user
    </select>

    <select id="findAllTotal" resultType="int">
      SELECT count(*) FROM user
    </select>

    <insert id="save" >
         INSERT INTO user ( user_name, age)
        VALUES (#{username,jdbcType=VARCHAR},
        #{age,jdbcType=INTEGER})
    </insert>

    <insert id="insertBatch">
        insert into user
        ( user_name, age)
        values
        <foreach collection="list" item="user" index="index"
                 separator=",">
            (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
        </foreach>
    </insert>

    <update id="update" >
        update user
        <set>
            <if test="username !=null">
                user_name=#{username,jdbcType=VARCHAR},
            </if>
            <if test="age !=null">
                age =#{age,jdbcType=INTEGER}
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
</mapper>

com.hong.spring.provider.UserServiceImpl

代码语言:javascript
复制
package com.hong.spring.provider;

import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description:用户实现
 */
@Service("userService")
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList,allTotal);
    }
    @Override
    @Transactional
    public DataResponse <Boolean> save(User user) {
        if(null==user){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save>0?true:false);
    }

    @Override
    public DataResponse <Boolean> insertBatch(List <User> list) {
        if(null==list){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave>0?true:false);
    }

    @Override
    @Transactional
    public DataResponse <Boolean> update(User user) {
        if(null==user || user.getId()==null){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update>0?true:false);
    }
    @Override
    public DataResponse <User> findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    @Override
    public DataResponse <List <User>> findByPage(UserAO ao) {
        if(ao==null){
            ao.setPage(0);
            ao.setPageSize(10);
        }else{
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List <User> byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage,allTotal);
    }
}

application.properties

代码语言:javascript
复制
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

applicationContext.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- 配置组件扫描 -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!--加载配置文件-->
   <context:property-placeholder location="classpath:jdbc.properties,classpath:rocketmq.properties"/>

   <!-- 开启注解 -->
   <context:annotation-config />
   <!--开启注解事务-->
   <tx:annotation-driven transaction-manager="transactionManager" />
   <!--放行静态资源-->
   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- 前缀 -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- 后缀 -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>

   </bean>

   <!--开启mvc注解事务-->
   <!-- 定义注解驱动 -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- 设置支持中文 -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>


   <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
      <!-- 基础配置 -->
      <property name="url" value="${jdbc.url}"></property>
      <property name="driverClassName" value="${jdbc.driver}"></property>
      <property name="username" value="${jdbc.user}"></property>
      <property name="password" value="${jdbc.password}"></property>

      <!-- 关键配置 -->
      <!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
      <property name="initialSize" value="3" />
      <!-- 最小连接池数量 -->
      <property name="minIdle" value="2" />
      <!-- 最大连接池数量 -->
      <property name="maxActive" value="15" />
      <!-- 配置获取连接等待超时的时间 -->
      <property name="maxWait" value="10000" />

      <!-- 性能配置 -->
      <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
      <property name="poolPreparedStatements" value="true" />
      <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />

      <!-- 其他配置 -->
      <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
      <property name="timeBetweenEvictionRunsMillis" value="60000" />
      <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
      <property name="minEvictableIdleTimeMillis" value="300000" />
      <!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
                  执行validationQuery检测连接是否有效。-->
      <property name="testWhileIdle" value="true" />
      <!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
      <property name="testOnBorrow" value="true" />
      <!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
      <property name="testOnReturn" value="false" />
   </bean>

   <!--事务管理器-->
   <!-- sqlSessionFactory -->
   <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
      <!-- 加载 MyBatis 的配置文件 -->
      <property name="configLocation" value="classpath:mybatis.xml"/>
      <!-- 数据源 -->
      <property name="dataSource" ref="dataSource"/>
      <!-- 所有配置的mapper文件 -->
      <property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
   </bean>

   <!-- Mapper 扫描器 -->
   <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
      <!-- 扫描 包下的组件 -->
      <property name="basePackage" value="com.hong.spring.dao" />
      <!-- 关联mapper扫描器 与 sqlsession管理器 -->
      <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
   </bean>
   <!--事务配置-->
   <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
      <property name="dataSource" ref="dataSource" />
   </bean>
</beans>

jdbc.properties

代码语言:javascript
复制
config.properties:
#数据库驱动
jdbc.driver=com.mysql.jdbc.Driver
#数据库连接url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
#数据库用户名
jdbc.user=root
#数据库密码
jdbc.password=123456

log4j2.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

logging.properties

代码语言:javascript
复制
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

rocketmq.properties

代码语言:javascript
复制
rocketmq.producerGroup=hong_group
rocketmq.namesrvAddr=127.0.0.1:9876
rocketmq.user_tocke=user_topic

mybatis.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>

    <!-- settings -->
    <settings>
        <!-- 打开延迟加载的开关 -->
        <setting name="lazyLoadingEnabled" value="true"/>
        <!-- 将积极加载改为消极加载(即按需加载) -->
        <setting name="aggressiveLazyLoading" value="false"/>
        <!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
        <setting name="cacheEnabled" value="true"/>
        <!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) -->
        <setting name="mapUnderscoreToCamelCase" value="true"/>
        <!-- 使用列别名代替列名 默认:true seslect name as title from table -->
        <setting name="useColumnLabel" value="true"/>
        <!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
        <setting name="useGeneratedKeys" value="true"/>
    </settings>

    <!-- 别名定义 -->
    <typeAliases>
        <package name="com.hong.spring.entity"/>
    </typeAliases>

</configuration>

rocketmq.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

   <bean id="userConsumer" name="userConsumer" class="com.hong.spring.config.Consumer" init-method="init" destroy-method="destroy">
      <property name="producerGroup" value="${rocketmq.producerGroup}" />
      <property name="namesrvAddr" value="${rocketmq.namesrvAddr}" />
      <property name="topic" value="${rocketmq.user_tocke}"/>
      <property name="messageListener" ref="userListener"></property>
   </bean>
   <bean id="userListener" class="com.hong.spring.listener.UserListener" />
</beans>

WEB-INF/web.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_rocketmq_consumer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:rocketmq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_rocketmq_consumer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>

spring_mq/spring_rocketmq_consumer/pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring_rocketmq_consumer</artifactId>
    <!--静态资源导出问题-->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>4.8.0</version>
            <type>pom</type>
        </dependency>

    </dependencies>

</project>

结果

代码语言:javascript
复制
16:33:49.387 [NettyClientPublicExecutor_3] INFO com.hong.spring.listener.UserListener - 获取的用户信息{"age":1000,"username":"hong"}
16:33:49.845 [NettyClientPublicExecutor_3] INFO com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
16:33:49.897 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
16:33:49.913 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.932 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.transaction.SpringManagedTransaction - JDBC Connection [com.mysql.jdbc.JDBC4Connection@6b13936b] will be managed by Spring
16:33:49.937 [NettyClientPublicExecutor_3] DEBUG com.hong.spring.dao.UserMapper.save - ==> Preparing: INSERT INTO user ( user_name, age) VALUES (?, ?) 
16:33:49.973 [NettyClientPublicExecutor_3] DEBUG com.hong.spring.dao.UserMapper.save - ==> Parameters: hong(String), 1000(Integer)
16:33:49.980 [NettyClientPublicExecutor_3] DEBUG com.hong.spring.dao.UserMapper.save - <== Updates: 1
16:33:49.983 [NettyClientPublicExecutor_3] DEBUG com.alibaba.druid.pool.PreparedStatementPool - stmt enter cache
16:33:49.984 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.985 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.986 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.986 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:55.558 [ConsumeMessageThread_1] INFO com.hong.spring.listener.UserListener - 获取的用户信息{"age":1000,"username":"hong"}
16:33:55.560 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
16:33:55.560 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.560 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.transaction.SpringManagedTransaction - JDBC Connection [com.mysql.jdbc.JDBC4Connection@6b13936b] will be managed by Spring
16:33:55.560 [ConsumeMessageThread_1] DEBUG com.hong.spring.dao.UserMapper.save - ==> Preparing: INSERT INTO user ( user_name, age) VALUES (?, ?) 
16:33:55.561 [ConsumeMessageThread_1] DEBUG com.hong.spring.dao.UserMapper.save - ==> Parameters: hong(String), 1000(Integer)
16:33:55.562 [ConsumeMessageThread_1] DEBUG com.hong.spring.dao.UserMapper.save - <== Updates: 1
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]

springboot整合rocketmq

springboot_rocketmq_api 入口 端口:8186 dubbo端口:20880

springboot_rocketmq_consumer 消费者 端口:8188 dubbo端口:20882

springboot_rocketmq_producer 生产者 端口:8187 dubbo端口:20881

框架技术:springboot2.x+dubbo+zk+rocketmq4.8

关于zk参考另一篇文章:spring整合各种服务注册中心(zk、eureka、nacos、consul)

实现结果:通过 用户请求api,api通过dubbo rpc协议调用rpoducer生产者,若调用是查询则直接查库,如果调用是添加或修改数据,则通过rocketmq发送给生产者consumer,consumer监听到对应的topic后再调用producer的rpc接口最终实现消息异步化。

springboot_rocketmq_api

结构

代码语言:javascript
复制
│ pom.xml
│
└─src
    └─main
        ├─java
        │ └─com
        │ └─hong
        │ └─springboot
        │ │ Application.java
        │ │
        │ └─controller
        │ IndexController.java
        │ UserController.java
        │
        └─resources
                application.properties

com.hong.springboot.controller.IndexController

代码语言:javascript
复制
package com.hong.springboot.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: csh
 * @Date: 2021/1/12 10:16
 * @Description:首页
 */
@RestController
public class IndexController {
    @RequestMapping("/")
    public String index(){
        return "成功!";
    }
}

com.hong.springboot.controller.UserController

代码语言:javascript
复制
package com.hong.springboot.controller;


import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.annotation.Reference;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@Slf4j
public class UserController {
    @Reference
    private IUserService userService;

    @GetMapping("/findByAll")
    public DataResponse<List<User>> findByAll(){
        try {
            return userService.findByAll();
        } catch (Exception e){
            log.error("查询出错{}",e);
        }
        return DataResponse.BuildFailResponse("查询出错!");
    }

    @PostMapping("/save")
    public DataResponse<Boolean> save(User ao){
        if(null==ao || ao.getAge()==null || StringUtils.isBlank(ao.getUsername())){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        DataResponse <Boolean> save = userService.save(ao);
        return save;
    }
}

com.hong.springboot.Application

代码语言:javascript
复制
package com.hong.springboot;


import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:springboot dubbo消费端
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

application.properties

代码语言:javascript
复制
#dubbo configuration
#服务名称
dubbo.application.name=springboot_dubbo_consumer
#注册中心协议
dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20880
#协议名称
dubbo.protocol.name=dubbo
#扫包
dubbo.scan.basePackages=com.hong.springboot.controller

#避免端口冲突
server.port=8186

springboot_all/springboot_rocketmq_api/pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_rocketmq_api</artifactId>


    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

    <!--静态资源导出问题-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

springboot_rocketmq_producer

代码语言:javascript
复制
│ pom.xml
│
└─src
    └─main
        ├─java
        │ └─com
        │ └─hong
        │ └─springboot
        │ │ Application.java
        │ │
        │ ├─config
        │ │ DruidConfig.java
        │ │ ExtRocketMQTemplate.java
        │ │ TopicAll.java
        │ │
        │ ├─dao
        │ │ UserMapper.java
        │ │
        │ └─provider
        │ UserServiceImpl.java
        │
        └─resources
                application.properties
                rocketmq.properties

com.hong.springboot.config.DruidConfig

代码语言:javascript
复制
package com.hong.springboot.config;

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * @author: csh
 * @Date: 2021/1/8 18:08
 * @Description:数据源配置
 */
@Configuration
public class DruidConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource(){
        return new DruidDataSource();
    }
}

com.hong.springboot.config.ExtRocketMQTemplate

代码语言:javascript
复制
package com.hong.springboot.config;

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration(nameServer = "${hong.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

com.hong.springboot.config.TopicAll

代码语言:javascript
复制
package com.hong.springboot.config;

/**
 * @author: csh
 * @Date: 2021/3/16 14:15
 * @Description:存放所有的topic
 */
public class TopicAll {
    //用户topic
    public static final String USER_TOPIC ="springboot_user_topic";
}

com.hong.springboot.dao.UserMapper

代码语言:javascript
复制
package com.hong.springboot.dao;

import com.hong.springboot.entity.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description:用户dao层
 */

public interface UserMapper {
    @Select("select id,user_name,age from user")
    List<User> findAllUser();

    @Insert("insert into user (user_name,age) values(#{userName},#{age})")
    int insert(User user);
}

com.hong.springboot.provider.UserServiceImpl

代码语言:javascript
复制
package com.hong.springboot.provider;


import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.config.TopicAll;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description:用户实现
 */
@Service(interfaceClass = IUserService.class,timeout = 6000)
@Slf4j
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUser();
        return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
    }

    @Override
    public DataResponse <Boolean> save(User userAO) {
        log.info("需要rocketmq添加的用户信息{}",JSONObject.toJSONString(userAO));
        SendResult sendResult = rocketMQTemplate.syncSend(TopicAll.USER_TOPIC, JSONObject.toJSONString(userAO));
        if(null==sendResult || sendResult.getSendStatus()!= SendStatus.SEND_OK){
            return DataResponse.BuildFailResponse("添加用户失败!");
        }
        return DataResponse.BuildFailResponse("添加用户成功!");
    }

    @Override
    public DataResponse <Boolean> reallySave(User user) {
        log.info("要添加的用户信息{}",JSONObject.toJSONString(user));
        if(null==user || user.getAge()==null || StringUtils.isEmpty(user.getUsername())){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int insert = userDao.insert(user);
        return insert>0?DataResponse.BuildSuccessResponse(true):DataResponse.BuildFailResponse("失败",false);
    }
}

com.hong.springboot.Application

代码语言:javascript
复制
package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@MapperScan("com.hong.springboot.dao")
@EnableDubbo
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

application.properties

代码语言:javascript
复制
rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000

#dubbo configuration
#服务名称
dubbo.application.name=springboot_dubbo_provider

dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20881
#协议名称
dubbo.protocol.name=dubbo

#避免端口冲突
server.port=8187
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456


#mybatis配置
mybatis.typeAliasesPackage=com.hong.springboot.entity

rocketmq.properties

代码语言:javascript
复制
# properties used in the application
rocketmq.topic=string-topic
hong.rocketmq.orderTopic=order-paid-topic
hong.rocketmq.msgExtTopic=message-ext-topic
hong.rocketmq.transTopic=spring-transaction-topic
hong.rocketmq.topic.user=user-topic

hong.rocketmq.bytesRequestTopic=bytesRequestTopic:tagA
hong.rocketmq.stringRequestTopic=stringRequestTopic:tagA
hong.rocketmq.objectRequestTopic=objectRequestTopic:tagA
hong.rocketmq.genericRequestTopic=genericRequestTopic:tagA

hong.rocketmq.extNameServer=127.0.0.1:9876

springboot_all/springboot_rocketmq_producer/pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_rocketmq_producer</artifactId>


    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>


        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!--<build>-->
        <!--<plugins>-->
            <!--<plugin>-->
                <!--<groupId>org.springframework.boot</groupId>-->
                <!--<artifactId>spring-boot-maven-plugin</artifactId>-->
                <!--<configuration>-->
                    <!--<skip>true</skip>-->
                <!--</configuration>-->
            <!--</plugin>-->
        <!--</plugins>-->
    <!--</build>-->
    <!--静态资源导出问题-->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

结果

springboot_rocketmq_consumer

代码语言:javascript
复制
│ pom.xml
│
└─src
    └─main
        ├─java
        │ └─com
        │ └─hong
        │ └─springboot
        │ │ Application.java
        │ │
        │ ├─config
        │ │ ExtRocketMQTemplate.java
        │ │
        │ └─listener
        │ UserListener.java
        │ UserListenerList.java
        │ UserTransactionListener.java
        │
        └─resources
                application.properties
                rocketmq.properties

rocketmq.properties

代码语言:javascript
复制
# properties used in the application
rocketmq.topic=string-topic
hong.rocketmq.orderTopic=order-paid-topic
hong.rocketmq.msgExtTopic=message-ext-topic
hong.rocketmq.transTopic=spring-transaction-topic
hong.rocketmq.topic.user=user-topic

hong.rocketmq.bytesRequestTopic=bytesRequestTopic:tagA
hong.rocketmq.stringRequestTopic=stringRequestTopic:tagA
hong.rocketmq.objectRequestTopic=objectRequestTopic:tagA
hong.rocketmq.genericRequestTopic=genericRequestTopic:tagA

hong.rocketmq.extNameServer=127.0.0.1:9876

application.properties

代码语言:javascript
复制
rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000

#避免端口冲突
server.port=8188

#dubbo configuration
#服务名称
dubbo.application.name=springboot_rocketmq_consumer

dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20882
#协议名称
dubbo.protocol.name=dubbo
#扫包
dubbo.scan.basePackages=com.hong.springboot.listener
user_topic=springboot_user_topic
user_group_consumer=user_group_consumer

com.hong.springboot.Application

代码语言:javascript
复制
package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

com.hong.springboot.listener.UserListener

代码语言:javascript
复制
package com.hong.springboot.listener;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description:用户监听
 */
@Service
@RocketMQMessageListener(consumerGroup ="${user_group_consumer}" , topic = "${user_topic}")
@Log4j2
public class UserListener implements RocketMQListener<User> {

    @Reference
    private IUserService userService;

    @Override
    public void onMessage(User user) {

        log.info("springboot获取的用户信息{}", JSONObject.toJSONString(user));
        DataResponse<Boolean> save = userService.reallySave(user);
        log.info("添加结果{}",JSONObject.toJSONString(save));
        if(save==null || !save.getData()){
            log.info("添加失败,原因{}",JSONObject.toJSONString(save));
        }
    }

}

com.hong.springboot.config.ExtRocketMQTemplate

代码语言:javascript
复制
package com.hong.springboot.config;

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration(nameServer = "${hong.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

启动消费者 (开始消费)

以上就基本完成了rocketmq4.x+dubbo+zk+springboot2.x 的整合让耦合业务解耦,在项目中rcp起到跨服务远程调用,而mq起到削峰使消息异步化。

rocketmq幂等性问题

为了防止消息重复消费导致业务处理异常,消息队列RocketMQ版的消费者在接收到消息后,有必要根据业务上的唯一Key对消息做幂等处理。

通过数据发现,rocketmq有一个无法避免的问题,那就是消息重发,这是因为rocketmq在设计之初就这样设计,导致,这个问题无法避免,所以消息重发或重堆就会导致多次消息,可能会造成不一致幂等情况,那解决问题就是去重。

解决方案:

1.针对分布式集群场景可以通过redis缓存来存放多节点的消息id状态,使重复消费的时候判断是否消费过,若是则直接返回。

2.针对单机场景可以通过google guava的cache进行拦截判断,若消息一致则同redis缓存一样,默认过期时间3天,因为rocketmq存放时间为3天建议于配置一样。

由于本次采用的是第2 种采用的解决方案如下:

代码语言:javascript
复制
package com.hong.springboot.listener;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description:用户监听
 */
@Service
@RocketMQMessageListener(consumerGroup ="${user_group_consumer}" , topic = "${user_topic}")
@Log4j2
public class UserListener implements RocketMQListener<MessageExt> {

    @Reference
    private IUserService userService;

    //存放缓存三天 分布式集群场景可以
    private static Cache<String,ConsumeConcurrentlyStatus> cache = CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.DAYS).build();

    @Override
    public void onMessage(MessageExt msg) {
        if(null==msg || msg.getBody()==null || StringUtils.isEmpty(msg.getMsgId())){
            return;
        }
        ConsumeConcurrentlyStatus status = cache.getIfPresent(msg.getMsgId());
        //这两种消息才进行消费
        if(null==status || status.equals(ConsumeConcurrentlyStatus.RECONSUME_LATER)){
            String msgId = msg.getMsgId();
            log.info("msgId"+msgId);
            User user = RemotingSerializable.decode(msg.getBody(), User.class);
            log.info("springboot获取的用户信息{}", JSONObject.toJSONString(user));
            DataResponse<Boolean> save = userService.reallySave(user);
            log.info("添加结果{}",JSONObject.toJSONString(save));
            if(save==null || !save.getData()){
                log.info("添加失败,原因{}",JSONObject.toJSONString(save));
                cache.put(msgId, ConsumeConcurrentlyStatus.RECONSUME_LATER);
            }
            cache.put(msgId, ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
        }else{
            log.info("该消息{}已消费过{}",JSONObject.toJSONString(msg),JSONObject.toJSONString(status));
        }

    }

}

然后重新验证如下:新生产一条消息,若添加成功默认进入cache中有效期为3天,然后在管理后台重推送该消息。RESEND MESSAGE

相关学习资料

《RocketMQ技术内幕》

推荐学习资料 官网:https://rocketmq.apache.org/

最后

通过rocketmq+dubbo再搭配集群方式,在实际企业中用得是相当多的,而且这种架构模式在很多生产中得到特别多的验证,可以说屡试不爽。可以说在高性能、高可用、高并发,在做后端这块是一块不错的架构选择,并且dubbo和rocketmq都可以动态水平增缩,性能上就更不用说了,都是10万起步的。普通百万级的应用,一个3节点标配的集群基本搞定了。

参考文章:

https://www.cnblogs.com/lifeibai/p/9167701.html

https://www.cnblogs.com/weifeng1463/p/12889300.html

https://cloud.tencent.com/developer/article/1630183

https://www.cnblogs.com/chx9832/p/12325871.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-03-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 技术趋势 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档