专栏首页不止思考Apollo服务端设计原理剖析

Apollo服务端设计原理剖析

本文摘自于《Spring Cloud微服务 入门 实战与进阶》一书,文末赠送此书。

1 配置发布后的实时推送设计

配置中心最重要的一个特性就是实时推送了,正因为有这个特性,我们可以依赖配置中心做很多事情。在我自己开发的Smconf这个配置中心,Smconf是依赖于Zookeeper的Watch机制来实现实时推送。

上图简要描述了配置发布的大致过程:

  • 用户在Portal中进行配置的编辑和发布
  • Portal会调用Admin Service提供的接口进行发布操作
  • Admin Service收到请求后,发送ReleaseMessage给各个Config Service,通知Config Service配置发生变化
  • Config Service收到ReleaseMessage后,通知对应的客户端,基于Http长连接实现

2 发送ReleaseMessage的实现方式

ReleaseMessage消息是通过Mysql实现了一个简单的消息队列。之所有没有采用消息中间件,是为了让Apollo在部署的时候尽量简单,尽可能减少外部依赖。

上图简要描述了发送ReleaseMessage的大致过程:

  • Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录
  • Config Service会启动一个线程定时扫描ReleaseMessage表,去查看是否有新的消息记录
  • Config Service发现有新的消息记录,那么就会通知到所有的消息监听器
  • 消息监听器得到配置发布的信息后,则会通知对应的客户端

3 Config Service通知客户端的实现方式

通知是采用基于Http长连接实现,主要分为下面几个步骤:

  • 客户端会发起一个Http请求到Config Service的notifications/v2接口
  • v2接口通过Spring DeferredResult把请求挂起,不会立即返回
  • 如果在60秒内没有该客户端关心的配置发布,那么会返回Http状态码304给客户端
  • 如果发现配置有修改,则会调用DeferredResult的setResult方法,传入有配置变化的namespace信息,同时该请求会立即返回
  • 客户端从返回的结果中获取到配置变化的namespace后,会立即请求Config Service获取该namespace的最新配置

4 源码解析实时推送设计

Apollo推送这块代码比较多,就不在本书中详细分析了,我把推送这块的代码稍微简化了下,给大家进行讲解,这样理解起来会更容易。当然我这边会比较简单,很多细节就不做考虑了,只是为了能够让大家明白Apollo推送的核心原理。

发送ReleaseMessage的逻辑我们就写一个简单的接口,用队列存储,测试的时候就调用这个接口模拟配置有更新,发送ReleaseMessage消息。

@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
 // 模拟配置更新,往里插入数据表示有更新
 public static Queue<String> queue = new LinkedBlockingDeque<>();
 @GetMapping("/addMsg")
 public String addMsg() {
        queue.add("xxx");
 return "success";
    }
}

消息发送之后,前面我们有讲过Config Service会启动一个线程定时扫描ReleaseMessage表,去查看是否有新的消息记录,然后取通知客户端,这边我们也启动一个线程去扫描:

@Component
public class ReleaseMessageScanner implements InitializingBean {
 @Autowired
 private NotificationControllerV2 configController;
 @Override
 public void afterPropertiesSet() throws Exception {
 // 定时任务从数据库扫描有没有新的配置发布
 new Thread(() -> {
 for (;;) {
 String result = NotificationControllerV2.queue.poll();
 if (result != null) {
 ReleaseMessage message = new ReleaseMessage();
                    message.setMessage(result);
                    configController.handleMessage(message);
                }
            }
        }).start();;
    }
}

循环去读取NotificationControllerV2中的队列,如果有消息的话就构造一个ReleaseMessage的对象,然后调用NotificationControllerV2中的handleMessage()方法进行消息的处理。

ReleaseMessage就一个字段,模拟消息内容:

public class ReleaseMessage {
 private String message;
 public void setMessage(String message) {
 this.message = message;
    }
 public String getMessage() {
 return message;
    }
}

接下来,我们看handleMessage做了什么样的工作

NotificationControllerV2实现了ReleaseMessageListener接口,ReleaseMessageListener中定义了handleMessage()方法。

public interface ReleaseMessageListener {
 void handleMessage(ReleaseMessage message);
}

handleMessage就是当配置发生变化的时候,通知的消息监听器,消息监听器得到配置发布的信息后,则会通知对应的客户端:

@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
 private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
            .synchronizedSetMultimap(HashMultimap.create());
 @Override
 public void handleMessage(ReleaseMessage message) {
 System.err.println("handleMessage:"+ message);
 List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get("xxxx"));
 for (DeferredResultWrapper deferredResultWrapper : results) {
 List<ApolloConfigNotification> list = new ArrayList<>();
            list.add(new ApolloConfigNotification("application", 1));
            deferredResultWrapper.setResult(list);
        }
    }
}

Apollo的实时推送是基于Spring DeferredResult实现的,在handleMessage()方法中可以看到是通过deferredResults获取DeferredResult,deferredResults就是第一行的Multimap,Key其实就是消息内容,Value就是DeferredResult的业务包装类DeferredResultWrapper,我们来看下DeferredResultWrapper的代码:

public class DeferredResultWrapper {
 private static final long TIMEOUT = 60 * 1000;// 60 seconds
 private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = 
 new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
 private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;
 public DeferredResultWrapper() {
        result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
    }
 public void onTimeout(Runnable timeoutCallback) {
        result.onTimeout(timeoutCallback);
    }
 public void onCompletion(Runnable completionCallback) {
        result.onCompletion(completionCallback);
    }
 public void setResult(ApolloConfigNotification notification) {
        setResult(Lists.newArrayList(notification));
    }
 public void setResult(List<ApolloConfigNotification> notifications) {
        result.setResult(new ResponseEntity<>(notifications, HttpStatus.OK));
    }
 public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getResult() {
 return result;
    }
}

通过setResult()方法设置返回结果给客户端,以上就是当配置发生变化,然后通过消息监听器通知客户端的原理,那么客户端是在什么时候接入的呢?

@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
 // 模拟配置更新,往里插入数据表示有更新
 public static Queue<String> queue = new LinkedBlockingDeque<>();
 private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
            .synchronizedSetMultimap(HashMultimap.create());
 @GetMapping("/getConfig")
 public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getConfig() {
 DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
 List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications();
 if (!CollectionUtils.isEmpty(newNotifications)) {
            deferredResultWrapper.setResult(newNotifications);
        } else {
            deferredResultWrapper.onTimeout(() -> {
 System.err.println("onTimeout");
            });
            deferredResultWrapper.onCompletion(() -> {
 System.err.println("onCompletion");
            });
            deferredResults.put("xxxx", deferredResultWrapper);
        }
 return deferredResultWrapper.getResult();
    }
 private List<ApolloConfigNotification> getApolloConfigNotifications() {
 List<ApolloConfigNotification> list = new ArrayList<>();
 String result = queue.poll();
 if (result != null) {
            list.add(new ApolloConfigNotification("application", 1));
        }
 return list;
    }
}

NotificationControllerV2中提供了一个/getConfig的接口,客户端在启动的时候会调用这个接口,这个时候会执行getApolloConfigNotifications()方法去获取有没有配置的变更信息,如果有的话证明配置修改过,直接就通过deferredResultWrapper.setResult(newNotifications);返回结果给客户端了,客户端收到结果后重新拉取配置的信息进行覆盖本地的配置。

如果getApolloConfigNotifications()方法没有返回配置修改的信息,证明配置没有发生修改,就将DeferredResultWrapper对象添加到deferredResults中,等待后续配置发生变化时消息监听器进行通知。

同时这个请求就会挂起,不会立即返回,挂起是通过DeferredResultWrapper中的下面的代码实现的:

private static final long TIMEOUT = 60 * 1000;// 60 seconds
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = 
 new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;
public DeferredResultWrapper() {
    result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}

在创建DeferredResult对象的时候指定了超时的时间和超时后返回的响应码,如果60秒内没有消息监听器进行通知,那么这个请求就会超时,超时后客户端就收到的响应码就是304。

整个Config Service的流程就走完了,接下来我们看客户端是怎么实现的,我们简单的写个测试类模拟客户端注册:

public class ClientTest {
 public static void main(String[] args) {
        reg();
    }
 private static void reg() {
 System.err.println("注册");
 String result = request("http://localhost:8081/getConfig");
 if (result != null) {
 // 配置有更新,重新拉取配置
 // ......
        }
 // 重新注册
        reg();
    }
 private static String request(String url) {
 HttpURLConnection connection = null;
 BufferedReader reader = null;
 try {
            URL getUrl = new URL(url);
            connection = (HttpURLConnection) getUrl.openConnection();
            connection.setReadTimeout(90000);
            connection.setConnectTimeout(3000);
            connection.setRequestMethod("GET");
            connection.setRequestProperty("Accept-Charset", "utf-8");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setRequestProperty("Charset", "UTF-8");
 System.out.println(connection.getResponseCode());
 if (200 == connection.getResponseCode()) {
                reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
 StringBuilder result = new StringBuilder();
 String line = null;
 while ((line = reader.readLine()) != null) {
                    result.append(line);
                }
 System.out.println("结果 " + result);
 return result.toString();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
 if (connection != null) {
                connection.disconnect();
            }
        }
 return null;
    }
}

首先启动/getConfig接口所在的服务,然后启动客户端,客户端就会发起注册请求,如果有修改直接获取到结果,进行配置的更新操作。如果无修改,请求会挂起,这边客户端设置的读取超时时间是90秒,大于服务端的60秒超时时间。

每次收到结果后,无论是有修改还是没修改,都必须重新进行注册,通过这样的方式就可以达到配置实时推送的效果。

我们可以调用之前写的/addMsg接口来模拟配置发生变化,调用之后客户端就能马上得到返回结果。

本文摘自于《Spring Cloud微服务 入门 实战与进阶》一书。

(1)作者是资深的Java技术专家和微服务技术专家,知名网站“猿天地”创始人;

(2)从技术原理、工程实践、进阶提升3个维度详解Spring Cloud微服务的架构与开发;

(3)指导零基础读者快速入门并掌握工程实践能力,最终进阶为Spring Cloud微服务技术达人。

能坚持阅读到文章底部的人数不多,那我们就为这部分同学赠送2本《Spring Cloud微服务:入门、实战与进阶》图书!

本文分享自微信公众号 - 不止思考(bzsikao)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-07-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 微服务架构之「 配置中心 」

    在微服务架构的系列文章中,前面已经通过文章《微服务架构之「服务网关 」》介绍过了在微服务中服务网关的原理和应用,今天这篇文章我们继续来聊一聊微服务中另外一个重要...

    奎哥
  • 聊聊区块链

    在聊区块链(Blockchain)之前,我先推荐两本书,《区块链 从数字货币到信用社会》、《区块链 技术驱动金融》。

    奎哥
  • 时间都去哪儿了-技术人员的时间管理

    是不是有这样一种感觉,明明一天开始的时候计划要做很多事情,但是忙忙碌碌一天之后发现,杂七杂八的事情做了一堆,重要的事情却一件没完成。

    奎哥
  • Apollo服务端设计原理剖析,文末有福利

    配置中心最重要的一个特性就是实时推送了,正因为有这个特性,我们可以依赖配置中心做很多事情。在我自己开发的Smconf这个配置中心,Smconf是依赖于Zooke...

    南风
  • Apollo服务端设计原理剖析

    配置中心最重要的一个特性就是实时推送了,正因为有这个特性,我们可以依赖配置中心做很多事情。在我自己开发的Smconf这个配置中心,Smconf是依赖于Zooke...

    猿天地
  • 聊聊前后端分离接口规范

    随着互联网的高速发展,前端页面的展示、交互体验越来越灵活、炫丽,响应体验也要求越来越高,后端服务的高并发、高可用、高性能、高扩展等特性的要求也愈加苛刻,从而导致...

    Java团长
  • 面试官:你们前后端分离的接口规范是什么?

    随着互联网的高速发展,前端页面的展示、交互体验越来越灵活、炫丽,响应体验也要求越来越高,后端服务的高并发、高可用、高性能、高扩展等特性的要求也愈加苛刻,从而导致...

    黄泽杰
  • 如何获取 C# 类中发生数据变化的属性信息

    在平时的开发中,当用户修改数据时,一直没有很好的办法来记录具体修改了那些信息,只能暂时采用将类序列化成 json 字符串,然后全塞入到日志中的方式,此时如果我们...

    程序员宇说
  • 解决原子性问题?你首先需要的是宏观理解

    上一篇文章 可见性有序性,Happens-before来搞定,解决了并发三大问题中的两个,今天我们就聊聊如何解决原子性问题

    用户4172423
  • 解决原子性问题?你首先需要的是宏观理解

    上一篇文章 可见性有序性,Happens-before来搞定,解决了并发三大问题中的两个,今天我们就聊聊如何解决原子性问题

    码农小胖哥

扫码关注云+社区

领取腾讯云代金券