前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spring整合各种中间件(RocketMQ、kafka、RabbitMQ、TubeMQ、NSQ)-腾讯开源【TubeMQ】

spring整合各种中间件(RocketMQ、kafka、RabbitMQ、TubeMQ、NSQ)-腾讯开源【TubeMQ】

作者头像
逍遥壮士
发布2021-05-24 10:19:48
6660
发布2021-05-24 10:19:48
举报
文章被收录于专栏:技术趋势技术趋势

上文:spring整合各种中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ)-ZeroMQ

本文源码下载:https://gitee.com/hong99/spring/issues/I1N1DF

TubeMQ是什么?

简介

TubeMQ是2019年腾讯在ApacheCon开源的一个消息中间件系统,性能优越。经过近7年、万亿规模的海量数据沉淀,TubeMQ目前日均接入量超过25万亿条消息。较之于其他的开源MQ组件,TubeMQ长期应用于真实生产环境中,在稳定性、性能和成本方面都有着核心优势。

https://inlong.apache.org/zh-cn/docs/quick_start.html

功能介绍

纯 Java 实现语言

引入 Master 协调节点:相比 Kafka 依赖于 Zookeeper 完成元数据的管理和实现 HA 保障不同,TubeMQ 系统采用的是自管理的元数据仲裁机制方式进行,Master 节点通过采用内嵌数据库 BDB 完成集群内元数据的存储、更新以及 HA 热切功能,负责 TubeMQ 集群的运行管控和配置管理操作,对外提供接口等;通过 Master 节点,TubeMQ 集群里的 Broker 配置设置、变更及查询实现了完整的自动化闭环管理,减轻了系统维护的复杂度

服务器侧消费负载均衡:TubeMQ 采用的是服务侧负载均衡的方案,而不是客户端侧操作,提升系统的管控能力同时简化客户端实现,更便于均衡算法升级

系统行级锁操作:对于 Broker 消息读写中存在中间状态的并发操作采用行级锁,避免重复问题

Offset 管理调整:Offset 由各个 Broker 独自管理,ZK 只作数据持久化存储用(最初考虑完全去掉ZK依赖,考虑到后续的功能扩展就暂时保留)

消息读取机制的改进:TubeMQ 采用的是消息随机读取模式,同时为了降低消息时延又增加了内存缓存读写,对于带 SSD 设备的机器,增加消息滞后转 SSD 消费的处理,解决消费严重滞后时吞吐量下降以及 SSD 磁盘容量小、刷盘次数有限的问题,使其满足业务快速生产消费的需求

消费者行为管控:支持通过策略实时动态地控制系统接入的消费者行为,包括系统负载高时对特定业务的限流、暂停消费,动态调整数据拉取的频率等;

服务分级管控:针对系统运维、业务特点、机器负载状态的不同需求,系统支持运维通过策略来动态控制不同消费者的消费行为,比如是否有权限消费、消费时延分级保证、消费限流控制,以及数据拉取频率控制等

系统安全管控:根据业务不同的数据服务需要,以及系统运维安全的考虑,TubeMQ 系统增加了 TLS 传输层加密管道,生产和消费服务的认证、授权,以及针对分布式访问控制的访问令牌管理,满足业务和系统运维在系统安全方面的需求

资源利用率提升改进:相比于 Kafka,TubeMQ 采用连接复用模式,减少连接资源消耗;通过逻辑分区构造,减少系统对文件句柄数的占用,通过服务器端过滤模式,减少网络带宽资源使用率;通过剥离对 Zookeeper 的使用,减少 Zookeeper 的强依赖及瓶颈限制

客户端改进:基于业务使用上的便利性以,我们简化了客户端逻辑,使其做到最小的功能集合,我们采用基于响应消息的接收质量统计算法来自动剔出坏4的 Broker 节点,基于首次使用时作连接尝试来避免大数据量发送时发送受阻。

Portal:负责对外交互和运维操作的Portal部分,包括API和Web两块,API对接集群之外的管理系统,Web是在API基础上对日常运维功能做的页面封装;

Master: 负责集群控制的Control部分,该部分由1个或多个Master节点组成,Master HA通过Master节点间心跳保活、实时热备切换完成(这是大家使用TubeMQ的Lib时需要填写对应集群所有Master节点地址的原因),主Master负责管理整个集群的状态、资源调度、权限检查、元数据查询等;

Broker: 负责实际数据存储的Store部分,该部分由相互之间独立的Broker节点组成,每个Broker节点对本节点内的Topic集合进行管理,包括Topic的增、删、改、查,Topic内的消息存储、消费、老化、分区扩容、数据消费的offset记录等,集群对外能力,包括Topic数目、吞吐量、容量等,通过水平扩展Broker节点来完成;

Client: 负责数据生产和消费的Client部分,该部分我们以Lib形式对外提供,大家用得最多的是消费端,相比之前,消费端现支持Push、Pull两种数据拉取模式,数据消费行为支持顺序和过滤消费两种。对于Pull消费模式,支持业务通过客户端重置精确offset以支持业务exactly-once消费,同时,消费端新推出跨集群切换免重启的BidConsumer客户端;

Zookeeper: 负责offset存储的zk部分,该部分功能已弱化到仅做offset的持久化存储,考虑到接下来的多节点副本功能该模块暂时保留。

详细请对考这里:https://inlong.apache.org/zh-cn/docs/architecture.html

源码实现

直接上docker (强列建议用docker哈,之前的tars自己部署搞了好几个星期....)

代码语言:javascript
复制
docker run -p 8080:8080 -p 8000:8000 -p 8123:8123 --name tubemq -d apachetubemq/tubemq-all:latest

然后运行:localhost:8080 如下

集群部署参考:https://inlong.apache.org/zh-cn/docs/quick_start.html

新增topic

默认这里的授权字段是:abc

纯java实现

引入jar包

代码语言:javascript
复制
<properties>
    <tubemq-client-version>0.8.0-incubating</tubemq-client-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.tubemq</groupId>
        <artifactId>tubemq-client</artifactId>
        <version>${tubemq-client-version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.71</version>
    </dependency>
</dependencies>

生产端

生产-异步消息

代码语言:javascript
复制
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentCallback;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
/**
 *
 * 功能描述: 异步消息
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/5/16 18:04
 */
public final class AsyncProducerExample {

   public static void main(String[] args) throws Throwable {
       final String masterHostAndPort = "localhost:8000";
       final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
       final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
       final MessageProducer messageProducer = messageSessionFactory.createProducer();
       final String topic = "java_tubemq";
       final String body = "发送异步消息!";
       byte[] bodyData = StringUtils.getBytesUtf8(body);
       messageProducer.publish(topic);
       final Message message = new Message(topic, bodyData);
       messageProducer.sendMessage(message, new MessageSentCallback(){
           public void onMessageSent(MessageSentResult result) {
               if (result.isSuccess()) {
                   System.out.println("async send message : " + JSONObject.toJSONString(message));
               } else {
                   System.out.println("async send message failed : " + result.getErrMsg());
               }
           }
           public void onException(Throwable e) {
               System.out.println("async send message error : " + e);
           }
       });
       messageProducer.shutdown();
   }

}

发送结果

代码语言:javascript
复制
async send message : {"data":"5Y+R6YCB5byC5q2l5raI5oGv77yB","flag":0,"indexId":0,"topic":"java_tubemq"}

同步消息-生产

代码语言:javascript
复制
import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
/**
 *
 * 功能描述:同步消息
 *
 * @param: 
 * @return: 
 * @auther: csh
 * @date: 2021/5/16 17:45
 */
public final class SyncProducerExample {

   public static void main(String[] args) throws Throwable {
       //主节点地址
       final String masterHostAndPort = "localhost:8000";
       final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
       final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
       final MessageProducer messageProducer = messageSessionFactory.createProducer();
       //topic 刚刚创建那个
       final String topic = "java_tubemq";
       final String body = "hello,i'm hong!How are you?";
       byte[] bodyData = StringUtils.getBytesUtf8(body);
       messageProducer.publish(topic);
       Message message = new Message(topic, bodyData);
       MessageSentResult result = messageProducer.sendMessage(message);
       //判断结果 如果成功打印出 sync send message 加上消息内容
       if (result.isSuccess()) {
           System.out.println("同步发送出去的消息: " + JSON.toJSONString(message));
       }
       //关闭服务
       messageProducer.shutdown();
   }
}

发送结果

代码语言:javascript
复制
同步发送出去的消息: {"data":"aGVsbG8saSdtIGhvbmchSG93IGFyZSB5b3U/","flag":0,"indexId":0,"topic":"java_tubemq"}

消费端

代码语言:javascript
复制
import com.alibaba.fastjson.JSONObject;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.utils.ThreadUtils;

import java.util.List;
/**
 *
 * 功能描述: 通过pull拉取消息
 *
 * @param: 
 * @return: 
 * @auther: csh
 * @date: 2021/5/16 17:51
 */
public class PullConsumerExample {

     public static void main(String[] args) throws Throwable {
         //服务地址
         final String masterHostAndPort = "localhost:8000";
         //topic
         final String topic = "java_tubemq";
         //消费组
         final String group = "hong-group1";
         final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
         consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         final PullMessageConsumer messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
         messagePullConsumer.subscribe(topic, null);
         messagePullConsumer.completeSubscribe();
         // wait for client to join the exact consumer queue that consumer group allocated
         while (!messagePullConsumer.isPartitionsReady(1000)) {
             ThreadUtils.sleep(1000);
         }
         while (true) {
             ConsumerResult result = messagePullConsumer.getMessage();
             if (result.isSuccess()) {
                 List<Message> messageList = result.getMessageList();
                 for (Message message : messageList) {
                     System.out.println("接收到的消息: " + JSONObject.toJSONString(message)+"内容是:"+new String(message.getData()));
                 }
                 messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
             } else {
                 if (result.getErrCode() == 400) {
                     ThreadUtils.sleep(100);
                 } else {
                     if (result.getErrCode() != 404) {
                         System.out.println(String.format("Receive messages errorCode is %d, Error message is %s", result.getErrCode(), result.getErrMsg()));
                     }
                 }
             }
         }
     } 

 }

结果

代码语言:javascript
复制
接收到的消息: {"data":"5Y+R6YCB5byC5q2l5raI5oGv77yB","flag":0,"indexId":1200513646816395264,"topic":"java_tubemq"}内容是:发送异步消息!
接收到的消息: {"data":"aGVsbG8saSdtIGhvbmchSG93IGFyZSB5b3U/","flag":0,"indexId":2057144306094833664,"topic":"java_tubemq"}内容是:hello,i'm hong!How are you?
代码语言:javascript
复制
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.MessageListener;
import org.apache.tubemq.client.consumer.PushMessageConsumer;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
 *
 * 功能描述: push消息模式
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/5/16 17:53
 */
public class PushConsumerExample {
     public static void main(String[] args) throws Throwable {
         final String masterHostAndPort = "localhost:8000";
         final String topic = "java_tubemq";
         final String group = "hong-group";
         final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
         consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         final PushMessageConsumer pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
         pushConsumer.subscribe(topic, null, new MessageListener() {

             public void receiveMessages(List<Message> messages) throws InterruptedException {
                 for (Message message : messages) {
                     System.out.println("PUSH接收到的消息 : " + new String(message.getData()));
                 }
             }

             public Executor getExecutor() {
                 return null;
             }

             public void stop() {
                 //
             }
         });
         pushConsumer.completeSubscribe();
         CountDownLatch latch = new CountDownLatch(1);
         latch.await(10, TimeUnit.MINUTES);
     }
 }

结果

代码语言:javascript
复制
PUSH接收到的消息 : 发送异步消息!
PUSH接收到的消息 : hello,i'm hong!How are you?

关于数据拉取模式支持Push、Pull的区别:

Push客户端:TubeMQ最初消费端版本只提供Push模式的消费,这种模式能比较快速地消费数据,减轻服务端压力,但同时也带来一个问题,业务使用的时候因为无法控制拉取频率,从而容易形成数据积压数据处理不过来;

带消费中止/继续的Push客户端: 在收到业务反馈能否控制Push拉取动作的需求后,我们增加了resumeConsume()/pauseConsume()函数对,让业务可以模拟水位线控制机制,状态比较繁忙时调用pauseConsume()函数来中止Lib后台的数据拉取,在状态恢复后,再调用resumeConsume()通知Lib后台继续拉取数据;

Pull客户端: 我们后来版本里增加了Pull客户端,该客户端有别于Push客户端,是由业务而非Lib主动的拉取消息并对数据处理的结果进行成功与否的确认,将数据处理的主动权留给业务。这样处理后,虽然服务端压力有所提升,但业务消费时积压情况可大大缓解。

客户端与服务器端RPC交互过程:

客户端要维持已发请求消息的本地保存,直到RPC超时,或者收到响应消息,响应消息通过请求发送时生成的SerialNo关联;从服务器端收到的Broker信息,以及Topic信息,SDK要保存在本地,并根据最新的返回信息进行更新,以及定期的上报给服务器端;SDK要维持到Master或者Broker的心跳,如果发现Master反馈注册超时错误时,要进行重注册操作;SDK要基于Broker进行连接建立,同一个进程不同对象之间,要允许业务进行选择,是支持按对象建立连接,还是按照进程建立连接。

spring整合TubeMQ

网上基本没有找着相关spring整合tubemq以及相关的学习资料,除了官网那些很久也没更新的...所以在java基础之上做一个简单的整合...如下

spring生产

代码语言:javascript
复制
│ pom.xml
│ spring_tubemq_producer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ CommonTopic.java
│ │ │ │ TubeMqProducer.java
│ │ │ │
│ │ │ └─controller
│ │ │ │ UserController.java
│ │ │ │
│ │ │ └─ao
│ │ │ UserSaveAO.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ log4j2.xml
│ │ logging.properties
│ │ tubemq.properties
│ │ tubemq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml

com.hong.spring.config.CommonTopic

代码语言:javascript
复制
package com.hong.spring.config;

/**
 * @author: csh
 * @Date: 2021/5/18 10:56
 * @Description:公共的topic
 */
public class CommonTopic  {
    //用户mq
    public static final String TUBETOPIC="tubemq_spring_user";
}

com.hong.spring.config.TubeMqProducer

代码语言:javascript
复制
package com.hong.spring.config;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentCallback;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;

/**
 * @author: csh
 * @Date: 2021/4/27 18:06
 * @Description:tubemq配置
 */
@Log4j2
public class TubeMqProducer {


    /**服务端 */
    private static MessageProducer messageProducer;
    /**服务地址 */
    private String url;


    public TubeMqProducer(String url) {
        this.url = url;
    }

    public void init () throws TubeClientException {

        TubeClientConfig clientConfig =new TubeClientConfig(url);
        MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
        messageProducer = messageSessionFactory.createProducer();
    }

    /**
     *
     * 功能描述: 发送消息
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/4/28 17:05
     */
    private static Boolean resultFlag;
    public Boolean send(String msg,String topic)  {
        try {
            byte[] bodyData = StringUtils.getBytesUtf8(msg);
            messageProducer.publish(topic);
            final Message message = new Message(topic, bodyData);
            messageProducer.sendMessage(message, new MessageSentCallback(){
                public void onMessageSent(MessageSentResult result) {
                    if (result.isSuccess()) {
                        resultFlag=true;
                        log.info("同步发送消息成功 : " + JSONObject.toJSONString(message));
                    } else {
                        resultFlag =false;
                        log.info("发送消息出错 : " + result.getErrMsg());
                    }
                }
                public void onException(Throwable e) {
                    log.error("同步消息出错 : " + e);
                }
            });
            return resultFlag;
        }catch (Exception e){
            log.error("发送失败{}",e);
        }
        return false;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    /**
     *
     * 功能描述: 关闭
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/5/18 14:50
     */
    public void close(){
        try {
            if(null!=messageProducer){
                messageProducer.shutdown();
            }
        }catch (Exception e){
            log.error("关闭失败{}",e);
        } catch (Throwable throwable) {
            log.error("关闭失败{}",throwable);
            throwable.printStackTrace();
        }

    }
}

com.hong.spring.controller.ao.UserSaveAO

代码语言:javascript
复制
package com.hong.spring.controller.ao;

import lombok.Data;

import java.io.Serializable;

/**
 * @author: csh
 * @Date: 2021/3/16 11:21
 * @Description:用户入参
 */
@Data
public class UserSaveAO implements Serializable {
    private Integer id;
    private String username;
    private Integer age;
}

com.hong.spring.controller.UserController

代码语言:javascript
复制
package com.hong.spring.controller;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.config.CommonTopic;
import com.hong.spring.config.TubeMqProducer;
import com.hong.spring.controller.ao.UserSaveAO;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {

    @Autowired
    private TubeMqProducer push;


    @RequestMapping("save")
    public DataResponse<Boolean> save(UserSaveAO ao){
        log.info("添加用户入参{}",JSONObject.toJSONString(ao));
        if(null==ao){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
       try {
           Boolean send = push.send(JSONObject.toJSONString(ao),CommonTopic.TUBETOPIC);
           if(null==send || !send){
               return DataResponse.BuildFailResponse("添加用户失败!");
           }
           return DataResponse.BuildFailResponse("添加用户成功!");
       }catch (Exception e){
           log.error("添加出错{}",e);
           return DataResponse.BuildFailResponse("添加出错请重试!");
       }
    }
}

相关配置文件

application.properties

代码语言:javascript
复制
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

applicationContext.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- 配置组件扫描 -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!--加载配置文件-->
   <context:property-placeholder location="classpath:tubemq.properties"/>

   <!-- 开启注解 -->
   <context:annotation-config />

   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- 前缀 -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- 后缀 -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>
   </bean>

   <!--开启mvc注解事务-->
   <!-- 定义注解驱动 -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- 设置支持中文 -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>
</beans>

log4j2.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

logging.properties

代码语言:javascript
复制
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

tubemq.properties

代码语言:javascript
复制
tubemq.url=localhost:8000

tubemq.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">




   <!--zero配置-->
   <bean id="push" class="com.hong.spring.config.TubeMqProducer" init-method="init" destroy-method="close">
      <constructor-arg name="url" value="${tubemq.url}" />
   </bean>
</beans>

WEB-INF/web.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_tubemq_producer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:tubemq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_tubemq_producer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>

spring_mq/spring_tubemq_producer/pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.tubemq</groupId>
    <artifactId>spring_tubemq_producer</artifactId>

    <properties>
        <tubemq-client-version>0.8.0-incubating</tubemq-client-version>
    </properties>

    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>


        <dependency>
            <groupId>org.apache.tubemq</groupId>
            <artifactId>tubemq-client</artifactId>
            <version>${tubemq-client-version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>

    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

tomcat启动配置

postman发送配置

代码语言:javascript
复制
username:spring_tubemq
age:1

结果

因为我的topic没有建,所以可以直接确认这个默认的情况下tubemq的topic需要手动来建。

重新添加topic

代码语言:javascript
复制
11:23:22.108 [http-nio-8683-exec-3] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1,"username":"spring_tubemq"}
11:23:22.108 [http-nio-8683-exec-3] INFO org.apache.tubemq.client.producer.ProducerManager - [Publish begin 1] publish topic tubemq_spring_user, address = org.apache.tubemq.client.producer.ProducerManager@23c7763a
11:23:22.197 [pool-6-thread-2] INFO com.hong.spring.config.TubeMqProducer - 同步发送消息成功 : {"data":"eyJhZ2UiOjEsInVzZXJuYW1lIjoic3ByaW5nX3R1YmVtcSJ9","flag":0,"indexId":0,"topic":"tubemq_spring_user"}
11:24:54.517 [http-nio-8683-exec-6] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1,"username":"spring_tubemq"}
11:24:54.517 [http-nio-8683-exec-6] INFO org.apache.tubemq.client.producer.ProducerManager - [Publish begin 1] publish topic tubemq_spring_user, address = org.apache.tubemq.client.producer.ProducerManager@23c7763a
11:24:54.519 [pool-6-thread-1] INFO com.hong.spring.config.TubeMqProducer - 同步发送消息成功 : {"data":"eyJhZ2UiOjEsInVzZXJuYW1lIjoic3ByaW5nX3R1YmVtcSJ9","flag":0,"indexId":0,"topic":"tubemq_spring_user"}

消费端

代码语言:javascript
复制
│ pom.xml
│ spring_tubemq_consumer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ TubeMqConsumer.java
│ │ │ │
│ │ │ ├─dao
│ │ │ │ UserMapper.java
│ │ │ │
│ │ │ ├─listener
│ │ │ │ UserListener.java
│ │ │ │
│ │ │ ├─mapper
│ │ │ │ UserMapper.xml
│ │ │ │
│ │ │ └─provider
│ │ │ UserServiceImpl.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ jdbc.properties
│ │ log4j2.xml
│ │ logging.properties
│ │ mybatis.xml
│ │ tubemq.properties
│ │ tubemq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml

源码实现

com.hong.spring.config.TubeMqConsumer

代码语言:javascript
复制
import com.alibaba.fastjson.JSONObject;
import com.hong.spring.listener.UserListener;
import lombok.extern.log4j.Log4j2;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author: csh
 * @Date: 2021/4/27 18:06
 * @Description:zero配置
 */
@Log4j2
public class TubeMqConsumer {

    //服务地址
    private String addrHost;
    //topic
    private String topic;
    //消费组
    private String group;
    //
    private MessageSessionFactory messageSessionFactory;
    //
    PullMessageConsumer messagePullConsumer;

    @Autowired
    private UserListener userListener;


    public TubeMqConsumer(String addrHost, String topic, String group) {
        this.addrHost = addrHost;
        this.topic = topic;
        this.group = group;
    }

    public void init(){
       try {
           final ConsumerConfig consumerConfig = new ConsumerConfig(addrHost, group);
           consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
           messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
           messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
           messagePullConsumer.subscribe(topic, null);
           messagePullConsumer.completeSubscribe();
           recvStr();
       }catch (Exception e){
            log.error("tubemq初始化失败",e);
       }
    }

    @Async
    public void recvStr(){
        try {
            log.info("开始自动拉取消息!");

            while (true){
                ConsumerResult result = messagePullConsumer.getMessage();
                if (result.isSuccess()) {
                    List<Message> messageList = result.getMessageList();
                    for (Message message : messageList) {
                        log.info("接收到的消息: " + JSONObject.toJSONString(message)+"内容是:"+new String(message.getData()));
                        userListener.Listener(new String(message.getData()));
                    }
                    messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
                } else {
                    if (result.getErrCode() == 400) {
                        ThreadUtils.sleep(100);
                    } else {
                        if (result.getErrCode() != 404) {
                            log.error("Receive messages errorCode is %d, Error message is %s", result.getErrCode(), result.getErrMsg());
                        }
                    }
                }
            }
        }catch (Exception e){
            log.error("接收消失败请重试{}",e);
        }

    }
    /**
     *
     * 功能描述: 关闭接口
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/5/18 14:25
     */
    public void close(){
        try {
            if(messageSessionFactory!=null){
                messageSessionFactory.shutdown();
            }
        }catch (Exception e){
            log.error("关闭tubemq失败{}",e);
        }
    }

}

com.hong.spring.dao.UserMapper

代码语言:javascript
复制
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description:用户dao层
 */

public interface UserMapper {

    /**
     *
     * 功能描述:查询总条数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:31
     */
    List<User> findAllUserList();
    /**
     *
     * 功能描述:获取总数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int findAllTotal();
    /**
     *
     * 功能描述:更新
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int update(User user);
    /**
     *
     * 功能描述:添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    int save(User user);
    /**
     *
     * 功能描述:批量添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 15:46
     */
    int insertBatch(@Param("list") List <User> list);
    /**
     *
     * 功能描述:通过id查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    User findById(int id);
    /**
     *
     * 功能描述:通过分页查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 16:05
     */
    List<User> findByPage(UserAO ao);
}

com.hong.spring.listener.UserListener

代码语言:javascript
复制
import com.alibaba.fastjson.JSONObject;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description:用户监听
 */
@Log4j2
@Component
public class UserListener {

    @Autowired
    private IUserService userService;

    public void Listener(String str){
        log.info("获取的用户信息{}", str);
        User user = JSONObject.parseObject(str, User.class);
        DataResponse <Boolean> save = userService.save(user);
        if(save==null || save.getData()==null || !save.getData()){
            log.info("添加失败,原因{}",JSONObject.toJSONString(save));
        }
    }

}

com/hong/spring/mapper/UserMapper.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hong.spring.dao.UserMapper">
    <resultMap type="com.hong.spring.entity.User" id="user">
        <id column="id" property="id" />
        <result column="user_name" property="username" />
        <result column="age" property="age" />
    </resultMap>

    <select id="findById" resultType="com.hong.spring.entity.User">
      SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
    </select>

    <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
        select * from user where 1=1 limit #{page},#{pageSize}
    </select>

    <select id="findAllUserList" resultMap="user">
      SELECT * FROM user
    </select>

    <select id="findAllTotal" resultType="int">
      SELECT count(*) FROM user
    </select>

    <insert id="save" >
         INSERT INTO user ( user_name, age)
        VALUES (#{username,jdbcType=VARCHAR},
        #{age,jdbcType=INTEGER})
    </insert>

    <insert id="insertBatch">
        insert into user
        ( user_name, age)
        values
        <foreach collection="list" item="user" index="index"
                 separator=",">
            (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
        </foreach>
    </insert>

    <update id="update" >
        update user
        <set>
            <if test="username !=null">
                user_name=#{username,jdbcType=VARCHAR},
            </if>
            <if test="age !=null">
                age =#{age,jdbcType=INTEGER}
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
</mapper>

com.hong.spring.provider.UserServiceImpl

代码语言:javascript
复制
import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description:用户实现
 */
@Service("userService")
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList,allTotal);
    }
    @Override
    @Transactional
    public DataResponse <Boolean> save(User user) {
        if(null==user){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save>0?true:false);
    }

    @Override
    public DataResponse <Boolean> insertBatch(List <User> list) {
        if(null==list){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave>0?true:false);
    }

    @Override
    @Transactional
    public DataResponse <Boolean> update(User user) {
        if(null==user || user.getId()==null){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update>0?true:false);
    }
    @Override
    public DataResponse <User> findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    @Override
    public DataResponse <List <User>> findByPage(UserAO ao) {
        if(ao==null){
            ao.setPage(0);
            ao.setPageSize(10);
        }else{
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List <User> byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage,allTotal);
    }
}

相关配置文件

application.properties

代码语言:javascript
复制
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

applicationContext.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- 配置组件扫描 -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!--加载配置文件-->
   <context:property-placeholder location="classpath:jdbc.properties,classpath:tubemq.properties"/>

   <!-- 开启注解 -->
   <context:annotation-config />
   <!--开启注解事务-->
   <tx:annotation-driven transaction-manager="transactionManager" />
   <!--放行静态资源-->
   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- 前缀 -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- 后缀 -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>

   </bean>

   <!--开启mvc注解事务-->
   <!-- 定义注解驱动 -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- 设置支持中文 -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>


   <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
      <!-- 基础配置 -->
      <property name="url" value="${jdbc.url}"></property>
      <property name="driverClassName" value="${jdbc.driver}"></property>
      <property name="username" value="${jdbc.user}"></property>
      <property name="password" value="${jdbc.password}"></property>

      <!-- 关键配置 -->
      <!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
      <property name="initialSize" value="3" />
      <!-- 最小连接池数量 -->
      <property name="minIdle" value="2" />
      <!-- 最大连接池数量 -->
      <property name="maxActive" value="15" />
      <!-- 配置获取连接等待超时的时间 -->
      <property name="maxWait" value="10000" />

      <!-- 性能配置 -->
      <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
      <property name="poolPreparedStatements" value="true" />
      <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />

      <!-- 其他配置 -->
      <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
      <property name="timeBetweenEvictionRunsMillis" value="60000" />
      <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
      <property name="minEvictableIdleTimeMillis" value="300000" />
      <!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
                  执行validationQuery检测连接是否有效。-->
      <property name="testWhileIdle" value="true" />
      <!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
      <property name="testOnBorrow" value="true" />
      <!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
      <property name="testOnReturn" value="false" />
   </bean>

   <!--事务管理器-->
   <!-- sqlSessionFactory -->
   <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
      <!-- 加载 MyBatis 的配置文件 -->
      <property name="configLocation" value="classpath:mybatis.xml"/>
      <!-- 数据源 -->
      <property name="dataSource" ref="dataSource"/>
      <!-- 所有配置的mapper文件 -->
      <property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
   </bean>

   <!-- Mapper 扫描器 -->
   <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
      <!-- 扫描 包下的组件 -->
      <property name="basePackage" value="com.hong.spring.dao" />
      <!-- 关联mapper扫描器 与 sqlsession管理器 -->
      <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
   </bean>
   <!--事务配置-->
   <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
      <property name="dataSource" ref="dataSource" />
   </bean>
</beans>

jdbc.properties

代码语言:javascript
复制
config.properties:
#数据库驱动
jdbc.driver=com.mysql.jdbc.Driver
#数据库连接url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
#数据库用户名
jdbc.user=root
#数据库密码
jdbc.password=123456

log4j2.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

logging.properties

代码语言:javascript
复制
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

mybatis.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>

    <!-- settings -->
    <settings>
        <!-- 打开延迟加载的开关 -->
        <setting name="lazyLoadingEnabled" value="true"/>
        <!-- 将积极加载改为消极加载(即按需加载) -->
        <setting name="aggressiveLazyLoading" value="false"/>
        <!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
        <setting name="cacheEnabled" value="true"/>
        <!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) -->
        <setting name="mapUnderscoreToCamelCase" value="true"/>
        <!-- 使用列别名代替列名 默认:true seslect name as title from table -->
        <setting name="useColumnLabel" value="true"/>
        <!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
        <setting name="useGeneratedKeys" value="true"/>
    </settings>

    <!-- 别名定义 -->
    <typeAliases>
        <package name="com.hong.spring.entity"/>
    </typeAliases>

</configuration>

tubemq.properties

代码语言:javascript
复制
tubemq.url=localhost:8000
tubemq.topic_user=tubemq_spring_user
tubemq.group=tubemq_hong_consumer

tubemq.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">


   <!--tubemq配置-->
   <bean id="pull" class="com.hong.spring.config.TubeMqConsumer" init-method="init" destroy-method="close">
      <constructor-arg name="addrHost" value="${tubemq.url}" />
      <constructor-arg name="topic" value="${tubemq.topic_user}" />
      <constructor-arg name="group" value="${tubemq.group}" />
   </bean>
</beans>

WEB-INF/web.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_tubemq_consumer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:tubemq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_tubemq_consumer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>

spring_mq/spring_tubemq_consumer/pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.tubemq</groupId>
    <artifactId>spring_tubemq_consumer</artifactId>

    <properties>
        <tubemq-client-version>0.8.0-incubating</tubemq-client-version>
    </properties>

    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>


        <dependency>
            <groupId>org.apache.tubemq</groupId>
            <artifactId>tubemq-client</artifactId>
            <version>${tubemq-client-version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>
    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

结果

数据库结果

简单做了下整合,用是可以用,可以在这个基础上再继续完善相关的功能。

springboot 整合TubeMq

因为官网也没有相关的支持,做了也没有啥太大的意义,等后续有支持再统一整合....(感觉没跟上时代...)

最后

相对来说这个TubeMq没有之前tars那么恶心,相对简洁功能过得去,也文档比较简单,但是想了解下为啥,鹅产开源的项目总感觉要么很难整、要么感觉好久没维护,感觉遗弃的娃一样....,看着19年的相关宣传又很霸气,怎么总感觉有点虎头蛇尾,大家觉得呢?

https://inlong.apache.org/en-us/

参考文章

https://github.com/Tencent/TubeMQ/blob/master/docs/tubemq_user_guide.md#top

https://note.youdao.com/ynoteshare1/index.html?id=ebb570b361bdce4f74c3742f2de3bfdc&type=note

http://tech.it168.com/a2019/0917/6055/000006055724.shtml

https://www.cnblogs.com/zqyx/p/13673199.html

https://inlong.apache.org/zh-cn/docs/clients_java.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-05-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 技术趋势 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档