专栏首页算法之名RocketMQ事务消息代码样例 顶

RocketMQ事务消息代码样例 顶

本篇主要是分布式工程中,有些跨数据库操作的使用样例,一般可用在分布式事务上。

MQ的作用,当然有扛洪峰,消息堆集,异步处理的作用。

第一步:添加POM的依赖,版本当然由你自己选择

<dependency>
   <groupId>com.alibaba.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>3.2.6</version>
</dependency>

第二步,消息生产者。

package com.xxx.consumer.mq;

import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.*;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class MQProducer {
   
   private final String GROUP_NAME = "xxxx";
   private final String NAMESRV_ADDR = "xxx.xxx.xxx.xxx:9876";
   private TransactionMQProducer producer;
   
   public MQProducer() {
      
      this.producer = new TransactionMQProducer(GROUP_NAME);
      this.producer.setNamesrvAddr(NAMESRV_ADDR);    //nameserver服务
      this.producer.setCheckThreadPoolMinSize(5);    // 事务回查最小并发数
      this.producer.setCheckThreadPoolMaxSize(20);   // 事务回查最大并发数
      this.producer.setCheckRequestHoldMax(2000);    // 队列数
      //服务器回调Producer,检查本地事务分支成功还是失败
      this.producer.setTransactionCheckListener(new TransactionCheckListener() {
         public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("state -- "+ new String(msg.getBody()));
            return LocalTransactionState.COMMIT_MESSAGE;
         }
      });
      try {
         this.producer.start();
      } catch (MQClientException e) {
         e.printStackTrace();
      }  
   }
   
   public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws Exception {
      return this.producer.queryMessage(topic, key, maxNum, begin, end);
   }
   
   public LocalTransactionState check(MessageExt me){
      LocalTransactionState ls = this.producer.getTransactionCheckListener().checkLocalTransactionState(me);
      return ls;
   }
   
   public void sendTransactionMessage(Message message, LocalTransactionExecuter localTransactionExecuter, Map<String, Object> transactionMapArgs) throws Exception {
      TransactionSendResult tsr = this.producer.sendMessageInTransaction(message, localTransactionExecuter, transactionMapArgs);
      System.out.println("send返回内容:" + tsr.toString());
   }
   
   public void shutdown(){
      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
         public void run() {
            producer.shutdown();
         }
      }));
      System.exit(0);
   }


}

组名和地址当然根据你的实际情况写。

第三步,写一个你要执行的方法,比如你的本项目的一次数据库执行,或者其他业务代码。我这里要执行的是保存个人信息。

personInfoService.savePersonalInfo(userid, workClass,workCity);

全代码如下。

package com.xxx.consumer.mq;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;
import com.wmq.stub.PersonInfoService;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * 执行本地事务,由客户端回调
 */

//@Scope("prototype")
@Component("transactionExecuterImpl")
public class TransactionExecuterImpl implements LocalTransactionExecuter {

   @Reference(version = "1.0.0")
   private PersonInfoService personInfoService;
   
   public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
      try {
         //Message Body
         JSONObject messageBody = FastJsonConvert.convertJSONToObject(new String(msg.getBody(), "utf-8"), JSONObject.class);
         //Transaction MapArgs
         Map<String, Object> mapArgs = (Map<String, Object>) arg;
         
         // --------------------IN PUT---------------------- //
         System.out.println("message body = " + messageBody);
         System.out.println("message mapArgs = " + mapArgs);
         System.out.println("message tag = " + msg.getTags());
         // --------------------IN PUT---------------------- //
         long userid = messageBody.getLong("userid");
         String face = messageBody.getString("face"); //头像
         String trueName = messageBody.getString("truename"); //姓名
         int gender = messageBody.getInteger("gender"); //性别
         int workClass = messageBody.getInteger("workClass"); //期望职位ID
         int workCity = messageBody.getInteger("workCity");
         personInfoService.savePersonalInfo(userid, workClass,workCity);
         //成功通知MQ消息变更 该消息变为:<确认发送>
         
         return LocalTransactionState.COMMIT_MESSAGE;
         
         //return LocalTransactionState.UNKNOW;
         
      } catch (Exception e) {
         e.printStackTrace();
         //失败则不通知MQ 该消息一直处于:<暂缓发送>
         return LocalTransactionState.ROLLBACK_MESSAGE;
         
      }
      
   }
}

注:因为我这里用的是springboot的dubbo框架,所以

@Reference(version = "1.0.0")

private PersonInfoService personInfoService;是@Reference而不是@AutoWired

可改成你们自己的注入。事务消息的本质在这段代码中可以很清楚,发送一条消息出去,然后判断该事务是否执行成功,若成功,通知消息可以发送给消费者,否则该消息暂缓发送。

第四步,在交互代码里面调用

比如某一个controller或者其他地方(。。。。。。。为你们自己的获取数据来源的代码)

try {
    long userid = 。。。。。。。
    String face = 。。。。。。。
    String trueName = 。。。。。。。。
    int gender = 。。。。。。。。
    int workClass = 。。。。。。。。
    int workCity = 。。。。。。。。。
    //构造消息数据
    Message message = new Message();
    //主题
    message.setTopic("user");
    //子标签
    message.setTags("tag");
    //key
    String uuid = UUID.randomUUID().toString();
    message.setKeys(uuid);
    JSONObject body = new JSONObject();
    body.put("userid", userid);
    body.put("face", face);
    body.put("truename", trueName);
    body.put("gender", gender);
    body.put("workClass", workClass);
    body.put("workCity", workCity);
    message.setBody(FastJsonConvert.convertObjectToJSON(body).getBytes());

    //添加参数
    Map<String, Object> transactionMapArgs = new HashMap<String, Object>();
    this.mQProducer.sendTransactionMessage(message, this.transactionExecuterImpl, transactionMapArgs);

} catch (Exception e) {
    e.printStackTrace();
}

这里主要是在消息体中获取参数以及发送消息,这里的消息要等待事务执行成功才能被消费者获得。比较重要的地方就是消息的主题 message.setTopic("user");注意,生产者和消费者的主题必须相同,否则消费者是拿不到消息的,至于主题是什么可以自己定义。

第五步,消费者

消费者一般是写在分布式的另外一个工程里面的,而且是不同的数据库,这一点比较重要,因为同一工程用MQ的意义不大(泄洪除外)。

package com.xxx.xxx.mq;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.xxx.model.User;
import com.xxx.stub.authentication.UpdateUserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class MQConsumer {
   
   private final String GROUP_NAME = "xxxxxxxx";
   private final String NAMESRV_ADDR = "xxx.xxx.xxx.xxx:9876";
   private DefaultMQPushConsumer consumer;
   
   @Autowired
   private UpdateUserService updateUserService;
   
   
   public MQConsumer() {
      try {
         this.consumer = new DefaultMQPushConsumer(GROUP_NAME);
         this.consumer.setNamesrvAddr(NAMESRV_ADDR);
         this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
         this.consumer.subscribe("user", "*");
         this.consumer.registerMessageListener(new Listener());
         this.consumer.start();
         System.out.println("consumer start");
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws Exception {
      long current = System.currentTimeMillis();
      return this.consumer.queryMessage(topic, key, maxNum, begin, end);
   }
   
   class Listener implements MessageListenerConcurrently {
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         MessageExt msg = msgs.get(0);
         try {
            String topic = msg.getTopic();
            //Message Body
            JSONObject messageBody = FastJsonConvert.convertJSONToObject(new String(msg.getBody(), "utf-8"), JSONObject.class);
            String tags = msg.getTags();
            String keys = msg.getKeys();
            
            System.out.println("服务收到消息, keys : " + keys + ", body : " + new String(msg.getBody(), "utf-8"));
            long userid = messageBody.getLong("userid");
            String face = messageBody.getString("face"); //头像
            String trueName = messageBody.getString("truename"); //姓名
            int gender = messageBody.getInteger("gender"); //性别
            int workClass = messageBody.getInteger("workClass"); //期望职位ID
            int workCity = messageBody.getInteger("workCity");
            User user = new User();
            user.setUserId(userid);
            user.setFace(face);
            user.setTruename(trueName);
            user.setGender(gender);
            user.setStep(4);
            updateUserService.updateUserInfo(user);
            
            
            
         } catch (Exception e) {    
            e.printStackTrace();
            //重试次数为3情况 
            if(msg.getReconsumeTimes() == 3){
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               //记录日志
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }        
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
   }

}

Listener是一个侦听类,专门负责侦听broker的消息是否到达,并且取回this.consumer.subscribe("user", "*");这个主题的消息。取得消息后执行业务代码updateUserService.updateUserInfo(user);

一般来说,请严格测试该业务代码,因为rocketmq如果这段业务代码失败,消息是会重新发送给消费者,重新执行这段代码直到成功的,以此到达事务的最终一致性,所以你的这段代码决不能本身就是有Bug的,当然我们一般会处理3次,并不让他不停的处理,如果还是失败则记录日志,我们需要查看日志来解决。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 用Build来构建对象的写法 顶

    如果一个类的属性过多,用构造器来构建对象很难写,因此我们时用Build方式来构建对象。写法大致如下。

    算法之名
  • 在OAuth 2中模仿DefaultTokenServices写一个新的tokenServices来提供个性化服务

    我们把这些代码考出来,起一个新的名字,比如叫SingleTokenServices

    算法之名
  • 使用JDK的观察者接口进行消息推送 顶

    观察者模式就是对对象内部的变化进行观察,当发生改变时做出相应的响应。代码样例见 设计模式整理 !

    算法之名
  • 设计模式之中介者模式

    Define an object that encapsulates how a set of objects interact. Mediator promo...

    beginor
  • Mybatis源码阅读之一 原

        Spring版本4.3.11,Mybatis-spring版本是1.3.2,Mybatis版本是3.4.6。

    克虏伯
  • React Native组件篇(四) — Touchable系列组件

    从字面上的意思我们就可以理解,Touchable是可触摸的控件,相当于我们iOS的简单手势。复杂的RN处理手势还有专门的API如果你想实现视图的拖拽,或是实现自...

    ZY_FlyWay
  • [Cocos Creator] 制作简版消消乐(二):实现基础组件和管理脚本

    在上一篇文章中我们初步建立了项目并搭建好了场景,那么本篇文章将和大家一起实现部分基础组件和管理脚本。

    陈皮皮
  • SpringMVC源码学习(一) - DispatcherSerlet和相关组件

    在前几期中我们说SpringBoot中提供了很多onRefresh方法方法,其中有两个是包含创建Servlet容器的。至于剩余的实现我们当时说可以做一些不需要不...

    程序员_备忘录
  • 深入理解this,bind、call

    mcq
  • 前端测试题:(解析)关于JS中this关键字的说法,下面错误的是?

    JavaScript 有一套完全不同于其它语言的对 this 的处理机制。在五种不同的情况下 ,this 指向的各不相同。

    舒克

扫码关注云+社区

领取腾讯云代金券