首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >不使用 MQ 如何实现 pub/sub 消息消费场景?

不使用 MQ 如何实现 pub/sub 消息消费场景?

作者头像
Coder小黑
发布2020-12-15 15:37:28
1.1K0
发布2020-12-15 15:37:28
举报
文章被收录于专栏:Coder小黑Coder小黑Coder小黑

hello,大家好,我是小黑,又和大家见面啦~~

在配置中心中,有一个经典的 pub/sub 场景:某个配置项发生变更之后,需要实时的同步到各个服务端节点,同时推送给客户端集群。

在之前实现的简易版配置中心中是通过 redis 的 pub/sub 来实现的。这种实现虽然简单,但却强依赖了 redis。

配置中心作为一个基础组件,如果能尽可能的减少外部依赖,那对使用方来说一定是更友好的。那么,有没有可能不使用 MQ 来实现 pub/sub 的场景呢?答案是肯定的。

基于 DB 的 pub/sub 方案

Apollo 在实现上述场景时,并没有选用基于 MQ 来进行实现,而是通过数据库实现了一个简单的消息队列。示意图如下:

ReleaseMessage 示意图

大致实现方式如下:

  1. Admin Service 在配置发布后会往 ReleaseMessage 表插入一条消息记录
  2. Config Service 中有一个线程会每秒扫描一次 ReleaseMessage 表,看是否有新的消息记录(怎么判断是不是新消息呢,怎么保证每个 client 不会重复消费呢?)
  3. Config Service 如果发现有新的消息记录,就会通知给客户端(怎么保证通知给每个客户端呢?每个 Config Service 都通知,不会重复通知吗?)

下面,就让我们带着这几个问题来学习一下源码吧。(画外音:思路比源码更重要

DatabaseMessageSender

Admin Service 在配置发布后会调用 DatabaseMessageSender#sendMessage 方法,该方法主要做了两件事情:

  1. 创建 ReleaseMessage ,然后将其保存到数据库中
  2. 记录当前保存的 ReleaseMessage Id,将其放到 DatabaseMessageSender#toClean 队列中。

DatabaseMessageSender#sendMessage

为什么要记录当前保存的 ReleaseMessage Id 呢?

DatabaseMessageSender 中有个定时任务,会去清除比当前 ID 小的 ReleaseMessage。

DatabaseMessageSender#cleanMessage

ReleaseMessageScanner

Config Service 中通过 ReleaseMessageScanner 组件会每秒(默认配置下)扫描一次 ReleaseMessage 表,来获取最新的消息。

ReleaseMessageScanner#afterPropertiesSet

有了这个基于 DB 的 pub/sub,Admin Service 在配置发布之后,每个 Config Service 都会通过 DB 来感知到这个消息,然后再通知给客户端。

那 Config Service 又是如何通知客户端的呢?

基于长轮询的实时消息

在 Apollo 的设计中,配置发生更新之后,并不是服务端主动推给客户端的,而且客户端通过长轮询的方式向服务端询问是否有配置发生了变更。大致思路为:如果在 60 秒内没有该客户端关心的配置发布,那么会返回 Http 状态码 304 给客户端;如果有该客户端关心的配置发布,请求就会立即返回,客户端从返回的结果中获取到配置变化的 namespace 后,会立即请求 Config Service 获取该 namespace 的最新配置

客户端的相关代码在 RemoteConfigLongPollService#doLongPollingRefresh,代码比较简单,感兴趣的同学可以自行查阅。

这里我们重点看一下服务端是如何实现的。

在传统的 servlet 模型中,每个请求都是由某个线程处理的,如果一个请求处理的时间较长,那么这种基于线程池的同步模型很快就会把所有线程耗尽,导致服务器无法响应新的请求。

servlet 3.0 中引入了异步支持,允许对一个请求进行异步处理,工作线程在此期间不会被阻塞,可以继续处理传入的客户端请求。

从 Spring 3.2 开始,可以使用 DeferredResult 来实现异步处理。使用 DeferredResult 时,可以设置超时,超时之后自动返回超时错误响应。同时,可以在另一个线程中,可以调用其 setResult()写入结果返回。

在 Apollo 客户端长轮询的地址为 /notifications/v2,对应的服务端代码为 NotificationControllerV2

NotificationControllerV2 中就使用了 Spring 的 DeferredResult来实现的。本文重在解决问题的思路,就不展示源码了,感兴趣的同学可以自己阅读一下源码。不过,小黑同学写了一个简单的 demo 来帮助我们理解一下 DeferredResult 的使用。

@Slf4j
@RestController
public class DeferredResultDemoController {

    private final Multimap<String, DeferredResult<String>> deferredResults = ArrayListMultimap.create();

    @GetMapping("/info")
    public DeferredResult<String> info(String key) {
        // 设置 1 秒超时时间,设置超时是返回的结果
        DeferredResult<String> result = new DeferredResult<>(1000L, "key not change");
        // 将 result 放到 deferredResults 中, key 即为当前请求所关心的配置项
        deferredResults.put(key, result);
        // 如果超时,移除当前 DeferredResult,并打印日志,同时返回 DeferredResult 构造器中传入的结果
        result.onTimeout(() -> {
            deferredResults.remove(key, result);
            log.info("time out key not change");
        });
        // 如果完成了,则从 deferredResults 中移除当前 DeferredResult
        result.onCompletion(() -> deferredResults.remove(key, result));
        return result;
    }

    @PostConstruct
    public void init() {
        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(700);
                } catch (InterruptedException e) {
                    log.info(e.getMessage(), e);
                }
                // 定时任务,模拟配置更新
                // 当 hello key 发生变更之后,从 deferredResults 获取到相关的 DeferredResult,通过 setResult 方法设置返回结果,同时移除 deferredResults
                if (deferredResults.containsKey("hello")) {
                    Collection<DeferredResult<String>> results = deferredResults.removeAll("hello");
                    results.forEach(stringDeferredResult -> stringDeferredResult.setResult("hello key change :" + System.currentTimeMillis()));
                }
            }
        }).start();
    }
}

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Coder小黑 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基于 DB 的 pub/sub 方案
    • DatabaseMessageSender
      • ReleaseMessageScanner
      • 基于长轮询的实时消息
      相关产品与服务
      微服务引擎 TSE
      微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档