
简介:传统的消息队列对业务方提出了更高的要求,我们期望提供的是一种以业务为重心的,面向服务的解决方案。
这里记录的是学习分享内容,文章维护在 Github:studeyang/leanrning-share。
我们在上次分享中聊到了领域驱动设计和微服务,在 DDD 中有一个术语叫做领域事件,例如订单模型中的订单已创建、商品已发货。领域事件会触发下一步的业务操作,如果领域事件发生在微服务内,可以通过观察者模式很容易实现消息监听并处理。
<img src="https://technotes.oss-cn-shenzhen.aliyuncs.com/2023/202303242122856.png" style="zoom:50%;" />
如果发生在微服务之间,则需引入事件总线或者消息中间件。
经过技术选型后,我们决定使用 Kafka 作为消息中间件,此时微服务间的通信示意图如下:

不过,直接使用消息队列将面临以下问题:
我们期望提供的是一种以业务为重心的,面向服务的解决方案。
也就是说,即使团队中没人了解消息队列技术,也能够收发消息。于是对 Kafka SDK 二次封装,主要就是为了简化消息的接入,无需关注配置。

封装后解决了开发成本大、管理难度大的问题,但是离面向服务的解决方案目标还有一定的差距。比如业务方监听到消息后,执行一系列的业务逻辑异常了,想要做业务补偿,我们的“基于 Kafka SDK 二次封装”的方案就没办法满足,只能要求消息发送方再发一次消息,但这又会影响其他消息监听者。
于是我们决定将消息列队封装成消息服务,对业务方提供切实的服务能力。

我们熟知计算机中总线,在计算机系统中,不同的组件和设备需要相互通信以完成各种任务,此时,计算机总线就发挥了重要作用。类似的,微服务系统中,微服务就像是计算机系统中的各个组件和设备,而消息服务充当的就是计算机总线的角色。消息总线由此而来。
本文中出现的消息总线和消息服务指的是同一个东西。
发送消息和接收消息是消息服务最基本的能力,这两项能力分别由消息生产服务、消息消费服务提供。


微服务架构采用的技术栈是:SpringBoot、Kubernetes。
我们将消息总线取名为 Courier,Courier 的意思是“快递员”,消息的传递类似于快递的收发,消息总线正是快递员的角色。下面开始消息服务的初体验。
由于我们的微服务使用的是 SpingBoot 来落地的,因此我们提供了一个接入消息总线的 spring-boot-starter。
<dependency>
<groupId>com.casstime.open</groupId>
<artifactId>courier-spring-boot-starter</artifactId>
</dependency>接入消息总线,微服务只需要一个@EnableMessage注解即可加载所有相关配置:
@EnableMessage
@SpringBootApplication
public class WebApplication {
public static void main(String[] args) {
SpringApplication.run(WebApplication.class, args);
}
}下面代码定义了一个消息的基本信息,也称为消息 Header,包括消息 id,分区键 primaryKey,来源服务 service,消息 topic,创建时间 timstamp。
public abstract class Message {
private String id;
private String primaryKey;
private String service;
private String topic;
private Date timeStamp;
}消息可以分为两类,一类是事件,另一类是广播。定义如下:
// 事件
public abstract class Event extends Message {
}// 广播
public abstract class Event extends Message {
}业务消息内容称为消息 Body,例如订单已创建这个消息体的定义:
@Topic(name = "order")
public class OrderCreated extends Event {
private String orderId;
private String orderName;
private Date createdAt;
}业务方可以在业务执行方法的任一处,只需要一行代码,即可完成消息的发送。
// 发送消息
EventPublisher.publish(new OrderCreated());对于消息的监听,业务方只需关注业务逻辑的执行,屏蔽了 Offset 提交、重试等技术实现。
// 接收消息
@EventHandler(topic = "order", consumerGroup = "consumer-group1")
public class OrderMessageHandler {
public void handle(OrderCreated orderCreated) {
System.out.println("receive message: " + orderCreated);
}
}我们提供了 5 种不同功能类型的消息,满足各类业务场景。
1、事件消息
@Topic(name = "order")
public class OrderCreated extends Event {
private String orderId;
private String orderName;
private Date createdAt;
}
public void send() {
EventPublisher.publish(new OrderCreated());
}上面消息定义是事件,这是使用最多的一种消息。
2、广播消息
广播消息的消费示意图如下:

@Topic(name = "order")
public class CacheUpdate extends Broadcast {
private String orderId;
private String orderName;
private Date createdAt;
}
public void send() {
EventPublisher.publish(new CacheUpdate());
}上面消息定义时,继承了Broadcast,表示这是一个广播消息,消费服务的每个节点都将会收到这个广播。例如更新本地缓存事件,就需要用到广播消息。
3、顺序消息
@Topic(name = "order")
public class OrderCreated extends Event {
@PrimaryKey
private String orderId;
private String orderName;
private Date createdAt;
}
public void send() {
EventPublisher.publish(new OrderCreated());
}上面消息定义时,在orderId上加了@PrimaryKey注解,表示相同orderId的消息会有序的消费。
4、事务消息
@Topic(name = "order")
public class OrderCreated extends Event {
private String orderId;
private String orderName;
private Date createdAt;
}
@Transactional
public void send() {
EventPublisher.publish(new OrderCreated());
}上面消息发送时,在方法上添加了@Transactional注解,这是 Spring 的注解,表示这个方法里的逻辑执行是有事务性的。
5、延迟消息
@Topic(name = "order")
public class OrderCreated extends Event {
private String orderId;
private String orderName;
private Date createdAt;
}
@Transactional
public void send() {
EventPublisher.publish(new OrderCreated(), 2, TimeUnit.SECONDS);
}上面消息发送多了两个参数,表示延迟 2 秒接收。
只要是通过EventPublisher.publish()方法发送的消息,都可以追踪到这条消息记录。
消息定义了 5 种状态:

作为消息的发送方,关注的是消息是否发送成功,可通过下面页面查询。

作为消息的接收方,关注的是消息是否正常消费,可通过下面页面查询。

对于 5 种状态的消息,处理策略如下:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。