本篇主要是分布式工程中,有些跨数据库操作的使用样例,一般可用在分布式事务上。
MQ的作用,当然有扛洪峰,消息堆集,异步处理的作用。
第一步:添加POM的依赖,版本当然由你自己选择
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
第二步,消息生产者。
package com.xxx.consumer.mq;
import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.*;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class MQProducer {
private final String GROUP_NAME = "xxxx";
private final String NAMESRV_ADDR = "xxx.xxx.xxx.xxx:9876";
private TransactionMQProducer producer;
public MQProducer() {
this.producer = new TransactionMQProducer(GROUP_NAME);
this.producer.setNamesrvAddr(NAMESRV_ADDR); //nameserver服务
this.producer.setCheckThreadPoolMinSize(5); // 事务回查最小并发数
this.producer.setCheckThreadPoolMaxSize(20); // 事务回查最大并发数
this.producer.setCheckRequestHoldMax(2000); // 队列数
//服务器回调Producer,检查本地事务分支成功还是失败
this.producer.setTransactionCheckListener(new TransactionCheckListener() {
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("state -- "+ new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws Exception {
return this.producer.queryMessage(topic, key, maxNum, begin, end);
}
public LocalTransactionState check(MessageExt me){
LocalTransactionState ls = this.producer.getTransactionCheckListener().checkLocalTransactionState(me);
return ls;
}
public void sendTransactionMessage(Message message, LocalTransactionExecuter localTransactionExecuter, Map<String, Object> transactionMapArgs) throws Exception {
TransactionSendResult tsr = this.producer.sendMessageInTransaction(message, localTransactionExecuter, transactionMapArgs);
System.out.println("send返回内容:" + tsr.toString());
}
public void shutdown(){
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
producer.shutdown();
}
}));
System.exit(0);
}
}
组名和地址当然根据你的实际情况写。
第三步,写一个你要执行的方法,比如你的本项目的一次数据库执行,或者其他业务代码。我这里要执行的是保存个人信息。
personInfoService.savePersonalInfo(userid, workClass,workCity);
全代码如下。
package com.xxx.consumer.mq;
import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;
import com.wmq.stub.PersonInfoService;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 执行本地事务,由客户端回调
*/
//@Scope("prototype")
@Component("transactionExecuterImpl")
public class TransactionExecuterImpl implements LocalTransactionExecuter {
@Reference(version = "1.0.0")
private PersonInfoService personInfoService;
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
try {
//Message Body
JSONObject messageBody = FastJsonConvert.convertJSONToObject(new String(msg.getBody(), "utf-8"), JSONObject.class);
//Transaction MapArgs
Map<String, Object> mapArgs = (Map<String, Object>) arg;
// --------------------IN PUT---------------------- //
System.out.println("message body = " + messageBody);
System.out.println("message mapArgs = " + mapArgs);
System.out.println("message tag = " + msg.getTags());
// --------------------IN PUT---------------------- //
long userid = messageBody.getLong("userid");
String face = messageBody.getString("face"); //头像
String trueName = messageBody.getString("truename"); //姓名
int gender = messageBody.getInteger("gender"); //性别
int workClass = messageBody.getInteger("workClass"); //期望职位ID
int workCity = messageBody.getInteger("workCity");
personInfoService.savePersonalInfo(userid, workClass,workCity);
//成功通知MQ消息变更 该消息变为:<确认发送>
return LocalTransactionState.COMMIT_MESSAGE;
//return LocalTransactionState.UNKNOW;
} catch (Exception e) {
e.printStackTrace();
//失败则不通知MQ 该消息一直处于:<暂缓发送>
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
注:因为我这里用的是springboot的dubbo框架,所以
@Reference(version = "1.0.0")
private PersonInfoService personInfoService;是@Reference而不是@AutoWired
可改成你们自己的注入。事务消息的本质在这段代码中可以很清楚,发送一条消息出去,然后判断该事务是否执行成功,若成功,通知消息可以发送给消费者,否则该消息暂缓发送。
第四步,在交互代码里面调用
比如某一个controller或者其他地方(。。。。。。。为你们自己的获取数据来源的代码)
try {
long userid = 。。。。。。。
String face = 。。。。。。。
String trueName = 。。。。。。。。
int gender = 。。。。。。。。
int workClass = 。。。。。。。。
int workCity = 。。。。。。。。。
//构造消息数据
Message message = new Message();
//主题
message.setTopic("user");
//子标签
message.setTags("tag");
//key
String uuid = UUID.randomUUID().toString();
message.setKeys(uuid);
JSONObject body = new JSONObject();
body.put("userid", userid);
body.put("face", face);
body.put("truename", trueName);
body.put("gender", gender);
body.put("workClass", workClass);
body.put("workCity", workCity);
message.setBody(FastJsonConvert.convertObjectToJSON(body).getBytes());
//添加参数
Map<String, Object> transactionMapArgs = new HashMap<String, Object>();
this.mQProducer.sendTransactionMessage(message, this.transactionExecuterImpl, transactionMapArgs);
} catch (Exception e) {
e.printStackTrace();
}
这里主要是在消息体中获取参数以及发送消息,这里的消息要等待事务执行成功才能被消费者获得。比较重要的地方就是消息的主题 message.setTopic("user");注意,生产者和消费者的主题必须相同,否则消费者是拿不到消息的,至于主题是什么可以自己定义。
第五步,消费者
消费者一般是写在分布式的另外一个工程里面的,而且是不同的数据库,这一点比较重要,因为同一工程用MQ的意义不大(泄洪除外)。
package com.xxx.xxx.mq;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.xxx.model.User;
import com.xxx.stub.authentication.UpdateUserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class MQConsumer {
private final String GROUP_NAME = "xxxxxxxx";
private final String NAMESRV_ADDR = "xxx.xxx.xxx.xxx:9876";
private DefaultMQPushConsumer consumer;
@Autowired
private UpdateUserService updateUserService;
public MQConsumer() {
try {
this.consumer = new DefaultMQPushConsumer(GROUP_NAME);
this.consumer.setNamesrvAddr(NAMESRV_ADDR);
this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
this.consumer.subscribe("user", "*");
this.consumer.registerMessageListener(new Listener());
this.consumer.start();
System.out.println("consumer start");
} catch (Exception e) {
e.printStackTrace();
}
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws Exception {
long current = System.currentTimeMillis();
return this.consumer.queryMessage(topic, key, maxNum, begin, end);
}
class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
try {
String topic = msg.getTopic();
//Message Body
JSONObject messageBody = FastJsonConvert.convertJSONToObject(new String(msg.getBody(), "utf-8"), JSONObject.class);
String tags = msg.getTags();
String keys = msg.getKeys();
System.out.println("服务收到消息, keys : " + keys + ", body : " + new String(msg.getBody(), "utf-8"));
long userid = messageBody.getLong("userid");
String face = messageBody.getString("face"); //头像
String trueName = messageBody.getString("truename"); //姓名
int gender = messageBody.getInteger("gender"); //性别
int workClass = messageBody.getInteger("workClass"); //期望职位ID
int workCity = messageBody.getInteger("workCity");
User user = new User();
user.setUserId(userid);
user.setFace(face);
user.setTruename(trueName);
user.setGender(gender);
user.setStep(4);
updateUserService.updateUserInfo(user);
} catch (Exception e) {
e.printStackTrace();
//重试次数为3情况
if(msg.getReconsumeTimes() == 3){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//记录日志
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
Listener是一个侦听类,专门负责侦听broker的消息是否到达,并且取回this.consumer.subscribe("user", "*");这个主题的消息。取得消息后执行业务代码updateUserService.updateUserInfo(user);
一般来说,请严格测试该业务代码,因为rocketmq如果这段业务代码失败,消息是会重新发送给消费者,重新执行这段代码直到成功的,以此到达事务的最终一致性,所以你的这段代码决不能本身就是有Bug的,当然我们一般会处理3次,并不让他不停的处理,如果还是失败则记录日志,我们需要查看日志来解决。