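Abstract: originally published at http://www.iocoder.cn/Apollo/config-service-audit-instance/ by 「芋道源码」. Reposts are welcome; please keep this abstract. Thanks!
Note from 老艿艿: this series assumes readers have already read the official Apollo wiki documentation.
On the application detail page in Portal, we can see the list of instances under each Namespace.
[Figure: instance list under a Namespace on the Portal application detail page]
This article covers the instance-related entities and how they are stored.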
com.ctrip.framework.apollo.biz.entity.Instance is the Instance entity. The code is as follows:
@Entity
@Table(name = "Instance")
public class Instance {
    /**
     * ID
     */
    @Id
    @GeneratedValue
    @Column(name = "Id")
    private long id;
    /**
     * App ID
     */
    @Column(name = "AppId", nullable = false)
    private String appId;
    /**
     * Cluster name
     */
    @Column(name = "ClusterName", nullable = false)
    private String clusterName;
    /**
     * Data center name
     */
    @Column(name = "DataCenter", nullable = false)
    private String dataCenter;
    /**
     * Client IP
     */
    @Column(name = "Ip", nullable = false)
    private String ip;
    /**
     * Time the record was created
     */
    @Column(name = "DataChange_CreatedTime", nullable = false)
    private Date dataChangeCreatedTime;
    /**
     * Time the record was last modified
     */
    @Column(name = "DataChange_LastTime")
    private Date dataChangeLastModifiedTime;

    // Fill in both timestamps before the first insert if they were not set explicitly
    @PrePersist
    protected void prePersist() {
        if (this.dataChangeCreatedTime == null) {
            dataChangeCreatedTime = new Date();
        }
        if (this.dataChangeLastModifiedTime == null) {
            dataChangeLastModifiedTime = dataChangeCreatedTime;
        }
    }
}
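id field: the ID, auto-incremented.
appId + clusterName + dataCenter + ip form a unique index. Together, these four fields uniquely identify one instance (a client).

com.ctrip.framework.apollo.biz.entity.InstanceConfig is the InstanceConfig entity. It records which configuration an Instance has fetched for a Namespace. If an Instance uses several Namespaces, one InstanceConfig record is stored per Namespace.
The code is as follows: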
@Entity
@Table(name = "InstanceConfig")
public class InstanceConfig {
    /**
     * ID
     */
    @Id
    @GeneratedValue
    @Column(name = "Id")
    private long id;
    /**
     * Instance ID, pointing to {@link Instance#id}
     */
    @Column(name = "InstanceId")
    private long instanceId;
    /**
     * App ID
     */
    @Column(name = "ConfigAppId", nullable = false)
    private String configAppId;
    /**
     * Cluster name
     */
    @Column(name = "ConfigClusterName", nullable = false)
    private String configClusterName;
    /**
     * Namespace name
     */
    @Column(name = "ConfigNamespaceName", nullable = false)
    private String configNamespaceName;
    /**
     * Release Key, corresponding to {@link Release#releaseKey}
     */
    @Column(name = "ReleaseKey", nullable = false)
    private String releaseKey;
    /**
     * Time the configuration was delivered
     */
    @Column(name = "ReleaseDeliveryTime", nullable = false)
    private Date releaseDeliveryTime;
    /**
     * Time the record was created
     */
    @Column(name = "DataChange_CreatedTime", nullable = false)
    private Date dataChangeCreatedTime;
    /**
     * Time the record was last modified
     */
    @Column(name = "DataChange_LastTime")
    private Date dataChangeLastModifiedTime;

    // Fill in both timestamps before the first insert if they were not set explicitly
    @PrePersist
    protected void prePersist() {
        if (this.dataChangeCreatedTime == null) {
            dataChangeCreatedTime = new Date();
        }
        if (this.dataChangeLastModifiedTime == null) {
            dataChangeLastModifiedTime = dataChangeCreatedTime;
        }
    }
}
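id field: the ID, auto-incremented.
instanceId + configAppId + configNamespaceName form a unique index, because one Instance can use several Namespaces.
releaseKey field: the Release Key, corresponding to the Release.releaseKey field.
releaseDeliveryTime field: the time the configuration was delivered.
With releaseKey + releaseDeliveryTime, it is easy to tell which release an Instance has fetched for the current Namespace, and when.
configClusterName field: the Cluster name.

In "Apollo Source Code Analysis: Config Service Configuration Read API", we saw that a client reads configuration by calling the Config Service endpoint GET /configs/{appId}/{clusterName}/{namespace:.+}. Inside that endpoint, the #auditReleases(...) method calls InstanceConfigAuditUtil#audit(...) once per Release. The code is as follows: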
private void auditReleases(String appId, String cluster, String dataCenter, String clientIp,
                           List<Release> releases) {
    if (Strings.isNullOrEmpty(clientIp)) {
        //no need to audit instance config when there is no ip
        return;
    }
    // Iterate over the releases
    for (Release release : releases) {
        // Record an InstanceConfig for each one
        instanceConfigAuditUtil.audit(appId, cluster, dataCenter, clientIp, release.getAppId(),
            release.getClusterName(),
            release.getNamespaceName(), release.getReleaseKey());
    }
}
Next, let's look at how InstanceConfigAuditUtil is implemented.
com.ctrip.framework.apollo.configservice.util.InstanceConfigAuditUtil implements the InitializingBean interface and is the audit utility class for InstanceConfig.
/**
 * Capacity of {@link #audits}
 */
private static final int INSTANCE_CONFIG_AUDIT_MAX_SIZE = 10000;
/**
 * Maximum size of {@link #instanceCache}
 */
private static final int INSTANCE_CACHE_MAX_SIZE = 50000;
/**
 * Maximum size of {@link #instanceConfigReleaseKeyCache}
 */
private static final int INSTANCE_CONFIG_CACHE_MAX_SIZE = 50000;
private static final long OFFER_TIME_LAST_MODIFIED_TIME_THRESHOLD_IN_MILLI = TimeUnit.MINUTES.toMillis(10);//10 minutes
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
/**
 * ExecutorService with a single worker thread.
 */
private final ExecutorService auditExecutorService;
/**
 * Whether auditing has been stopped
 */
private final AtomicBoolean auditStopped;
/**
 * Queue of pending audit models
 */
private BlockingQueue<InstanceConfigAuditModel> audits = Queues.newLinkedBlockingQueue(INSTANCE_CONFIG_AUDIT_MAX_SIZE);
/**
 * Cache of Instance IDs
 *
 * KEY: {@link #assembleInstanceKey(String, String, String, String)}
 * VALUE: {@link Instance#id}
 */
private Cache<String, Long> instanceCache;
/**
 * Cache of InstanceConfig release keys
 *
 * KEY: {@link #assembleInstanceConfigKey(long, String, String)}
 * VALUE: {@link InstanceConfig#releaseKey}
 */
private Cache<String, String> instanceConfigReleaseKeyCache;
@Autowired
private InstanceService instanceService;

public InstanceConfigAuditUtil() {
    auditExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("InstanceConfigAuditUtil", true));
    auditStopped = new AtomicBoolean(false);
    instanceCache = CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).maximumSize(INSTANCE_CACHE_MAX_SIZE).build();
    instanceConfigReleaseKeyCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.DAYS).maximumSize(INSTANCE_CONFIG_CACHE_MAX_SIZE).build();
}
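The two caches above are keyed by strings assembled from the unique-index fields. The helper methods assembleInstanceKey and assembleInstanceConfigKey are referenced in this article but their bodies are not shown, so the following is only a sketch of what they might look like. It assumes the separator is "+", assumes an empty data center is simply left out of the key, and uses the JDK's String.join in place of Guava's Joiner. The class name CacheKeySketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class CacheKeySketch {
    // Assumption: the article only refers to ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR by name.
    static final String SEPARATOR = "+";

    // Key for instanceCache. Argument order follows the call in doAudit:
    // appId, clusterName, ip, dataCenter.
    static String assembleInstanceKey(String appId, String cluster, String ip, String dataCenter) {
        List<String> parts = new ArrayList<>(List.of(appId, cluster, ip));
        // Assumption: skip the data center when it is null or empty.
        if (dataCenter != null && !dataCenter.isEmpty()) {
            parts.add(dataCenter);
        }
        return String.join(SEPARATOR, parts);
    }

    // Key for instanceConfigReleaseKeyCache: instanceId + configAppId + configNamespace.
    static String assembleInstanceConfigKey(long instanceId, String configAppId, String configNamespace) {
        return String.join(SEPARATOR, String.valueOf(instanceId), configAppId, configNamespace);
    }

    public static void main(String[] args) {
        // prints demo-app+default+10.0.0.1
        System.out.println(assembleInstanceKey("demo-app", "default", "10.0.0.1", ""));
        // prints 42+demo-app+application
        System.out.println(assembleInstanceConfigKey(42L, "demo-app", "application"));
    }
}
```

Because the key fields mirror the unique indexes of the two tables, a cache hit stands in for the row that a database lookup would have returned.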
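instanceCache property: the cache of Instance IDs, where:
  KEY: appId + clusterName + dataCenter + ip, which are exactly the fields of the Instance unique index.
  VALUE: id.
instanceConfigReleaseKeyCache property: the cache of InstanceConfig release keys, where:
  KEY: instanceId + configAppId + configNamespaceName, which are exactly the fields of the InstanceConfig unique index.
  VALUE: releaseKey.
auditExecutorService property: the ExecutorService, with a single worker thread.
auditStopped property: whether auditing has been stopped.
audits property: the queue.

The #afterPropertiesSet() method is called by Spring and starts the background task. The code is as follows: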
@Override
public void afterPropertiesSet() {
    // Submit the task
    auditExecutorService.submit(() -> {
        // Loop until stopped or the thread is interrupted
        while (!auditStopped.get() && !Thread.currentThread().isInterrupted()) {
            try {
                // Take the head InstanceConfigAuditModel from the queue, without blocking
                InstanceConfigAuditModel model = audits.poll();
                // Nothing available: sleep for 1 second and retry
                if (model == null) {
                    TimeUnit.SECONDS.sleep(1);
                    continue;
                }
                // Got one: record the Instance and InstanceConfig
                doAudit(model);
            } catch (Throwable ex) {
                Tracer.logError(ex);
            }
        }
    });
}
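The polling pattern above (a single worker thread, a non-blocking poll, and a short sleep when the queue is empty) can be tried out in isolation. The following is a minimal sketch using only JDK classes. The class name PollingConsumerSketch, the String payload, and the 50 ms sleep are illustrative assumptions, not part of Apollo.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollingConsumerSketch {
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
    final AtomicBoolean stopped = new AtomicBoolean(false);
    final List<String> processed = new CopyOnWriteArrayList<>();
    // Single daemon worker thread, so the JVM can exit even if stop() is never called.
    final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "polling-consumer-sketch");
        t.setDaemon(true);
        return t;
    });

    void start(CountDownLatch done) {
        executor.submit(() -> {
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
                try {
                    String item = queue.poll(); // non-blocking: returns null when empty
                    if (item == null) {
                        TimeUnit.MILLISECONDS.sleep(50); // back off briefly, then retry
                        continue;
                    }
                    processed.add(item); // stands in for doAudit(model)
                    done.countDown();
                } catch (InterruptedException ex) {
                    // sleep() clears the interrupt flag; restore it so the loop exits
                    Thread.currentThread().interrupt();
                } catch (RuntimeException ex) {
                    // a failure on one item must not kill the worker; a real system would log it
                }
            }
        });
    }

    void stop() {
        stopped.set(true);
        executor.shutdownNow();
    }

    // Offers two items and returns what the worker processed within five seconds.
    static List<String> runDemo() {
        PollingConsumerSketch sketch = new PollingConsumerSketch();
        CountDownLatch done = new CountDownLatch(2);
        sketch.start(done);
        sketch.queue.offer("a");
        sketch.queue.offer("b");
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        sketch.stop();
        return List.copyOf(sketch.processed);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

One design difference is worth noting. This sketch restores the interrupt flag when sleep is interrupted, so the loop condition sees it. A loop that catches Throwable and only logs it will keep running after an interrupted sleep until the stop flag is set.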
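The task is submitted to auditExecutorService and loops until it is stopped or the thread is interrupted.
It calls BlockingQueue#poll() to take the head InstanceConfigAuditModel element from the queue, without blocking. If nothing is available, it sleeps for 1 second.
If an element is available, it calls #doAudit(InstanceConfigAuditModel) to record the Instance and InstanceConfig. For details, see "3.4 doAudit".

The #audit(...) method adds a model to the queue. The code is as follows: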
public boolean audit(String appId, String clusterName, String dataCenter, String
    ip, String configAppId, String configClusterName, String configNamespace, String releaseKey) {
    return this.audits.offer(new InstanceConfigAuditModel(appId, clusterName, dataCenter, ip,
        configAppId, configClusterName, configNamespace, releaseKey));
}

public static class InstanceConfigAuditModel {
    private String appId;
    private String clusterName;
    private String dataCenter;
    private String ip;
    private String configAppId;
    private String configClusterName;
    private String configNamespace;
    private String releaseKey;
    /**
     * Time the model was put on the queue
     */
    private Date offerTime;

    public InstanceConfigAuditModel(String appId, String clusterName, String dataCenter, String
        clientIp, String configAppId, String configClusterName, String configNamespace, String
        releaseKey) {
        this.offerTime = new Date(); // current time
        this.appId = appId;
        this.clusterName = clusterName;
        this.dataCenter = Strings.isNullOrEmpty(dataCenter) ? "" : dataCenter;
        this.ip = clientIp;
        this.configAppId = configAppId;
        this.configClusterName = configClusterName;
        this.configNamespace = configNamespace;
        this.releaseKey = releaseKey;
    }
}
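Since audits is created with a fixed capacity and #audit(...) uses offer rather than put, a full queue makes #audit(...) return false immediately instead of blocking the request thread. That audit record is then simply dropped. A minimal sketch of this behaviour with a capacity of 2 (the class name BoundedOfferSketch and the String payload are illustrative):

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedOfferSketch {
    // Offers three elements to a queue of the given capacity and reports each result.
    static boolean[] offerThree(int capacity) {
        BlockingQueue<String> audits = new LinkedBlockingQueue<>(capacity);
        return new boolean[] {
            audits.offer("first"),
            audits.offer("second"),
            audits.offer("third") // rejected without blocking once the queue is full
        };
    }

    public static void main(String[] args) {
        // prints [true, true, false]
        System.out.println(Arrays.toString(offerThree(2)));
    }
}
```

Dropping records under load is a reasonable trade-off here: the audit data is informational, and a later request from the same client will enqueue a fresh model.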
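offerTime property: the enqueue time. It is set to the current time when the model is created, so that the delay introduced by asynchronous processing does not skew the recorded time.
#audit(...) calls BlockingQueue#offer(InstanceConfigAuditModel) to add the model to the audits queue.

The #doAudit(InstanceConfigAuditModel) method records the Instance and InstanceConfig. The code is as follows: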
void doAudit(InstanceConfigAuditModel auditModel) {
    // Assemble the KEY for instanceCache
    String instanceCacheKey = assembleInstanceKey(auditModel.getAppId(), auditModel.getClusterName(),
        auditModel.getIp(), auditModel.getDataCenter());
    // Look up the Instance ID
    Long instanceId = instanceCache.getIfPresent(instanceCacheKey);
    // Not cached: load from the DB or create it, then add it to the cache
    if (instanceId == null) {
        instanceId = prepareInstanceId(auditModel);
        instanceCache.put(instanceCacheKey, instanceId);
    }

    // Assemble the KEY for instanceConfigReleaseKeyCache
    // load instance config release key from cache, and check if release key is the same
    String instanceConfigCacheKey = assembleInstanceConfigKey(instanceId, auditModel.getConfigAppId(),
        auditModel.getConfigNamespace());
    // Look up the cached release key
    String cacheReleaseKey = instanceConfigReleaseKeyCache.getIfPresent(instanceConfigCacheKey);
    // If it is the same, skip
    // if release key is the same, then skip audit
    if (cacheReleaseKey != null && Objects.equals(cacheReleaseKey, auditModel.getReleaseKey())) {
        return;
    }
    // Update the instanceConfigReleaseKeyCache entry
    instanceConfigReleaseKeyCache.put(instanceConfigCacheKey, auditModel.getReleaseKey());
    // Load the InstanceConfig
    // if release key is not the same or cannot find in cache, then do audit
    InstanceConfig instanceConfig = instanceService.findInstanceConfig(instanceId, auditModel.getConfigAppId(),
        auditModel.getConfigNamespace());

    // The InstanceConfig already exists: update it
    if (instanceConfig != null) {
        // The release key has changed
        if (!Objects.equals(instanceConfig.getReleaseKey(), auditModel.getReleaseKey())) {
            instanceConfig.setConfigClusterName(auditModel.getConfigClusterName());
            instanceConfig.setReleaseKey(auditModel.getReleaseKey());
            instanceConfig.setReleaseDeliveryTime(auditModel.getOfferTime()); // delivery time uses the enqueue time
        // The times are too close, e.g. the client first called Config Service node A, then node B
        } else if (offerTimeAndLastModifiedTimeCloseEnough(auditModel.getOfferTime(), instanceConfig.getDataChangeLastModifiedTime())) {
            //when releaseKey is the same, optimize to reduce writes if the record was updated not long ago
            return;
        }
        // Update
        //we need to update no matter the release key is the same or not, to ensure the
        //last modified time is updated each day
        instanceConfig.setDataChangeLastModifiedTime(auditModel.getOfferTime());
        instanceService.updateInstanceConfig(instanceConfig);
        return;
    }

    // The InstanceConfig does not exist: create it
    instanceConfig = new InstanceConfig();
    instanceConfig.setInstanceId(instanceId);
    instanceConfig.setConfigAppId(auditModel.getConfigAppId());
    instanceConfig.setConfigClusterName(auditModel.getConfigClusterName());
    instanceConfig.setConfigNamespaceName(auditModel.getConfigNamespace());
    instanceConfig.setReleaseKey(auditModel.getReleaseKey());
    instanceConfig.setReleaseDeliveryTime(auditModel.getOfferTime());
    instanceConfig.setDataChangeCreatedTime(auditModel.getOfferTime());
    // Save the InstanceConfig to the database
    try {
        instanceService.createInstanceConfig(instanceConfig);
    } catch (DataIntegrityViolationException ex) {
        // concurrent insertion, safe to ignore
    }
}
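The effect of the release-key cache check in #doAudit(...) is that repeated requests carrying an unchanged release key never reach the database. The following sketch isolates just that short-circuit. It swaps in a plain HashMap for the Guava cache (so there is no expiry) and a counter for the database access; ReleaseKeyDedupSketch and storeAccesses are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class ReleaseKeyDedupSketch {
    // Stand-in for instanceConfigReleaseKeyCache (no expiry in this sketch).
    private final Map<String, String> releaseKeyCache = new HashMap<>();
    // Counts how often the slow path (find + create/update in the DB) would run.
    int storeAccesses = 0;

    // Returns true when the audit had to go to the store, false when it was skipped.
    boolean audit(String instanceConfigKey, String releaseKey) {
        String cached = releaseKeyCache.get(instanceConfigKey);
        if (cached != null && Objects.equals(cached, releaseKey)) {
            return false; // same release key as last time: nothing to record
        }
        releaseKeyCache.put(instanceConfigKey, releaseKey);
        storeAccesses++; // here the real code loads and creates or updates InstanceConfig
        return true;
    }

    public static void main(String[] args) {
        ReleaseKeyDedupSketch sketch = new ReleaseKeyDedupSketch();
        System.out.println(sketch.audit("1+demo-app+application", "release-1")); // true
        System.out.println(sketch.audit("1+demo-app+application", "release-1")); // false
        System.out.println(sketch.audit("1+demo-app+application", "release-2")); // true
        System.out.println(sketch.storeAccesses); // 2
    }
}
```

In the real class the cache entry expires one day after it is written, which is what lets the last-modified time be refreshed roughly daily even when the release key never changes.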
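First, assemble the KEY for instanceCache.
Call Cache#getIfPresent(key) to get the Instance ID from the instanceCache cache.
If it is not cached, load it from the DB or create it, then add it to the cache. This is done by the #prepareInstanceId(InstanceConfigAuditModel) method. The code is as follows: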
private long prepareInstanceId(InstanceConfigAuditModel auditModel) {
    // Look up the Instance
    Instance instance = instanceService.findInstance(auditModel.getAppId(), auditModel
        .getClusterName(), auditModel.getDataCenter(), auditModel.getIp());
    // It already exists: return its ID
    if (instance != null) {
        return instance.getId();
    }
    // It does not exist: create the Instance object
    instance = new Instance();
    instance.setAppId(auditModel.getAppId());
    instance.setClusterName(auditModel.getClusterName());
    instance.setDataCenter(auditModel.getDataCenter());
    instance.setIp(auditModel.getIp());
    // Save the Instance to the database
    try {
        return instanceService.createInstance(instance).getId();
    } catch (DataIntegrityViolationException ex) {
        // A unique index conflict means the row already exists: query it and return its ID
        // return the one exists
        return instanceService.findInstance(instance.getAppId(), instance.getClusterName(),
            instance.getDataCenter(), instance.getIp()).getId();
    }
}
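#prepareInstanceId(...) follows a common find-or-create pattern: look the row up, try to insert it if it is missing, and fall back to another lookup if the insert hits the unique index because a concurrent writer got there first. The sketch below reproduces that flow against an in-memory map instead of a database. GetOrCreateSketch, DuplicateKeyException, and the betweenFindAndCreate hook are all hypothetical; the hook exists only to simulate a concurrent insert deterministically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class GetOrCreateSketch {
    // Stand-in for Spring's DataIntegrityViolationException.
    static class DuplicateKeyException extends RuntimeException {}

    // Stand-in for the Instance table: unique key -> auto-incremented ID.
    private final Map<String, Long> table = new ConcurrentHashMap<>();
    private final AtomicLong sequence = new AtomicLong();

    Long find(String key) {
        return table.get(key);
    }

    long create(String key) {
        long id = sequence.incrementAndGet();
        if (table.putIfAbsent(key, id) != null) {
            throw new DuplicateKeyException(); // unique index violated
        }
        return id;
    }

    long prepareId(String key, Runnable betweenFindAndCreate) {
        Long existing = find(key);
        if (existing != null) {
            return existing;
        }
        betweenFindAndCreate.run(); // test hook: lets a "concurrent" writer sneak in
        try {
            return create(key);
        } catch (DuplicateKeyException ex) {
            return find(key); // someone else inserted it first: return the existing row
        }
    }

    public static void main(String[] args) {
        GetOrCreateSketch sketch = new GetOrCreateSketch();
        System.out.println(sketch.prepareId("app+default+dc+10.0.0.1", () -> {})); // 1 (created)
        System.out.println(sketch.prepareId("app+default+dc+10.0.0.1", () -> {})); // 1 (found)
        // Simulated race: another writer inserts the row after our lookup.
        System.out.println(sketch.prepareId("app+default+dc+10.0.0.2",
            () -> sketch.create("app+default+dc+10.0.0.2"))); // 2 (the other writer's row)
    }
}
```

The pattern relies on the unique index to arbitrate races, so no lock is needed across Config Service nodes.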
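Back in #doAudit(...), the next step assembles the KEY for instanceConfigReleaseKeyCache.
Call Cache#getIfPresent(key) to get cacheReleaseKey from the instanceConfigReleaseKeyCache cache.
If it equals the releaseKey of the model, nothing has changed, so skip the audit.
Otherwise, update the instanceConfigReleaseKeyCache cache.
Call InstanceService#findInstanceConfig(...) to load the InstanceConfig object.
If the InstanceConfig already exists, update it. Unlike Instance, InstanceConfig has update logic.
If releaseKey has changed, set the fields that need updating: configClusterName, releaseKey, and releaseDeliveryTime. Note that releaseDeliveryTime, the configuration delivery time, uses the enqueue time.
Otherwise, call #offerTimeAndLastModifiedTimeCloseEnough(Date offerTime, Date lastModifiedTime) to check whether the two times are too close, that is, less than 10 minutes apart. For example, a Client may first call Config Service node A and then node B. In that case the InstanceConfig has already been updated in the DB, but the cache on node B has not. If the times are that close, the update is skipped. The code of #offerTimeAndLastModifiedTimeCloseEnough(...) is as follows: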
private boolean offerTimeAndLastModifiedTimeCloseEnough(Date offerTime, Date lastModifiedTime) {
    return (offerTime.getTime() - lastModifiedTime.getTime()) < OFFER_TIME_LAST_MODIFIED_TIME_THRESHOLD_IN_MILLI;
}
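A short worked example makes the threshold concrete. The sketch below copies the comparison into a standalone class (CloseEnoughSketch is a hypothetical name) and evaluates it for a few time differences, assuming the same 10-minute threshold.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

class CloseEnoughSketch {
    static final long THRESHOLD_MILLIS = TimeUnit.MINUTES.toMillis(10); // 600000 ms

    static boolean closeEnough(Date offerTime, Date lastModifiedTime) {
        return (offerTime.getTime() - lastModifiedTime.getTime()) < THRESHOLD_MILLIS;
    }

    public static void main(String[] args) {
        Date lastModified = new Date(0L);
        Date threeMinutesLater = new Date(TimeUnit.MINUTES.toMillis(3));
        Date tenMinutesLater = new Date(TimeUnit.MINUTES.toMillis(10));
        Date oneMinuteEarlier = new Date(-TimeUnit.MINUTES.toMillis(1));

        System.out.println(closeEnough(threeMinutesLater, lastModified)); // true: 180000 < 600000
        System.out.println(closeEnough(tenMinutesLater, lastModified));   // false: 600000 is not < 600000
        System.out.println(closeEnough(oneMinuteEarlier, lastModified));  // true: the difference is negative
    }
}
```

Because the difference is signed rather than absolute, an offer time that is earlier than the last modified time also counts as close enough, so such an audit is skipped as well.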
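Call InstanceService#updateInstanceConfig(InstanceConfig) to update the InstanceConfig, which ends the processing.
If the InstanceConfig does not exist, create it and call InstanceService#createInstanceConfig(InstanceConfig) to save it to the database.

The Service and Controller classes related to Instance and InstanceConfig are fairly easy to follow, so readers can browse the source themselves. 老艿艿 won't ramble on about them here.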