代码见文章结尾
想想平常生活中做饭的场景,在用电饭锅做饭的同时,我们可以洗菜、切菜,等待电饭锅发出饭做好的提示我们回去拔下电饭锅电源(或者什么也不知让它处于保温状态),反正这个时候我们知道饭做好了,接下来可以炒菜了。从这里可以看出我们在日常生活中与世界的互动并不是同步的、线性的,不是简单的请求--响应模型。它是事件驱动的,我们不断的发送消息、接受消息、处理消息。
同样在软件世界中也不全是请求--响应模型,也会需要进行异步的消息通信。使用消息实现事件通信的概念被称为消息驱动架构(Event Driven Architecture,EDA),也被称为消息驱动架构(Message Driven Architecture,MDA)。使用这类架构可以构建高度解耦的系统,该系统能够对变化做出响应,且不需要与特定的库或者服务紧密耦合。
在 Spring Cloud 项目中可以使用Spirng Cloud Stream轻而易举的构建基于消息传递的解决方案。
要解答这个问题,让我们从一个例子开始,之前一直使用的两个服务:许可证服务和组织服务。每次对许可证服务进行请求,许可证服务都要通过 http 请求到组织服务上查询组织信息。显而易见这次额外的 http 请求会花费较长的时间。如果能够将缓存组织数据的读操作,将会大幅提高许可证服务的响应时间。但是缓存数据有如下 2 个要求:
要实现上面的要求,现在有两种办法。
许可证服务在 redis 中缓存从组织服务中查询到的服务信息,当组织数据更新时,组织服务同步 http 请求通知许可证服务数据过期。这种方式有以下几个问题:
同样的许可证服务在 redis 中缓存从组织服务中查询到的服务信息,当组织数据更新时,组织服务将更新信息写入到队列中。许可证服务监听消息队列。使用消息传递有一下 4 个好处:
spring cloud 项目中可以通过 spring cloud stream 框架来轻松集成消息传递。该框架最大的特点是抽象了消息传递平台的细节,因此可以在支持的消息队列中随意切换(包括 Apache Kafka 和 RabbitMQ)。
spring cloud stream 中有 4 个组件涉及到消息发布和消息消费,分别为:
处理逻辑如下:
继续使用之前的项目,在许可证服务中缓存组织数据到 redis 中。
为方便起见,使用 docker 创建 redis,建立脚本如下:
docker run -itd --name redis --net host redis:
首先在 organization 服务中引入 spring cloud stream 和 kafka 的依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
然后在 events 类中编写SimpleSouce
类,用于组织数据修改,产生一条消息到队列中。代码如下:
@EnableBinding(Source.class)
public class SimpleSource {
private Logger logger = LoggerFactory.getLogger(SimpleSource.class);
private Source source;
@Autowired
public SimpleSource(Source source) {
this.source = source;
}
public void publishOrChange(String action, String orgId) {
logger.info("在请求:{}中,发送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId);
OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id);
source.output().send(MessageBuilder.withPayload(change).build());
}
}
这里使用的是默认通道,Source 类定义的 output 通道发消息。后面通过 Sink 定义的 input 通道收消息。
然后在OrganizationController
类中定义一个 delete 方法,并注入 SimpleSouce 类,代码如下:
@Autowired
private SimpleSource simpleSource;
@DeleteMapping(value = "/organization/{orgId}")
public void deleteOne(@PathVariable("orgId") String id) {
logger.debug("删除了组织:{}", id);
simpleSource.publishOrChange("delete", id);
}
最后在配置文件中加入消息队列的配置:
# 省略了其他配置
spring:
cloud:
stream:
bindings:
output:
destination: orgChangeTopic
content-type: application/json
kafka:
binder:
# 替换为部署kafka的ip和端口
zk-nodes: 192.168.226.5:2181
brokers: 192.168.226.5:9092
现在我们可以测试下访问localhost:5555/apis/org/organization/12,可以看到控制台打印消息生成的日志。
集成 redis 的方法,参看。这里不作说明。
首先引入依赖,依赖项同上面组织服务。
然后在 event 包下创建OrgChange
的类,代码如下:
@EnableBinding(Sink.class) //使用Sink接口中定义的通道来监听传入消息
public class OrgChange {
private Logger logger = LoggerFactory.getLogger(OrgChange.class);
@StreamListener(Sink.INPUT)
public void loggerSink(OrganizationChange change){
logger.info("收到一个消息,组织id为:{},关联id为:{}",change.getOrgId(),change.getId());
//删除失效缓存
RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));
}
}
//下面两个都在util包下
//RedisKeyUtils.java代码如下
public class RedisKeyUtils {
private static final String ORG_CACHE_PREFIX = "orgCache_";
public static String getOrgCacheKey(String orgId){
return ORG_CACHE_PREFIX+orgId;
}
}
//RedisUtils.java代码如下
@Component
@SuppressWarnings("all")
public class RedisUtils {
public static RedisTemplate redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
RedisUtils.redisTemplate = redisTemplate;
}
public static boolean setObj(String key,Object value){
return setObj(key,value,0);
}
/**
* Description:
*
* @author fanxb
* @date 2019/2/21 15:21
* @param key 键
* @param value 值
* @param time 过期时间,单位ms
* @return boolean 是否成功
*/
public static boolean setObj(String key,Object value,long time){
try{
if(time<=0){
redisTemplate.opsForValue().set(key,value);
}else{
redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);
}
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
public static Object get(String key){
if(key==null){
return null;
}
try{
Object obj = redisTemplate.opsForValue().get(key);
return obj;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public static void del(String... key){
if(key!=null && key.length>0){
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
上面用到的是 Sink.INPUT 通道,这个和之前的 Source.OUTPUT 通道刚好一队,一个负责收,一个负责发。
然后修改OrganizationByRibbonService.java
文件中的getOrganizationWithRibbon
方法:
public Organization getOrganizationWithRibbon(String id) {
String key = RedisKeyUtils.getOrgCacheKey(id);
//先从redis缓存取数据
Object res = RedisUtils.get(key);
if (res == null) {
logger.info("当前数据无缓存:{}", id);
try{
ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}",
HttpMethod.GET, null, Organization.class, id);
res = responseEntity.getBody();
RedisUtils.setObj(key, res);
}catch (Exception e){
e.printStackTrace();
}
} else {
logger.info("当前数据为缓存数据:{}", id);
}
return (Organization) res;
}
最后修改配置文件,为 input 通道指定 topic,配置如下:
spring:
cloud:
stream:
bindings:
input:
destination: orgChangeTopic
content-type: application/json
# 定义将要消费消息的消费者组的名称
# 可能多个服务监听同一个消息队列。如果定义了消费者组,那么同组中只要有一个消费了消息,剩余的不会再次消费该消息,保证只有消息的
# 一个副本会被该组的某个实例所消费
group: licensingGroup
kafka:
binder:
zk-nodes: 192.168.226.5:2181
brokers: 192.168.226.5:9092
基本和发送的配置相同,只是这里是为input
通道映射队列,然后还定义了一个组名,避免一个消息被重复消费。
现在来多次访问localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到 licensingservice 控制台打印数据从缓存中读取,如下所示:
然后再以 delete 访问localhost:5555/apis/org/organization/12清除缓存,再次访问 licensingservice 服务,结果如下:
上面用的是Spring Cloud Stream
自带的 input/output 通道,那么要如何自定义通道呢?下面以自定义customInput/customOutput
通道为例。
public interface CustomOutput {
@Output("customOutput")
MessageChannel out();
}
对于每个自定义的发数据通道,需使用@OutPut 注解标记的返回 MessageChannel 类的方法。
public interface CustomInput {
@Input("customInput")
SubscribableChannel in();
}
同上,对应自定义的收数据通道,需要使用@Input 注解标记的返回 SubscribableChannel 类的方法。
看完本篇你应该已经能够在 Spring Cloud 中集成 Spring Cloud Stream 消息队列了,貌似这个也能用到普通的 spring boot 项目中,比直接集成 mq 更加的优雅。
2019,Fighting!
本篇原创发布于:FleyX 的个人博客
本篇所用全部代码:FleyX 的 github