摘要: 原创出处 http://www.iocoder.cn/Apollo/config-service-notifications/ 「芋道源码」欢迎转载,保留摘要,谢谢!
老艿艿:本系列假定胖友已经阅读过 《Apollo 官方 wiki 文档》 。
本文接 《Apollo 源码解析 —— Admin Service 发送 ReleaseMessage》 一文,分享配置发布的第四步,NotificationControllerV2 得到配置发布的 AppId+Cluster+Namespace 后,会通知对应的客户端 。
FROM 《Apollo配置中心设计》 的 2.1.2 Config Service 通知客户端的实现方式
notifications/v
2 接口,也就是NotificationControllerV2 ,参见 RemoteConfigLongPollService 。友情提示:在目前 Apollo 的实现里,如下的名词是“等价”的:
ReleaseMessage.id
ReleaseMessage.message
文章暂时未统一用词,所以胖友看的时候需要“脑补”下。
老艿艿:流程较长,代码较多,请耐心理解。
com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2
,实现 ReleaseMessageListener 接口,通知 Controller ,仅提供 notifications/v2
接口。
/**
* Watch Key 与 DeferredResultWrapper 的 Multimap
*
* Key:Watch Key
* Value:DeferredResultWrapper 数组
*/
private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps.synchronizedSetMultimap(HashMultimap.create());
private static final Splitter STRING_SPLITTER = Splitter.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).omitEmptyStrings();
private static final Type notificationsTypeReference = new TypeToken<List<ApolloConfigNotification>>() {}.getType();
/**
* 通过 ReleaseMessage 的消息内容,获得对应 Namespace 的名字
*/
private static final Function<String, String> retrieveNamespaceFromReleaseMessage =
releaseMessage -> {
if (Strings.isNullOrEmpty(releaseMessage)) {
return null;
}
List<String> keys = STRING_SPLITTER.splitToList(releaseMessage);
//message should be appId+cluster+namespace
if (keys.size() != 3) {
logger.error("message format invalid - {}", releaseMessage);
return null;
}
return keys.get(2);
};
/**
* 大量通知分批执行 ExecutorService
*/
private final ExecutorService largeNotificationBatchExecutorService;
@Autowired
private WatchKeysUtil watchKeysUtil;
@Autowired
private ReleaseMessageServiceWithCache releaseMessageService;
@Autowired
private EntityManagerUtil entityManagerUtil;
@Autowired
private NamespaceUtil namespaceUtil;
@Autowired
private Gson gson;
@Autowired
private BizConfig bizConfig;
public NotificationControllerV2() {
largeNotificationBatchExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("NotificationControllerV2", true));
}
deferredResults
属性,Watch Key 与 DeferredResultWrapper 的 Multimap 。message
字段。notifications/v2
中,当请求的 Namespace 暂无新通知时,会将该 Namespace 对应的 Watch Key 们,注册到 deferredResults
中。等到 Namespace 配置发生变更时,在 #handleMessage(...)
中,进行通知。 1: @RequestMapping(method = RequestMethod.GET)
2: public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
3: @RequestParam(value = "appId") String appId,
4: @RequestParam(value = "cluster") String cluster,
5: @RequestParam(value = "notifications") String notificationsAsString,
6: @RequestParam(value = "dataCenter", required = false) String dataCenter,
7: @RequestParam(value = "ip", required = false) String clientIp) {
8: // 解析 notificationsAsString 参数,创建 ApolloConfigNotification 数组。
9: List<ApolloConfigNotification> notifications = null;
10: try {
11: notifications = gson.fromJson(notificationsAsString, notificationsTypeReference);
12: } catch (Throwable ex) {
13: Tracer.logError(ex);
14: }
15: if (CollectionUtils.isEmpty(notifications)) {
16: throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
17: }
18:
19: // 创建 DeferredResultWrapper 对象
20: DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
21: // Namespace 集合
22: Set<String> namespaces = Sets.newHashSet();
23: // 客户端的通知 Map 。key 为 Namespace 名,value 为通知编号。
24: Map<String, Long> clientSideNotifications = Maps.newHashMap();
25: // 过滤并创建 ApolloConfigNotification Map
26: Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
27: // 循环 ApolloConfigNotification Map ,初始化上述变量。
28: for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
29: String normalizedNamespace = notificationEntry.getKey();
30: ApolloConfigNotification notification = notificationEntry.getValue();
31: // 添加到 `namespaces` 中。
32: namespaces.add(normalizedNamespace);
33: // 添加到 `clientSideNotifications` 中。
34: clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
35: // 记录名字被归一化的 Namespace 。因为,最终返回给客户端,使用原始的 Namespace 名字,否则客户端无法识别。
36: if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
37: deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
38: }
39: }
40: if (CollectionUtils.isEmpty(namespaces)) {
41: throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
42: }
43:
44: // 组装 Watch Key Multimap
45: Multimap<String, String> watchedKeysMap = watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
46: // 生成 Watch Key 集合
47: Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
48: // 获得 Watch Key 集合中,每个 Watch Key 对应的 ReleaseMessage 记录。
49: List<ReleaseMessage> latestReleaseMessages = releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);
50:
51: /**
52: * Manually close the entity manager.
53: * Since for async request, Spring won't do so until the request is finished,
54: * which is unacceptable since we are doing long polling - means the db connection would be hold
55: * for a very long time
56: */
57: // 手动关闭 EntityManager
58: // 因为对于 async 请求,Spring 在请求完成之前不会这样做
59: // 这是不可接受的,因为我们正在做长轮询——意味着 db 连接将被保留很长时间。
60: // 实际上,下面的过程,我们已经不需要 db 连接,因此进行关闭。
61: entityManagerUtil.closeEntityManager();
62: // 获得新的 ApolloConfigNotification 通知数组
63: List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages);
64: // 若有新的通知,直接设置结果。
65: if (!CollectionUtils.isEmpty(newNotifications)) {
66: deferredResultWrapper.setResult(newNotifications);
67: // 若无新的通知,
68: } else {
69: // 注册超时事件
70: deferredResultWrapper.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys")); // 【TODO 6001】Tracer 日志
71: // 注册结束事件
72: deferredResultWrapper.onCompletion(() -> {
73: // 移除 Watch Key + DeferredResultWrapper 出 `deferredResults`
74: // unregister all keys
75: for (String key : watchedKeys) {
76: deferredResults.remove(key, deferredResultWrapper);
77: }
78: // 【TODO 6001】Tracer 日志
79: logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
80: });
81:
82: // 注册 Watch Key + DeferredResultWrapper 到 `deferredResults` 中,等待配置发生变化后通知。详见 `#handleMessage(...)` 方法。
83: // register all keys
84: for (String key : watchedKeys) {
85: this.deferredResults.put(key, deferredResultWrapper);
86: }
87:
88: // 【TODO 6001】Tracer 日志
89: logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
90: logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}", watchedKeys, appId, cluster, namespaces, dataCenter);
91: }
92:
93: return deferredResultWrapper.getResult();
94: }
/notifications/v2
接口,具体 URL 在类上注明。notificationsAsString
请求参数,JSON 字符串,在【第 8 至 17 行】的代码,解析成 List<ApolloConfigNotification>
,表示客户端本地的配置通知信息。List<ApolloConfigNotification>
,仅返回配置发生变化的 Namespace 对应的 ApolloConfigNotification 。也就说,当有几个 配置发生变化的 Namespace ,返回几个对应的 ApolloConfigNotification 。另外,客户端接收到返回后,会增量合并到本地的配置通知信息。客户端下次请求时,使用合并后的配置通知信息。namespaceName
+ notificationId
,不传递 messages
。clientIp
请求参数,目前该接口暂时用不到,作为预留参数。? 万一未来在灰度发布需要呢。#filterNotifications(appId, notifications)
方法,过滤并创建 ApolloConfigNotification Map 。胖友先跳到 「2.2.1 filterNotifications」 看完在回来。namespaces
中。clientSideNotifications
中。DeferredResultWrapper#recordNamespaceNameNormalizedResult(originalNamespaceName, normalizedNamespaceName)
方法,记录名字被归一化的 Namespace 。因为,最终返回给客户端,使用原始的 Namespace 名字,否则客户端无法识别。WatchKeysUtil#assembleAllWatchKeys(appId, cluster, namespaces, dataCenter)
方法,组装 Watch Key Multimap 。胖友先跳到 「7. WatchKeysUtil」 看完在回来。ReleaseMessageServiceWithCache#findLatestReleaseMessagesGroupByMessages(watchedKeys)
方法,获得 Watch Key 集合中,每个 Watch Key 对应的最新的 ReleaseMessage 记录。胖友先跳到 「6. ReleaseMessageServiceWithCache」 看完在回来。EntityManagerUtil#closeEntityManager()
方法,手动关闭 EntityManager 。因为对于 async 请求,SpringMVC 在请求完成之前不会这样做。这是不可接受的,因为我们正在做长轮询——意味着 db 连接将被保留很长时间。实际上,下面的过程,我们已经不需要 db 连接,因此进行关闭。「8. EntityManagerUtil」 看完在回来。#getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages)
方法,获得新的 ApolloConfigNotification 通知数组。胖友先跳到 「2.2.2 getApolloConfigNotifications」 看完在回来。DeferredResultWrapper#setResult(List<ApolloConfigNotification>)
方法,直接设置 DeferredResult 的结果,从而结束长轮询。deferredResults
中,等到有配置变更或超时。DeferredResultWrapper#onTimeout(Runnable)
方法,注册超时事件。DeferredResultWrapper#onCompletion(Runnable)
方法,注册结束事件。在其内部,移除注册的 Watch Key + DeferredResultWrapper 出 deferredResults
。deferredResults
中,等待配置发生变化后通知。这样,任意一个 Watch Key 对应的 Namespace 对应的配置发生变化时,都可以进行通知,并结束轮询等待。详细解析,见 「2.3 handleMessage」 方法。#filterNotifications(appId, notifications)
方法,过滤并创建 ApolloConfigNotification Map 。其中,KEY 为 Namespace 的名字。代码如下:
1: private Map<String, ApolloConfigNotification> filterNotifications(String appId, List<ApolloConfigNotification> notifications) {
2: Map<String, ApolloConfigNotification> filteredNotifications = Maps.newHashMap();
3: for (ApolloConfigNotification notification : notifications) {
4: if (Strings.isNullOrEmpty(notification.getNamespaceName())) {
5: continue;
6: }
7: // 若 Namespace 名以 .properties 结尾,移除该结尾,并设置到 ApolloConfigNotification 中。例如 application.properties => application 。
8: // strip out .properties suffix
9: String originalNamespace = namespaceUtil.filterNamespaceName(notification.getNamespaceName());
10: notification.setNamespaceName(originalNamespace);
11: // 获得归一化的 Namespace 名字。因为,客户端 Namespace 会填写错大小写。
12: // 例如,数据库中 Namespace 名为 Fx.Apollo ,而客户端 Namespace 名为 fx.Apollo
13: // 通过归一化后,统一为 Fx.Apollo
14: // fix the character case issue, such as FX.apollo <-> fx.apollo
15: String normalizedNamespace = namespaceUtil.normalizeNamespace(appId, originalNamespace);
16:
17: // in case client side namespace name has character case issue and has difference notification ids
18: // such as FX.apollo = 1 but fx.apollo = 2, we should let FX.apollo have the chance to update its notification id
19: // which means we should record FX.apollo = 1 here and ignore fx.apollo = 2
20: // 如果客户端 Namespace 的名字有大小写的问题,并且恰好有不同的通知编号。
21: // 例如 Namespace 名字为 FX.apollo 的通知编号是 1 ,但是 fx.apollo 的通知编号为 2 。
22: // 我们应该让 FX.apollo 可以更新它的通知编号,
23: // 所以,我们使用 FX.apollo 的 ApolloConfigNotification 对象,添加到结果,而忽略 fx.apollo 。
24: if (filteredNotifications.containsKey(normalizedNamespace) &&
25: filteredNotifications.get(normalizedNamespace).getNotificationId() < notification.getNotificationId()) {
26: continue;
27: }
28:
29: filteredNotifications.put(normalizedNamespace, notification);
30: }
31: return filteredNotifications;
32: }
NamespaceUtil#filterNamespaceName(namespaceName)
方法,若 Namespace 名以 ".properties"
结尾,移除该结尾,并设置到 ApolloConfigNotification 中。例如: application.properties => application
。代码如下:
public String filterNamespaceName(String namespaceName)
{
// 若 Namespace 名以 .properties 结尾,移除该结尾,
if (namespaceName.toLowerCase().endsWith(".properties")) {
int dotIndex = namespaceName.lastIndexOf(".");
return namespaceName.substring(0, dotIndex);
}
return namespaceName;
}
NamespaceUtil#normalizeNamespace(appId, originalNamespace)
方法,获得归一化的 Namespace 名字。因为,客户端 Namespace 会填写错大小写。AppNamespaceServiceWithCache#findByAppIdAndNamespace(appId, namespaceName)
方法,获得 App 下的 AppNamespace 对象。AppNamespaceServiceWithCache#findPublicNamespaceByName(namespaceName)
方法,查询公用类型的 AppNamespace 对象。"Fx.Apollo"
,而客户端 Namespace 名为 "fx.Apollo"
。通过归一化后,统一为 "Fx.Apollo"
。1: @Autowired
2: private AppNamespaceServiceWithCache appNamespaceServiceWithCache;
3:
4: public String normalizeNamespace(String appId, String namespaceName)
{
5: // 获得 App 下的 AppNamespace 对象
6: AppNamespace appNamespace = appNamespaceServiceWithCache.findByAppIdAndNamespace(appId, namespaceName);
7: if (appNamespace != null) {
8: return appNamespace.getName();
9: }
10: // 获取不到,说明该 Namespace 可能是关联的
11: appNamespace = appNamespaceServiceWithCache.findPublicNamespaceByName(namespaceName);
12: if (appNamespace != null) {
13: return appNamespace.getName();
14: }
15: return namespaceName;
16: }
"FX.apollo"
的通知编号是 1 ,但是 "fx.apollo"
的通知编号为 2 。我们应该让 "FX.apollo"
可以更新它的通知编号,所以,我们使用 "FX.apollo"
的 ApolloConfigNotification 对象,添加到结果,而忽略 "fx.apollo"
。通过这样的方式,若此时服务器的通知编号为 3 ,那么 "FX.apollo"
的通知编号先更新成 3 ,再下一次长轮询时,"fx.apollo"
的通知编号再更新成 3 。? 比较“绕”,胖友细细品味下,大多数情况下,不会出现这样的情况。filteredNotifications
中。#getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages)
方法,获得新的 ApolloConfigNotification 通知数组。代码如下:
1: private List<ApolloConfigNotification> getApolloConfigNotifications(Set<String> namespaces,
2: Map<String, Long> clientSideNotifications,
3: Multimap<String, String> watchedKeysMap,
4: List<ReleaseMessage> latestReleaseMessages) {
5: // 创建 ApolloConfigNotification 数组
6: List<ApolloConfigNotification> newNotifications = Lists.newArrayList();
7: if (!CollectionUtils.isEmpty(latestReleaseMessages)) {
8: // 创建最新通知的 Map 。其中 Key 为 Watch Key 。
9: Map<String, Long> latestNotifications = Maps.newHashMap();
10: for (ReleaseMessage releaseMessage : latestReleaseMessages) {
11: latestNotifications.put(releaseMessage.getMessage(), releaseMessage.getId());
12: }
13: // 循环 Namespace 的名字的集合,判断是否有配置更新
14: for (String namespace : namespaces) {
15: long clientSideId = clientSideNotifications.get(namespace);
16: long latestId = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;
17: // 获得 Namespace 对应的 Watch Key 集合
18: Collection<String> namespaceWatchedKeys = watchedKeysMap.get(namespace);
19: // 获得最大的通知编号
20: for (String namespaceWatchedKey : namespaceWatchedKeys) {
21: long namespaceNotificationId = latestNotifications.getOrDefault(namespaceWatchedKey, ConfigConsts.NOTIFICATION_ID_PLACEHOLDER);
22: if (namespaceNotificationId > latestId) {
23: latestId = namespaceNotificationId;
24: }
25: }
26: // 若服务器的通知编号大于客户端的通知编号,意味着有配置更新
27: if (latestId > clientSideId) {
28: // 创建 ApolloConfigNotification 对象
29: ApolloConfigNotification notification = new ApolloConfigNotification(namespace, latestId);
30: // 循环添加通知编号到 ApolloConfigNotification 中。
31: namespaceWatchedKeys.stream().filter(latestNotifications::containsKey).forEach(namespaceWatchedKey ->
32: notification.addMessage(namespaceWatchedKey, latestNotifications.get(namespaceWatchedKey)));
33: // 添加 ApolloConfigNotification 对象到结果
34: newNotifications.add(notification);
35: }
36: }
37: }
38: return newNotifications;
39: }
latestNotifications
判断是否有配置更新。ApolloConfigNotification#addMessage(String key, long notificationId)
方法,添加通知编号到 ApolloConfigNotification 中。对于关联类型的 Namespace ,details
会是多个。newNotifications
)。newNotifications
。若非空,说明有配置更新。#handleMessage(ReleaseMessage, channel)
方法,当有新的 ReleaseMessage 时,通知其对应的 Namespace 的,并且正在等待的请求。代码如下:
1: @Override
2: public void handleMessage(ReleaseMessage message, String channel) {
3: logger.info("message received - channel: {}, message: {}", channel, message);
4: // 【TODO 6001】Tracer 日志
5: String content = message.getMessage();
6: Tracer.logEvent("Apollo.LongPoll.Messages", content);
7:
8: // 仅处理 APOLLO_RELEASE_TOPIC
9: if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
10: return;
11: }
12:
13: // 获得对应的 Namespace 的名字
14: String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
15: if (Strings.isNullOrEmpty(changedNamespace)) {
16: logger.error("message format invalid - {}", content);
17: return;
18: }
19:
20: // `deferredResults` 存在对应的 Watch Key
21: if (!deferredResults.containsKey(content)) {
22: return;
23: }
24:
25: // create a new list to avoid ConcurrentModificationException
26: // 创建 DeferredResultWrapper 数组,避免并发问题。
27: List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
28:
29: // 创建 ApolloConfigNotification 对象
30: ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
31: configNotification.addMessage(content, message.getId());
32:
33: // do async notification if too many clients
34: // 若需要通知的客户端过多,使用 ExecutorService 异步通知,避免“惊群效应”
35: if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
36: largeNotificationBatchExecutorService.submit(() -> {
37: logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
38: bizConfig.releaseMessageNotificationBatch());
39: for (int i = 0; i < results.size(); i++) {
40: // 每 N 个客户端,sleep 一段时间。
41: if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
42: try {
43: TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
44: } catch (InterruptedException e) {
45: //ignore
46: }
47: }
48: logger.debug("Async notify {}", results.get(i));
49: // 设置结果
50: results.get(i).setResult(configNotification);
51: }
52: });
53: return;
54: }
55:
56: logger.debug("Notify {} clients for key {}", results.size(), content);
57: // 设置结果
58: for (DeferredResultWrapper result : results) {
59: result.setResult(configNotification);
60: }
61: logger.debug("Notification completed");
62: }
deferredResults
存在对应的 Watch Key。deferredResults
中读取并创建 DeferredResultWrapper 数组,避免并发问题。ApolloConfigNotification#addMessage(String key, long notificationId)
方法,添加通知消息明细。此处,details
是一个。"apollo.release-message.notification.batch"
配置,默认 100 。"apollo.release-message.notification.batch"
个客户端,sleep 一段时间。可通过 ServerConfig "apollo.release-message.notification.batch.interval"
配置,默认 100 毫秒。DeferredResultWrapper#setResult(List<ApolloConfigNotification>)
方法,设置 DeferredResult 的结果,从而结束长轮询。DeferredResultWrapper#setResult(List<ApolloConfigNotification>)
方法,设置 DeferredResult 的结果,从而结束长轮询。com.ctrip.framework.apollo.core.dto.ApolloConfigNotification
,Apollo 配置通知 DTO 。代码如下:
public class ApolloConfigNotification {
/**
* Namespace 名字
*/
private String namespaceName;
/**
* 最新通知编号
*
* 目前使用 `ReleaseMessage.id` 。
*/
private long notificationId;
/**
* 通知消息集合
*/
private volatile ApolloNotificationMessages messages;
public ApolloConfigNotification(String namespaceName, long notificationId) {
this.namespaceName = namespaceName;
this.notificationId = notificationId;
}
}
namespaceName
字段,Namespace 名,指向对应的 Namespace 。因此,一个 Namespace 对应一个 ApolloConfigNotification 对象。notificationId
字段,最新通知编号,目前使用 ReleaseMessage.id
字段。messages
字段,通知消息集合。volatile
修饰,因为存在多线程修改和读取。#addMessage(String key, long notificationId)
方法,添加消息明细到 message
中。代码如下:
public
void
addMessage(String key, long notificationId)
{
// 创建 ApolloNotificationMessages 对象
if (this.messages == null) {
synchronized (this) {
if (this.messages == null) {
this.messages = new ApolloNotificationMessages();
}
}
}
// 添加到 `messages` 中
this.messages.put(key, notificationId);
}
com.ctrip.framework.apollo.core.dto.ApolloNotificationMessages
,Apollo 配置通知消息集合 DTO 。代码如下:
public class ApolloNotificationMessages {
/**
* 明细 Map
*
* KEY :{appId} "+" {clusterName} "+" {namespace} ,例如:100004458+default+application
* VALUE :通知编号
*/
private Map<String, Long> details;
public ApolloNotificationMessages() {
this(Maps.<String, Long>newHashMap());
}
}
details
字段,明细 Map 。其中,KEY 是 Watch Key 。为什么 ApolloConfigNotification 中有 ApolloNotificationMessages ,而且 ApolloNotificationMessages 的 details
字段是 Map ?按道理说,对于一个 Namespace 的通知,使用 ApolloConfigNotification 的 namespaceName
+ notificationId
已经足够了。但是,在 namespaceName
对应的 Namespace 是关联类型时,会同时查询当前 Namespace + 关联的 Namespace 这两个 Namespace,所以会是多个,使用 Map 数据结构。当然,对于 /notifications/v2
接口,仅有【直接】获得到配置变化才可能出现 ApolloNotificationMessages.details
为多个的情况。为啥?在 #handleMessage(...)
方法中,一次只处理一条 ReleaseMessage ,因此只会有 ApolloNotificationMessages.details
只会有一个。
put
public void put(String key, long notificationId) {
details.put(key, notificationId);
}
mergeFrom
public void mergeFrom(ApolloNotificationMessages source) {
if (source == null) {
return;
}
for (Map.Entry<String, Long> entry : source.getDetails().entrySet()) {
// to make sure the notification id always grows bigger
// 只合并新的通知编号大于的情况
if (this.has(entry.getKey()) && this.get(entry.getKey()) >= entry.getValue()) {
continue;
}
this.put(entry.getKey(), entry.getValue());
}
}
com.ctrip.framework.apollo.configservice.wrapper.DeferredResultWrapper
,DeferredResult 包装器,封装 DeferredResult 的公用方法。
/**
* 默认超时时间
*/
private static final long TIMEOUT = 60 * 1000; //60 seconds
/**
* 未修改时的 ResponseEntity 响应,使用 302 状态码。
*/
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
/**
* 归一化和原始的 Namespace 的名字的 Map
*/
private Map<String, String> normalizedNamespaceNameToOriginalNamespaceName;
/**
* 响应的 DeferredResult 对象
*/
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;
public DeferredResultWrapper() {
result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}
TIMEOUT
静态属性,默认超时时间。NOT_MODIFIED_RESPONSE_LIST
静态属性,未修改时的 ResponseEntity 响应,使用 302 状态码。normalizedNamespaceNameToOriginalNamespaceName
属性,归一化( normalized )和原始( original )的 Namespace 的名字的 Map 。因为客户端在填写 Namespace 时,写错了名字的大小写。在 Config Service 中,会进行归一化“修复”,方便逻辑的统一编写。但是,最终返回给客户端需要“还原”回原始( original )的 Namespace 的名字,避免客户端无法识别。#recordNamespaceNameNormalizedResult(String originalNamespaceName, String normalizedNamespaceName)
方法,记录归一化和原始的 Namespace 的名字的映射。代码如下:
public
void
recordNamespaceNameNormalizedResult(String originalNamespaceName, String normalizedNamespaceName)
{
if (normalizedNamespaceNameToOriginalNamespaceName == null) {
normalizedNamespaceNameToOriginalNamespaceName = Maps.newHashMap();
}
// 添加到 `normalizedNamespaceNameToOriginalNamespaceName` 中
normalizedNamespaceNameToOriginalNamespaceName.put(normalizedNamespaceName, originalNamespaceName); // 和参数的顺序,相反
}
result
属性,响应的 DeferredResult 对象,在构造方法中初始化。public void onTimeout(Runnable timeoutCallback) {
result.onTimeout(timeoutCallback);
}
public void setResult(ApolloConfigNotification notification) {
setResult(Lists.newArrayList(notification));
}
public void setResult(ApolloConfigNotification notification) {
setResult(Lists.newArrayList(notification));
}
public void setResult(List<ApolloConfigNotification> notifications) {
// 恢复被归一化的 Namespace 的名字为原始的 Namespace 的名字
if (normalizedNamespaceNameToOriginalNamespaceName != null) {
notifications.stream().filter(notification -> normalizedNamespaceNameToOriginalNamespaceName.containsKey
(notification.getNamespaceName())).forEach(notification -> notification.setNamespaceName(
normalizedNamespaceNameToOriginalNamespaceName.get(notification.getNamespaceName())));
}
// 设置结果,并使用 200 状态码。
result.setResult(new ResponseEntity<>(notifications, HttpStatus.OK));
}
com.ctrip.framework.apollo.configservice.service.AppNamespaceServiceWithCache
,实现 InitializingBean 接口,缓存 AppNamespace 的 Service 实现类。通过将 AppNamespace 缓存在内存中,提高查询性能。缓存实现方式如下:
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).skipNulls();
@Autowired
private AppNamespaceRepository appNamespaceRepository;
@Autowired
private BizConfig bizConfig;
/**
* 增量初始化周期
*/
private int scanInterval;
/**
* 增量初始化周期单位
*/
private TimeUnit scanIntervalTimeUnit;
/**
* 重建周期
*/
private int rebuildInterval;
/**
* 重建周期单位
*/
private TimeUnit rebuildIntervalTimeUnit;
/**
* 定时任务 ExecutorService
*/
private ScheduledExecutorService scheduledExecutorService;
/**
* 最后扫描到的 AppNamespace 的编号
*/
private long maxIdScanned;
/**
* 公用类型的 AppNamespace 的缓存
*
* //store namespaceName -> AppNamespace
*/
private CaseInsensitiveMapWrapper<AppNamespace> publicAppNamespaceCache;
/**
* App 下的 AppNamespace 的缓存
*
* store appId+namespaceName -> AppNamespace
*/
private CaseInsensitiveMapWrapper<AppNamespace> appNamespaceCache;
/**
* AppNamespace 的缓存
*
* //store id -> AppNamespace
*/
private Map<Long, AppNamespace> appNamespaceIdCache;
public AppNamespaceServiceWithCache() {
initialize();
}
private void initialize() {
maxIdScanned = 0;
// 创建缓存对象
publicAppNamespaceCache = new CaseInsensitiveMapWrapper<>(Maps.newConcurrentMap());
appNamespaceCache = new CaseInsensitiveMapWrapper<>(Maps.newConcurrentMap());
appNamespaceIdCache = Maps.newConcurrentMap();
// 创建 ScheduledExecutorService 对象,大小为 1 。
scheduledExecutorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("AppNamespaceServiceWithCache", true));
}
appNamespaceCache
的 KEY ,通过 #assembleAppNamespaceKey(AppNamespace)
方法,拼接 appId
+ name
。代码如下:
private String assembleAppNamespaceKey(AppNamespace appNamespace)
{
return STRING_JOINER.join(appNamespace.getAppId(), appNamespace.getName());
}
#afterPropertiesSet()
方法,通过 Spring 调用,初始化定时任务。代码如下:
1: @Override
2: public void afterPropertiesSet() throws Exception {
3: // 从 ServerConfig 中,读取定时任务的周期配置
4: populateDataBaseInterval();
5: // 全量初始化 AppNamespace 缓存
6: scanNewAppNamespaces(); // block the startup process until load finished
7: // 创建定时任务,全量重构 AppNamespace 缓存
8: scheduledExecutorService.scheduleAtFixedRate(() -> {
9: // 【TODO 6001】Tracer 日志
10: Transaction transaction = Tracer.newTransaction("Apollo.AppNamespaceServiceWithCache", "rebuildCache");
11: try {
12: // 全量重建 AppNamespace 缓存
13: this.updateAndDeleteCache();
14: // 【TODO 6001】Tracer 日志
15: transaction.setStatus(Transaction.SUCCESS);
16: } catch (Throwable ex) {
17: // 【TODO 6001】Tracer 日志
18: transaction.setStatus(ex);
19: logger.error("Rebuild cache failed", ex);
20: } finally {
21: // 【TODO 6001】Tracer 日志
22: transaction.complete();
23: }
24: }, rebuildInterval, rebuildInterval, rebuildIntervalTimeUnit);
25: // 创建定时任务,增量初始化 AppNamespace 缓存
26: scheduledExecutorService.scheduleWithFixedDelay(this::scanNewAppNamespaces, scanInterval, scanInterval, scanIntervalTimeUnit);
27: }
#populateDataBaseInterval()
方法,从 ServerConfig 中,读取定时任务的周期配置。代码如下:
private
void
populateDataBaseInterval()
{
scanInterval = bizConfig.appNamespaceCacheScanInterval(); // "apollo.app-namespace-cache-scan.interval"
scanIntervalTimeUnit = bizConfig.appNamespaceCacheScanIntervalTimeUnit(); // 默认秒,不可配置
rebuildInterval = bizConfig.appNamespaceCacheRebuildInterval(); // "apollo.app-namespace-cache-rebuild.interval"
rebuildIntervalTimeUnit = bizConfig.appNamespaceCacheRebuildIntervalTimeUnit(); // 默认秒,不可配置
}
#scanNewAppNamespaces()
方法,全量初始化 AppNamespace 缓存。在 「5.3 scanNewAppNamespaces」 中详细解析。#updateAndDeleteCache()
方法,更新和删除 AppNamespace 缓存。在 「5.4 scanNewAppNamespaces」 中详细解析。#scanNewAppNamespaces()
方法。private void scanNewAppNamespaces() {
// 【TODO 6001】Tracer 日志
Transaction transaction = Tracer.newTransaction("Apollo.AppNamespaceServiceWithCache", "scanNewAppNamespaces");
try {
// 加载新的 AppNamespace 们
this.loadNewAppNamespaces();
// 【TODO 6001】Tracer 日志
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
// 【TODO 6001】Tracer 日志
transaction.setStatus(ex);
logger.error("Load new app namespaces failed", ex);
} finally {
// 【TODO 6001】Tracer 日志
transaction.complete();
}
}
#loadNewAppNamespaces()
方法,加载新的 AppNamespace 们。代码如下:
private
void
loadNewAppNamespaces()
{
boolean hasMore = true;
while (hasMore && !Thread.currentThread().isInterrupted()) { // 循环,直到无新的 AppNamespace
// current batch is 500
// 获得大于 maxIdScanned 的 500 条 AppNamespace 记录,按照 id 升序
List<AppNamespace> appNamespaces = appNamespaceRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(appNamespaces)) {
break;
}
// 合并到 AppNamespace 缓存中
mergeAppNamespaces(appNamespaces);
// 获得新的 maxIdScanned ,取最后一条记录
int scanned = appNamespaces.size();
maxIdScanned = appNamespaces.get(scanned - 1).getId();
// 若拉取不足 500 条,说明无新消息了
hasMore = scanned == 500;
logger.info("Loaded {} new app namespaces with startId {}", scanned, maxIdScanned);
}
}
#mergeAppNamespaces(appNamespaces)
方法,合并到 AppNamespace 缓存中。代码如下:
private
void
mergeAppNamespaces(List<AppNamespace> appNamespaces)
{
for (AppNamespace appNamespace : appNamespaces) {
// 添加到 `appNamespaceCache` 中
appNamespaceCache.put(assembleAppNamespaceKey(appNamespace), appNamespace);
// 添加到 `appNamespaceIdCache`
appNamespaceIdCache.put(appNamespace.getId(), appNamespace);
// 若是公用类型,则添加到 `publicAppNamespaceCache` 中
if (appNamespace.isPublic()) {
publicAppNamespaceCache.put(appNamespace.getName(), appNamespace);
}
}
}
private void updateAndDeleteCache() {
// 从缓存中,获得所有的 AppNamespace 编号集合
List<Long> ids = Lists.newArrayList(appNamespaceIdCache.keySet());
if (CollectionUtils.isEmpty(ids)) {
return;
}
// 每 500 一批,从数据库中查询最新的 AppNamespace 信息
List<List<Long>> partitionIds = Lists.partition(ids, 500);
for (List<Long> toRebuild : partitionIds) {
Iterable<AppNamespace> appNamespaces = appNamespaceRepository.findAll(toRebuild);
if (appNamespaces == null) {
continue;
}
// 处理更新的情况
// handle updated
Set<Long> foundIds = handleUpdatedAppNamespaces(appNamespaces);
// 处理删除的情况
// handle deleted
handleDeletedAppNamespaces(Sets.difference(Sets.newHashSet(toRebuild), foundIds));
}
}
#handleUpdatedAppNamespaces(appNamespaces)
方法,处理更新的情况。代码如下:
private Set<Long> handleUpdatedAppNamespaces(Iterable<AppNamespace> appNamespaces)
{
Set<Long> foundIds = Sets.newHashSet();
for (AppNamespace appNamespace : appNamespaces) {
foundIds.add(appNamespace.getId());
// 获得缓存中的 AppNamespace 对象
AppNamespace thatInCache = appNamespaceIdCache.get(appNamespace.getId());
// 从 DB 中查询到的 AppNamespace 的更新时间更大,才认为是更新
if (thatInCache != null && appNamespace.getDataChangeLastModifiedTime().after(thatInCache.getDataChangeLastModifiedTime())) {
// 添加到 appNamespaceIdCache 中
appNamespaceIdCache.put(appNamespace.getId(), appNamespace);
// 添加到 appNamespaceCache 中
String oldKey = assembleAppNamespaceKey(thatInCache);
String newKey = assembleAppNamespaceKey(appNamespace);
appNamespaceCache.put(newKey, appNamespace);
// 当 appId 或 namespaceName 发生改变的情况,将老的移除出 appNamespaceCache
// in case appId or namespaceName changes
if (!newKey.equals(oldKey)) {
appNamespaceCache.remove(oldKey);
}
// 添加到 publicAppNamespaceCache 中
if (appNamespace.isPublic()) { // 新的是公用类型
// 添加到 publicAppNamespaceCache 中
publicAppNamespaceCache.put(appNamespace.getName(), appNamespace);
// 当 namespaceName 发生改变的情况,将老的移除出 publicAppNamespaceCache
// in case namespaceName changes
if (!appNamespace.getName().equals(thatInCache.getName()) && thatInCache.isPublic()) {
publicAppNamespaceCache.remove(thatInCache.getName());
}
} else
if (thatInCache.isPublic()) { // 新的不是公用类型,需要移除
//just in case isPublic changes
publicAppNamespaceCache.remove(thatInCache.getName());
}
logger.info("Found AppNamespace changes, old: {}, new: {}", thatInCache, appNamespace);
}
}
return foundIds;
}
#handleDeletedAppNamespaces(Set<Long> deletedIds)
方法,处理删除的情况。代码如下:
private
void
handleDeletedAppNamespaces(Set<Long> deletedIds)
{
if (CollectionUtils.isEmpty(deletedIds)) {
return;
}
for (Long deletedId : deletedIds) {
// 从 appNamespaceIdCache 中移除
AppNamespace deleted = appNamespaceIdCache.remove(deletedId);
if (deleted == null) {
continue;
}
// 从 appNamespaceCache 中移除
appNamespaceCache.remove(assembleAppNamespaceKey(deleted));
// 从 publicAppNamespaceCache 移除
if (deleted.isPublic()) {
publicAppNamespaceCache.remove(deleted.getName());
}
logger.info("Found AppNamespace deleted, {}", deleted);
}
}
/**
* 获得 AppNamespace 对象
*
* @param appId App 编号
* @param namespaceName Namespace 名字
* @return AppNamespace
*/
public AppNamespace findByAppIdAndNamespace(String appId, String namespaceName) {
Preconditions.checkArgument(!StringUtils.isContainEmpty(appId, namespaceName), "appId and namespaceName must not be empty");
return appNamespaceCache.get(STRING_JOINER.join(appId, namespaceName));
}
/**
* 获得 AppNamespace 对象数组
*
* @param appId App 编号
* @param namespaceNames Namespace 名字的集合
* @return AppNamespace 数组
*/
public List<AppNamespace> findByAppIdAndNamespaces(String appId, Set<String> namespaceNames) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(appId), "appId must not be null");
if (namespaceNames == null || namespaceNames.isEmpty()) {
return Collections.emptyList();
}
List<AppNamespace> result = Lists.newArrayList();
// 循环获取
for (String namespaceName : namespaceNames) {
AppNamespace appNamespace = appNamespaceCache.get(STRING_JOINER.join(appId, namespaceName));
if (appNamespace != null) {
result.add(appNamespace);
}
}
return result;
}
/**
* 获得公用类型的 AppNamespace 对象
*
* @param namespaceName Namespace 名字
* @return AppNamespace
*/
public AppNamespace findPublicNamespaceByName(String namespaceName) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(namespaceName), "namespaceName must not be empty");
return publicAppNamespaceCache.get(namespaceName);
}
/**
* 获得公用类型的 AppNamespace 对象数组
*
* @param namespaceNames Namespace 名字的集合
* @return AppNamespace 数组
*/
public List<AppNamespace> findPublicNamespacesByNames(Set<String> namespaceNames) {
if (namespaceNames == null || namespaceNames.isEmpty()) {
return Collections.emptyList();
}
List<AppNamespace> result = Lists.newArrayList();
// 循环获取
for (String namespaceName : namespaceNames) {
AppNamespace appNamespace = publicAppNamespaceCache.get(namespaceName);
if (appNamespace != null) {
result.add(appNamespace);
}
}
return result;
}
com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache
,实现 InitializingBean 和 ReleaseMessageListener 接口,缓存 ReleaseMessage 的 Service 实现类。通过将 ReleaseMessage 缓存在内存中,提高查询性能。缓存实现方式如下:
@Autowired
private ReleaseMessageRepository releaseMessageRepository;
@Autowired
private BizConfig bizConfig;
/**
* 扫描周期
*/
private int scanInterval;
/**
* 扫描周期单位
*/
private TimeUnit scanIntervalTimeUnit;
/**
* 最后扫描到的 ReleaseMessage 的编号
*/
private volatile long maxIdScanned;
/**
* ReleaseMessage 缓存
*
* KEY:`ReleaseMessage.message`
* VALUE:对应的最新的 ReleaseMessage 记录
*/
private ConcurrentMap<String, ReleaseMessage> releaseMessageCache;
/**
* 是否执行扫描任务
*/
private AtomicBoolean doScan;
/**
* ExecutorService 对象
*/
private ExecutorService executorService;
public ReleaseMessageServiceWithCache() {
initialize();
}
private void initialize() {
// 创建缓存对象
releaseMessageCache = Maps.newConcurrentMap();
// 设置 doScan 为 true
doScan = new AtomicBoolean(true);
// 创建 ScheduledExecutorService 对象,大小为 1 。
executorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("ReleaseMessageServiceWithCache", true));
}
#afterPropertiesSet()
方法,通知 Spring 调用,初始化定时任务。代码如下:
1: @Override
2: public void afterPropertiesSet() throws Exception {
3: // 从 ServerConfig 中,读取任务的周期配置
4: populateDataBaseInterval();
5: // 初始拉取 ReleaseMessage 到缓存
6: //block the startup process until load finished
7: //this should happen before ReleaseMessageScanner due to autowire
8: loadReleaseMessages(0);
9: // 创建定时任务,增量拉取 ReleaseMessage 到缓存,用以处理初始化期间,产生的 ReleaseMessage 遗漏的问题。
10: executorService.submit(() -> {
11: while (doScan.get() && !Thread.currentThread().isInterrupted()) {
12: // 【TODO 6001】Tracer 日志
13: Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageServiceWithCache", "scanNewReleaseMessages");
14: try {
15: // 增量拉取 ReleaseMessage 到缓存
16: loadReleaseMessages(maxIdScanned);
17: // 【TODO 6001】Tracer 日志
18: transaction.setStatus(Transaction.SUCCESS);
19: } catch (Throwable ex) {
20: // 【TODO 6001】Tracer 日志
21: transaction.setStatus(ex);
22: logger.error("Scan new release messages failed", ex);
23: } finally {
24: transaction.complete();
25: }
26: try {
27: scanIntervalTimeUnit.sleep(scanInterval);
28: } catch (InterruptedException e) {
29: //ignore
30: }
31: }
32: });
33: }
#populateDataBaseInterval()
方法,从 ServerConfig 中,读取定时任务的周期配置。代码如下:
private
void
populateDataBaseInterval()
{
scanInterval = bizConfig.releaseMessageCacheScanInterval(); // "apollo.release-message-cache-scan.interval" ,默认为 1 。
scanIntervalTimeUnit = bizConfig.releaseMessageCacheScanIntervalTimeUnit(); // 默认秒,不可配置。
}
#loadReleaseMessages(startId)
方法,初始拉取 ReleaseMessage 到缓存。在 「6.3 loadReleaseMessages」 中详细解析。ReleaseMessageScanner.maxIdScanned
大于 ReleaseMessageServiceWithCache.maxIdScanned
,从而导致 ReleaseMessage 的遗漏。#loadReleaseMessages(startId)
方法,增量拉取新的 ReleaseMessage 们。代码如下:
private void loadReleaseMessages(long startId) {
boolean hasMore = true;
while (hasMore && !Thread.currentThread().isInterrupted()) {
// current batch is 500
// 获得大于 maxIdScanned 的 500 条 ReleaseMessage 记录,按照 id 升序
List<ReleaseMessage> releaseMessages = releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(startId);
if (CollectionUtils.isEmpty(releaseMessages)) {
break;
}
// 合并到 ReleaseMessage 缓存
releaseMessages.forEach(this::mergeReleaseMessage);
// 获得新的 maxIdScanned ,取最后一条记录
int scanned = releaseMessages.size();
startId = releaseMessages.get(scanned - 1).getId();
// 若拉取不足 500 条,说明无新消息了
hasMore = scanned == 500;
logger.info("Loaded {} release messages with startId {}", scanned, startId);
}
}
#mergeAppNamespaces(appNamespaces)
方法,合并到 ReleaseMessage 缓存中。代码如下:
private
synchronized
void
mergeReleaseMessage(ReleaseMessage releaseMessage)
{
// 获得对应的 ReleaseMessage 对象
ReleaseMessage old = releaseMessageCache.get(releaseMessage.getMessage());
// 若编号更大,进行更新缓存
if (old == null || releaseMessage.getId() > old.getId()) {
releaseMessageCache.put(releaseMessage.getMessage(), releaseMessage);
maxIdScanned = releaseMessage.getId();
}
}
1: @Override
2: public void handleMessage(ReleaseMessage message, String channel) {
3: // Could stop once the ReleaseMessageScanner starts to work
4: // 关闭增量拉取定时任务的执行
5: doScan.set(false);
6: logger.info("message received - channel: {}, message: {}", channel, message);
7:
8: // 仅处理 APOLLO_RELEASE_TOPIC
9: String content = message.getMessage();
10: Tracer.logEvent("Apollo.ReleaseMessageService.UpdateCache", String.valueOf(message.getId()));
11: if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
12: return;
13: }
14:
15: // 计算 gap
16: long gap = message.getId() - maxIdScanned;
17: // 若无空缺 gap ,直接合并
18: if (gap == 1) {
19: mergeReleaseMessage(message);
20: // 如有空缺 gap ,增量拉取
21: } else if (gap > 1) {
22: // gap found!
23: loadReleaseMessages(maxIdScanned);
24: }
25: }
gap
。#mergeReleaseMessage(message)
方法,直接合并即可。#loadReleaseMessages(maxIdScanned)
方法,增量拉取。例如,上述的第 3 步,定时任务还来不及拉取( 即未执行 ),ReleaseMessageScanner 就已经通知,此处会产生空缺的 gap
。com.ctrip.framework.apollo.configservice.util.WatchKeysUtil
,Watch Key 工具类。
核心的方法为 #assembleAllWatchKeys(appId, clusterName, namespaces, dataCenter)
方法,组装 Watch Key Multimap 。其中 KEY 为 Namespace 的名字,VALUE 为 Watch Key 集合。代码如下:
1: /**
2: * 组装所有的 Watch Key Multimap 。其中 Key 为 Namespace 的名字,Value 为 Watch Key 集合。
3: *
4: * @param appId App 编号
5: * @param clusterName Cluster 名
6: * @param namespaces Namespace 的名字的数组
7: * @param dataCenter IDC 的 Cluster 名
8: * @return Watch Key Multimap
9: */
10: public Multimap<String, String> assembleAllWatchKeys(String appId, String clusterName,
11: Set<String> namespaces,
12: String dataCenter) {
13: // 组装 Watch Key Multimap
14: Multimap<String, String> watchedKeysMap = assembleWatchKeys(appId, clusterName, namespaces, dataCenter);
15:
16: // 如果不是仅监听 'application' Namespace ,处理其关联来的 Namespace 。
17: // Every app has an 'application' namespace
18: if (!(namespaces.size() == 1 && namespaces.contains(ConfigConsts.NAMESPACE_APPLICATION))) {
19: // 获得属于该 App 的 Namespace 的名字的集合
20: Set<String> namespacesBelongToAppId = namespacesBelongToAppId(appId, namespaces);
21: // 获得关联来的 Namespace 的名字的集合
22: Set<String> publicNamespaces = Sets.difference(namespaces, namespacesBelongToAppId);
23: // 添加到 Watch Key Multimap 中
24: // Listen on more namespaces if it's a public namespace
25: if (!publicNamespaces.isEmpty()) {
26: watchedKeysMap.putAll(findPublicConfigWatchKeys(appId, clusterName, publicNamespaces, dataCenter));
27: }
28: }
29:
30: return watchedKeysMap;
31: }
#assembleWatchKeys(appId, clusterName, namespaces, dataCenter)
方法,组装 App 下的 Watch Key Multimap 。详细解析,见 「7.1 assembleWatchKeys」 。namespaces
中,可能存在关联类型的 Namespace ,因此需要进一步处理。在这里的判断会比较“绕”,如果 namespaces
仅仅是 "application"
时,那么肯定不存在关联类型的 Namespace 。#namespacesBelongToAppId(appId, namespaces)
方法,获得属于该 App 的 Namespace 的名字的集合。详细解析,见 「7.2 namespacesBelongToAppId」 。Sets#difference(...)
方法,进行集合差异计算,获得关联类型的 Namespace 的名字的集合。#findPublicConfigWatchKeys(...)
方法,获得关联类型的 Namespace 的名字的集合的 Watch Key Multimap ,并添加到结果集中。详细解析,见 「7.3 findPublicConfigWatchKeys」 。/**
* 获得每条消息内容对应的最新的 ReleaseMessage 对象
*
* @param messages 消息内容的集合
* @return 集合
*/
public List<ReleaseMessage> findLatestReleaseMessagesGroupByMessages(Set<String> messages) {
if (CollectionUtils.isEmpty(messages)) {
return Collections.emptyList();
}
List<ReleaseMessage> releaseMessages = Lists.newArrayList();
// 获得每条消息内容对应的最新的 ReleaseMessage 对象
for (String message : messages) {
ReleaseMessage releaseMessage = releaseMessageCache.get(message);
if (releaseMessage != null) {
releaseMessages.add(releaseMessage);
}
}
return releaseMessages;
}
/**
* 组装 Watch Key Multimap
*
* @param appId App 编号
* @param clusterName Cluster 名
* @param namespaces Namespace 的名字的集合
* @param dataCenter IDC 的 Cluster 名字
* @return Watch Key Multimap
*/
private Multimap<String, String> assembleWatchKeys(String appId, String clusterName, Set<String> namespaces, String dataCenter) {
Multimap<String, String> watchedKeysMap = HashMultimap.create();
// 循环 Namespace 的名字的集合
for (String namespace : namespaces) {
watchedKeysMap.putAll(namespace, assembleWatchKeys(appId, clusterName, namespace, dataCenter));
}
return watchedKeysMap;
}
#assembleWatchKeys(appId, clusterName, namespace, dataCenter)
方法,组装指定 Namespace 的 Watch Key 数组。代码如下:
1: private Set<String> assembleWatchKeys(String appId, String clusterName, String namespace, String dataCenter)
{
2: if (ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
3: return Collections.emptySet();
4: }
5: Set<String> watchedKeys = Sets.newHashSet();
6:
7: // 指定 Cluster
8: // watch specified cluster config change
9: if (!Objects.equals(ConfigConsts.CLUSTER_NAME_DEFAULT, clusterName)) {
10: watchedKeys.add(assembleKey(appId, clusterName, namespace));
11: }
12:
13: // 所属 IDC 的 Cluster
14: // https://github.com/ctripcorp/apollo/issues/952
15: // watch data center config change
16: if (!Strings.isNullOrEmpty(dataCenter) && !Objects.equals(dataCenter, clusterName)) {
17: watchedKeys.add(assembleKey(appId, dataCenter, namespace));
18: }
19:
20: // 默认 Cluster
21: // watch default cluster config change
22: watchedKeys.add(assembleKey(appId, ConfigConsts.CLUSTER_NAME_DEFAULT, namespace));
23:
24: return watchedKeys;
25: }
"default"
) 的 Cluster 的 Namespace 的 Watch Key 。#assembleKey(appId, clusterName, namespace)
方法,获得 Watch Key ,详细解析,见 「7.4 assembleKey」 。关于多 Cluster 的读取顺序,可参见 《Apollo 配置中心介绍 —— 4.4.1 应用自身配置的获取规则》 。后续,我们也专门分享这块。
/**
* 获得属于该 App 的 Namespace 的名字的集合
*
* @param appId App 编号
* @param namespaces Namespace 名
* @return 集合
*/
private Set<String> namespacesBelongToAppId(String appId, Set<String> namespaces) {
if (ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
return Collections.emptySet();
}
// 获得属于该 App 的 AppNamespace 集合
List<AppNamespace> appNamespaces = appNamespaceService.findByAppIdAndNamespaces(appId, namespaces);
if (appNamespaces == null || appNamespaces.isEmpty()) {
return Collections.emptySet();
}
// 返回 AppNamespace 的名字的集合
return appNamespaces.stream().map(AppNamespace::getName).collect(Collectors.toSet());
}
@Autowired
private AppNamespaceServiceWithCache appNamespaceService;
/**
* 获得 Namespace 类型为 public 对应的 Watch Key Multimap
*
* 重要:要求非当前 App 的 Namespace
*
* @param applicationId App 编号
* @param clusterName Cluster 名
* @param namespaces Namespace 的名字的集合
* @param dataCenter IDC 的 Cluster 名
* @return Watch Key Map
*/
private Multimap<String, String> findPublicConfigWatchKeys(String applicationId, String clusterName, Set<String> namespaces, String dataCenter) {
Multimap<String, String> watchedKeysMap = HashMultimap.create();
// 获得 Namespace 为 public 的 AppNamespace 数组
List<AppNamespace> appNamespaces = appNamespaceService.findPublicNamespacesByNames(namespaces);
// 组装 Watch Key Map
for (AppNamespace appNamespace : appNamespaces) {
// 排除非关联类型的 Namespace
// check whether the namespace's appId equals to current one
if (Objects.equals(applicationId, appNamespace.getAppId())) {
continue;
}
String publicConfigAppId = appNamespace.getAppId();
// 组装指定 Namespace 的 Watch Key 数组
watchedKeysMap.putAll(appNamespace.getName(), assembleWatchKeys(publicConfigAppId, clusterName, appNamespace.getName(), dataCenter));
}
return watchedKeysMap;
}
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
/**
* 拼接 Watch Key
*
* @param appId App 编号
* @param cluster Cluster 名
* @param namespace Namespace 名
* @return Watch Key
*/
private String assembleKey(String appId, String cluster, String namespace) {
return STRING_JOINER.join(appId, cluster, namespace);
}
ReleaseMessage.message
的格式是一致的。com.ctrip.framework.apollo.biz.utils.EntityManagerUtil
,实现 org.springframework.orm.jpa.EntityManagerFactoryAccessor
抽象类,EntityManager 抽象类。代码如下:
@Component
public class EntityManagerUtil extends EntityManagerFactoryAccessor {
private static final Logger logger = LoggerFactory.getLogger(EntityManagerUtil.class);
/**
* close the entity manager.
* Use it with caution! This is only intended for use with async request, which Spring won't
* close the entity manager until the async request is finished.
*/
public void closeEntityManager() {
// 获得 EntityManagerHolder 对象
EntityManagerHolder emHolder = (EntityManagerHolder) TransactionSynchronizationManager.getResource(getEntityManagerFactory());
if (emHolder == null) {
return;
}
logger.debug("Closing JPA EntityManager in EntityManagerUtil");
// 关闭 EntityManager
EntityManagerFactoryUtils.closeEntityManager(emHolder.getEntityManager());
}
}
爽~爽爽爽爽爽~
感叹一句,比想象中长太多了。另外,RocketMQ 也是基于长轮询的方式,获取新的消息。实现上有一些差距,大的方向一致。感兴趣的胖友,可以看看老艿艿的 RocketMQ 源码解析系列。