承接上面的Nacos,Sentinel的学习, 现在开始学习Seata, Dubbo和RocketMQ
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 Seata 官网: http://seata.io/zh-cn/index.html
整体机制 (两阶段提交协议的演变)
官方文档: http://seata.io/zh-cn/docs/overview/what-is-seata.html
下载地址:http://seata.io/zh-cn/blog/download.html
启动 Seata Server 进入 bin 目录中, 在 window 下启动 seata-server.bat, 在 linux 下启动 seata-server.bat
项目架构
当前为止代码地址:https://gitee.com/TimePause/spring-cloud-alibaba-examples.git
四个微服务项目负责的业务
项目搭建流程
<dependencies>
<!--服务注册-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-nacos-discovery</artifactId>
</dependency>
<!--Seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!--web 项目的基础依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
b.连接成功后便可在idea上面查看数据库的相关信息
c.安装 MyBatis插件(自动生成实体类以及mapper接口和xml映射文件)
d. 右击选择的表进行生成代码操作
e. 填写代码生成所在位置的信息
f.编写库存操作的接口和实现类
public interface StorageService {
/**
* 完成对商品库存扣减操作
* @param productNo
* @param count
*/
void deduct(String productNo, int count) ;
}
@Service
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageTblDao storageTblDao;
private static Logger logger = LoggerFactory.getLogger(StorageServiceImpl.class);
@Transactional
@Override
public void deduct(String productNo, int count) {
logger.info("开始扣减商品{}的库存, 数量为{}", productNo, count);
//1.查询库存
//StorageTbl storageTbl = storageTblDao.selectByPrimaryKey(Integer.parseInt(productNo));
StorageTblExample storageTblExample = new StorageTblExample();
storageTblExample.createCriteria().andCommodityCodeEqualTo(productNo);
List<StorageTbl> storageTbls = storageTblDao.selectByExample(storageTblExample);
StorageTbl storageTbl = storageTbls.get(0);
if (storageTbl==null){
throw new IllegalArgumentException("商品不存在");
}
//2.扣减操作(扣减后的金额)
int idleCount =storageTbl.getCount()-count;
if (idleCount<0){
throw new RuntimeException("存库不足!");
}
//3.设置商品库存
storageTbl.setCount(idleCount);
//4.保存到数据库中
storageTblDao.updateByPrimaryKeySelective(storageTbl);
logger.info("扣减库存商品{}成功, 剩余的库存为{}",productNo, idleCount);
}
}
g.启动类
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("ah.szxy.mapper")
@RestController
public class StorageServiceApplication {
public static void main(String[] args) {
SpringApplication.run(StorageServiceApplication.class,args);
}
@Autowired
private StorageService storageService;
//使用外部接口暴露
@GetMapping("/deduct/{productNo}/{count}")
public ResponseEntity<Void> deduct(@PathVariable("productNo") String productNo, @PathVariable("count") Integer count){
storageService.deduct(productNo, count);
return ResponseEntity.ok().build();
}
}
h.pom文件
server:
port: 8093
spring:
application:
name: storage-service
cloud:
nacos:
discovery:
server-addr: 47.97.169.52:8848
alibaba:
seata:
tx-service-group: ${spring.application.name}
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
name: storageDataSource
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://119.45.189.14:3306/seata?useSSL=false&serverTimezone=UTC
username: root
password: root123
druid:
max-active: 20 # 最大连接数
min-idle: 2 # 最小活跃数
initial-size: 2 # 初始连接数
seata:
service:
vgroup-mapping:
storage-service: default
grouplist:
default: 127.0.0.1:8091 # seata默认端口
disable-global-transaction: false # 开启全局事务
enabled: true # 开启 seata
mybatis-plus:
mapper-locations: classpath:/mapper/*.xml
i.启动并访问测试即可
当前为止代码地址:https://gitee.com/TimePause/spring-cloud-alibaba-examples.git
订单表
库存表
http://localhost:8096/purchase/SXT_USER_2/HUAWEI_0001/1 //经测试,发生;额回滚
http://localhost:8096/purchase/SXT_USER_1/HUAWEI_0001/1 //未发生回滚
Dubbo Spring Cloud 基于 Dubbo Spring Boot 2.7.1 和 Spring Cloud 2.x 开发,无论开发人 员是 Dubbo 用户还是 Spring Cloud 用户,都能轻松地驾驭,并以接近“零”成本的代价使应用向上迁移。 Dubbo Spring Cloud 致力于简化 Cloud Native 开发成本,提高研发效能以及提升应用性能等目的。 Dubbo Spring Cloud 首个 Preview Release,随同 Spring Cloud Alibaba 0.2.2.RELEASE 和 0.9.0.RELEASE 一同发布,分别对应 Spring Cloud Finchley 与 Greenwich(下文分别简称为 “F” 版 和 “G” 版)
由于 Dubbo Spring Cloud 构建在原生的 Spring Cloud 之上,其服务治理方面的能力可 认为是 Spring Cloud Plus,不仅完全覆盖 Spring Cloud 原生特性,而且提供更为稳定和成熟 的实现
我们将搭建如图所示的项目框架
搭建的代码已分享至码云: https://gitee.com/TimePause/spring-cloud-alibaba-examples.git
修改启动类的名称, 添加指定端口的参数
启动这三个提供者项目, 并重启消费者项目
Nacos上面可以看到该项目的详细信息
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、 高可靠的消息发布与订阅服务。 同时,广泛应用于多个领域,包括异步通信解耦、企业解决 方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
//这里我们选择 4.4.0 版本的原因在于,我们 spring cloud alibaba 版本为:2.2.0.RELEASE,它里面控制的 rocketMQ 的版是 4.4.0。
下载地址: http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
变量名: ROCKETMQ_HOME
变量值: D:\SoftWare\rocket-mq\rocketmq-all-4.4.0-bin-release //(RocketMq软件所在目录)
http://localhost:8080/
,页面右上角可以进行中英文切换Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。 Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe(发布订阅)、 consumer groups(消费者组)、partition(分区) 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding:
按照下面的结构图, 创建pom类型的父项目, 创建两个子模块项目, 添加对应的Maven依赖, 修改配置文件, 编写测试代码, 最后启动项目 项目地址: https://gitee.com/TimePause/spring-cloud-alibaba-examples.git
配置文件
logging.level.com.alibaba.cloud.stream.binder.rocketmq=DEBUG
#rocketmq 服务器 namerserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
#stream->bindings->output(input)
#output1
#发送消息的目的地址
spring.cloud.stream.bindings.output1.destination=test-topic
#消息的默认类型
spring.cloud.stream.bindings.output1.content-type=application/json
#生产者组
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
#消息的同步发送
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true
#output2 主要演示事务消息的发送
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
#发送的是事务消息
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup
#output3 用它演示消息的手动拉取
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group
spring.application.name=rocketmq-produce-example
server.port=28081
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
测试带tag的消息
测试发送对象消息
测试发送事务消息(half)=>需要创建事务消息监听后才能发送成功(稍后演示)
测试发送消息到pull 的目的地址,为了演示我们消息的手动拉取
package ah.szxy.listener;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
@RocketMQTransactionListener(
txProducerGroup = "myTxProducerGroup" ,
corePoolSize = 2 ,
maximumPoolSize = 5
)
public class RockerMQLocalTransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
* 当我们发送半(half)消息成功后,mq 服务要求我们执行本地的事务,并且返回本地事务的执行结果
* RocketMQLocalTransactionState:
* COMMIT 提交->其他的消费者将收到该消息
* ROLLBACK 回滚->mq ->半消息删除
* UNKNOWN -> mq 会再次检查本地的事务->checkLocalTransaction
*
* @param message
* @param o
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String type = message.getHeaders().get("type").toString();
switch (type){
case "1" :
System.out.println("本地事务执行状态未知");
return RocketMQLocalTransactionState.UNKNOWN ;
case "2":
System.out.println("本地事务执行状态成功");
return RocketMQLocalTransactionState.COMMIT ;
case "3":
System.out.println("本地事务执行状态失败");
return RocketMQLocalTransactionState.ROLLBACK ;
}
return null ;
}
/**
* 当mq 收到我们的本地的事务为UNKNOWN ,它会再次来检查我们的本地事务状态,要求返回一个本地事务的状态
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("来检查了,本次我提交本地事务");
return RocketMQLocalTransactionState.COMMIT;
}
}
测试回滚的情况
测试再次检查本地的事务的情况
配置文件application.properties
#rocketmq nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
#stream->bindings->input
#input1
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
#input2
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
spring.cloud.stream.bindings.input2.consumer.maxAttempts=1
#input3
spring.cloud.stream.bindings.input3.destination=test-topic
spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3
spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20
#input4
spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=5
#input5 手动消息的拉取
spring.cloud.stream.bindings.input5.destination=pull-topic
spring.cloud.stream.bindings.input5.content-type=text/plain
spring.cloud.stream.bindings.input5.group=pull-topic-group
spring.application.name=rocketmq-consume-example
server.port=28082
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
测试Consumer项目
2. 测试input2(接收tag类型)
3.测试input3(接收对象类型)
4.测试input4(接收事务类型, 成功/失败/确认成功还是失败)
相关软件分享如下
链接:https://pan.baidu.com/s/1o0WausCDJ6PA4OIpaz8qYA 提取码:d7wr