前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Cloud Alibaba技术栈(下)

Spring Cloud Alibaba技术栈(下)

作者头像
时间静止不是简史
发布2020-10-10 11:37:28
9450
发布2020-10-10 11:37:28
举报
文章被收录于专栏:Java探索之路

Spring Cloud Alibaba

承接上面的Nacos,Sentinel的学习, 现在开始学习Seata, Dubbo和RocketMQ

Seata 分布式事务框架

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 Seata 官网: http://seata.io/zh-cn/index.html

Seata 分布式事务原理

整体机制 (两阶段提交协议的演变)

  • 一阶段: 业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
  • 二阶段: 提交异步化,非常快速地完成。 回滚通过一阶段的回滚日志进行反向补偿。

官方文档: http://seata.io/zh-cn/docs/overview/what-is-seata.html

Seata Server 安装

代码语言:javascript
复制
下载地址:http://seata.io/zh-cn/blog/download.html

启动 Seata Server 进入 bin 目录中, 在 window 下启动 seata-server.bat, 在 linux 下启动 seata-server.bat

Seata 案例代码

测试框架搭建

项目架构

当前为止代码地址:https://gitee.com/TimePause/spring-cloud-alibaba-examples.git

四个微服务项目负责的业务

  • business-service 下单服务,
  • 调用 storage-service 删减库存,
  • 调用 order-service 创建订单,
  • 调用 account-service 扣除账户余额

项目搭建流程

  1. 如上图所示, 在根项目spring-cloud-alibaba-examples下创建父项目seata-examples, 修pom文件添加依赖
代码语言:javascript
复制
<dependencies>
        <!--服务注册-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!--Seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
        <!--web 项目的基础依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
  1. 创建子模块项目account-service,修改pom文件并添加依赖
  2. 创建子模块项目order-service,修改pom文件并添加依赖
  3. 创建子模块项目business-service,修改pom文件并添加依赖
  4. 创建子模块项目storage-service(对库存的扣减), 修改pom文件并添加依赖 以该微服务创建流程为例, 演示项目搭建步骤 <dependencies> <!--服务注册--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId> </dependency> <!--Seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> <!--web 项目的基础依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
  5. 导入数据库脚本到mysql
  1. 代码生成 a.在mysql中右侧的database中添加数据库连接, 输入数据库相关属性, 然后根据提示下载驱动

b.连接成功后便可在idea上面查看数据库的相关信息

c.安装 MyBatis插件(自动生成实体类以及mapper接口和xml映射文件)

d. 右击选择的表进行生成代码操作

e. 填写代码生成所在位置的信息

f.编写库存操作的接口和实现类

代码语言:javascript
复制
public interface StorageService {

    /**
     * 完成对商品库存扣减操作
     * @param productNo
     * @param count
     */
    void deduct(String productNo, int count) ;
}


@Service
public class StorageServiceImpl implements StorageService {

    @Autowired
    private StorageTblDao storageTblDao;

    private static Logger logger = LoggerFactory.getLogger(StorageServiceImpl.class);

    @Transactional
    @Override
    public void deduct(String productNo, int count) {
        logger.info("开始扣减商品{}的库存, 数量为{}", productNo, count);
        //1.查询库存
        //StorageTbl storageTbl = storageTblDao.selectByPrimaryKey(Integer.parseInt(productNo));
        StorageTblExample storageTblExample = new StorageTblExample();
        storageTblExample.createCriteria().andCommodityCodeEqualTo(productNo);
        List<StorageTbl> storageTbls = storageTblDao.selectByExample(storageTblExample);
        StorageTbl storageTbl = storageTbls.get(0);

        if (storageTbl==null){
            throw new IllegalArgumentException("商品不存在");
        }
        //2.扣减操作(扣减后的金额)
        int idleCount =storageTbl.getCount()-count;
        if (idleCount<0){
            throw new RuntimeException("存库不足!");
        }
        //3.设置商品库存
        storageTbl.setCount(idleCount);
        //4.保存到数据库中
        storageTblDao.updateByPrimaryKeySelective(storageTbl);

        logger.info("扣减库存商品{}成功, 剩余的库存为{}",productNo, idleCount);
    }
}

g.启动类

代码语言:javascript
复制
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("ah.szxy.mapper")
@RestController
public class StorageServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(StorageServiceApplication.class,args);
    }

    @Autowired
    private StorageService storageService;

    //使用外部接口暴露
    @GetMapping("/deduct/{productNo}/{count}")
    public ResponseEntity<Void> deduct(@PathVariable("productNo") String productNo, @PathVariable("count") Integer count){
        storageService.deduct(productNo, count);
        return ResponseEntity.ok().build();
    }
}

h.pom文件

代码语言:javascript
复制
server:
  port: 8093

spring:
  application:
    name: storage-service
  cloud:
    nacos:
      discovery:
        server-addr: 47.97.169.52:8848
    alibaba:
      seata:
        tx-service-group: ${spring.application.name}
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    name: storageDataSource
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://119.45.189.14:3306/seata?useSSL=false&serverTimezone=UTC
    username: root
    password: root123
    druid:
      max-active: 20  # 最大连接数
      min-idle: 2     # 最小活跃数
      initial-size: 2 # 初始连接数
seata:
  service:
    vgroup-mapping:
      storage-service: default
    grouplist:
      default: 127.0.0.1:8091  # seata默认端口
    disable-global-transaction: false # 开启全局事务
  enabled: true  # 开启 seata

mybatis-plus:
  mapper-locations: classpath:/mapper/*.xml

i.启动并访问测试即可

当前为止代码地址:https://gitee.com/TimePause/spring-cloud-alibaba-examples.git

测试分布式事务

  1. 如图所示, 启动所有项目, 待所有项目启动完毕后测试分布式事务
  1. 初始化三个表的数据 账户表

订单表

库存表

  1. 访问下单接口 注意在这里如果访问 SXT_USER_2用户就会报错, 因此我们可以分别测试下面两个url, 看看出现错误时是否回滚
代码语言:javascript
复制
http://localhost:8096/purchase/SXT_USER_2/HUAWEI_0001/1  //经测试,发生;额回滚
http://localhost:8096/purchase/SXT_USER_1/HUAWEI_0001/1  //未发生回滚

Dubbo Spring Cloud

是什么

Dubbo Spring Cloud 基于 Dubbo Spring Boot 2.7.1 和 Spring Cloud 2.x 开发,无论开发人 员是 Dubbo 用户还是 Spring Cloud 用户,都能轻松地驾驭,并以接近“零”成本的代价使应用向上迁移。 Dubbo Spring Cloud 致力于简化 Cloud Native 开发成本,提高研发效能以及提升应用性能等目的。 Dubbo Spring Cloud 首个 Preview Release,随同 Spring Cloud Alibaba 0.2.2.RELEASE 和 0.9.0.RELEASE 一同发布,分别对应 Spring Cloud Finchley 与 Greenwich(下文分别简称为 “F” 版 和 “G” 版)

功能完成度

由于 Dubbo Spring Cloud 构建在原生的 Spring Cloud 之上,其服务治理方面的能力可 认为是 Spring Cloud Plus,不仅完全覆盖 Spring Cloud 原生特性,而且提供更为稳定和成熟 的实现

框架的搭建

我们将搭建如图所示的项目框架

搭建的代码已分享至码云: https://gitee.com/TimePause/spring-cloud-alibaba-examples.git

测试

  1. 测试消费者项目能不能消费到提供者的项目, 访问 http://localhost:8080/rpc/csdn-timepause
  2. 启动多个服务提供者项目, 方式如下

修改启动类的名称, 添加指定端口的参数

启动这三个提供者项目, 并重启消费者项目

Nacos上面可以看到该项目的详细信息

  1. 重复访问步骤1的url, 可以看到消费者在随机的访问提供者 由此可以知道Dubbo SpringCloud对服务进行了负载均衡(自动), 且无需任何配置

RocketMQ

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、 高可靠的消息发布与订阅服务。 同时,广泛应用于多个领域,包括异步通信解耦、企业解决 方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

下载 RocketMQ

代码语言:javascript
复制
//这里我们选择 4.4.0 版本的原因在于,我们 spring cloud alibaba 版本为:2.2.0.RELEASE,它里面控制的 rocketMQ 的版是 4.4.0。
下载地址:  http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
  1. 配置环境变量
代码语言:javascript
复制
变量名: ROCKETMQ_HOME
变量值: D:\SoftWare\rocket-mq\rocketmq-all-4.4.0-bin-release //(RocketMq软件所在目录) 
  1. 启动mqnamesrv.cmd 方法: win+R 输入 cmd, 将mqnamesrv.cmd文件拖到cmd命令行中回车即可(退出只需将该命令行关闭即可)
  2. 启动mqbroker.cmd 方法: win+R 输入 cmd, 将mqbroker.cmd文件拖到cmd命令行中, 然后输入 -n 主机名:端口号,然后回车即可(退出同上)
  1. 启动图形化界面 方法: win+R 输入 cmd, 首先输入java -jar , 然后将rocketmq-console-ng-1.0.0.jar文件拖到cmd命令行中, 回车即可(RocketMQ和图形化软件会分享在底部)
  1. 访问图形化界面 输入 http://localhost:8080/ ,页面右上角可以进行中英文切换

SpringCloud Stream

介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。 Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe(发布订阅)、 consumer groups(消费者组)、partition(分区) 这些统一的概念。

Spring Cloud Stream 内部有两个概念:Binder 和 Binding:

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。 举例说明: Kafka的实现KafkaMessageChannelBinder, RabbitMQ 的实现RabbitMessageChannelBinder 以及 RocketMQ 的实现RocketMQMessageChannelBinder
  • Binding: 包括 Input Binding 和 Output Binding。 Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

测试Demo环境搭建

按照下面的结构图, 创建pom类型的父项目, 创建两个子模块项目, 添加对应的Maven依赖, 修改配置文件, 编写测试代码, 最后启动项目 项目地址: https://gitee.com/TimePause/spring-cloud-alibaba-examples.git

配置文件

代码语言:javascript
复制
logging.level.com.alibaba.cloud.stream.binder.rocketmq=DEBUG

#rocketmq 服务器 namerserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876

#stream->bindings->output(input)
#output1
#发送消息的目的地址
spring.cloud.stream.bindings.output1.destination=test-topic
#消息的默认类型
spring.cloud.stream.bindings.output1.content-type=application/json
#生产者组
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
#消息的同步发送
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true

#output2 主要演示事务消息的发送
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
#发送的是事务消息
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup

#output3 用它演示消息的手动拉取
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group

spring.application.name=rocketmq-produce-example

server.port=28081

management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
  1. 消息提供者项目启动完毕后, 可以测试 测试普通字符串消息

测试带tag的消息

测试发送对象消息

测试发送事务消息(half)=>需要创建事务消息监听后才能发送成功(稍后演示)

测试发送消息到pull 的目的地址,为了演示我们消息的手动拉取

添加事务监听

代码语言:javascript
复制
package ah.szxy.listener;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

@RocketMQTransactionListener(
        txProducerGroup = "myTxProducerGroup" ,
        corePoolSize = 2 ,
        maximumPoolSize = 5
)
public class RockerMQLocalTransactionListenerImpl implements RocketMQLocalTransactionListener {

    /**
     * 当我们发送半(half)消息成功后,mq 服务要求我们执行本地的事务,并且返回本地事务的执行结果
     * RocketMQLocalTransactionState:
     *  COMMIT 提交->其他的消费者将收到该消息
     *  ROLLBACK 回滚->mq ->半消息删除
     *  UNKNOWN -> mq 会再次检查本地的事务->checkLocalTransaction
     *
     * @param message
     * @param o
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String type = message.getHeaders().get("type").toString();
        switch (type){
            case "1" :
                System.out.println("本地事务执行状态未知");
                return RocketMQLocalTransactionState.UNKNOWN ;
            case "2":
                System.out.println("本地事务执行状态成功");
                return RocketMQLocalTransactionState.COMMIT ;
            case "3":
                System.out.println("本地事务执行状态失败");
                return RocketMQLocalTransactionState.ROLLBACK ;
        }
        return null ;
    }

    /**
     * 当mq 收到我们的本地的事务为UNKNOWN ,它会再次来检查我们的本地事务状态,要求返回一个本地事务的状态
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("来检查了,本次我提交本地事务");
        return RocketMQLocalTransactionState.COMMIT;
    }
}
  1. 运行测试 测试提交成功的情况

测试回滚的情况

测试再次检查本地的事务的情况

Consumer项目的完善

点击查看项目地址

配置文件application.properties

代码语言:javascript
复制
#rocketmq nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876

#stream->bindings->input

#input1
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true


#input2
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
spring.cloud.stream.bindings.input2.consumer.maxAttempts=1

#input3
spring.cloud.stream.bindings.input3.destination=test-topic
spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3
spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20

#input4
spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=5


#input5 手动消息的拉取
spring.cloud.stream.bindings.input5.destination=pull-topic
spring.cloud.stream.bindings.input5.content-type=text/plain
spring.cloud.stream.bindings.input5.group=pull-topic-group

spring.application.name=rocketmq-consume-example

server.port=28082

management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

测试Consumer项目

  1. 测试input1(接收字符类型)

2. 测试input2(接收tag类型)

3.测试input3(接收对象类型)

4.测试input4(接收事务类型, 成功/失败/确认成功还是失败)


相关软件分享如下

链接:https://pan.baidu.com/s/1o0WausCDJ6PA4OIpaz8qYA 提取码:d7wr

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/10/09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spring Cloud Alibaba
  • Seata 分布式事务框架
    • Seata 分布式事务原理
      • Seata Server 安装
        • Seata 案例代码
          • 测试框架搭建
          • 测试分布式事务
      • Dubbo Spring Cloud
        • 是什么
          • 功能完成度
            • 框架的搭建
              • 测试
              • RocketMQ
                • 下载 RocketMQ
                  • SpringCloud Stream
                    • 介绍
                    • 测试Demo环境搭建
                    • 添加事务监听
                    • Consumer项目的完善
                相关产品与服务
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档