本文将对canal的启动模块deployer进行分析。
Deployer模块(绿色部分)在整个系统中的角色如下图所示,用来启动canal-server.
模块内的类如下:
为了能带着目的看源码,以几个问题开头,带着问题来一起探索deployer模块的源码。
这个类是整个canal-server的入口类。负责配置加载和启动canal-server。
主流程如下:
1 public static void main(String[] args) {
2
3 try {
4
5 //note:设置全局未捕获异常的处理
6
7 setGlobalUncaughtExceptionHandler();
8
9 /**
10
11 * note:
12
13 * 1.读取canal.properties的配置
14
15 * 可以手动指定配置路径名称
16
17 */
18
19 String conf = System.getProperty("canal.conf", "classpath:canal.properties");
20
21 Properties properties = new Properties();
22
23 if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
24
25 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
26
27 properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
28
29 } else {
30
31 properties.load(new FileInputStream(conf));
32
33 }
34
35 final CanalStarter canalStater = new CanalStarter(properties);
36
37 String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
38
39 /**
40
41 * note:
42
43 * 2.根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了
44
45 */
46
47 if (StringUtils.isNotEmpty(managerAddress)) {
48
49 String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
50
51 //省略一部分。。。。。。
52
53
54 /**
55
56 * note:
57
58 * 2.1使用PlainCanalConfigClient获取远程配置
59
60 */
61
62 final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
63
64 user,
65
66 passwd,
67
68 registerIp,
69
70 Integer.parseInt(adminPort),
71
72 autoRegister,
73
74 autoCluster);
75
76 PlainCanal canalConfig = configClient.findServer(null);
77
78 if (canalConfig == null) {
79
80 throw new IllegalArgumentException("managerAddress:" + managerAddress
81
82 + " can't not found config for [" + registerIp + ":" + adminPort
83
84 + "]");
85
86 }
87
88 Properties managerProperties = canalConfig.getProperties();
89
90 // merge local
91
92 managerProperties.putAll(properties);
93
94 int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
95
96 CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
97
98 "5"));
99
100 /**
101
102 * note:
103
104 * 2.2 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法)
105
106 */
107
108 executor.scheduleWithFixedDelay(new Runnable() {
109
110 private PlainCanal lastCanalConfig;
111
112 public void run() {
113
114 try {
115
116 if (lastCanalConfig == null) {
117
118 lastCanalConfig = configClient.findServer(null);
119
120 } else {
121
122 PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
123
124 /**
125
126 * note:
127
128 * 2.3 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
129
130 */
131
132 if (newCanalConfig != null) {
133
134 // 远程配置canal.properties修改重新加载整个应用
135
136 canalStater.stop();
137
138 Properties managerProperties = newCanalConfig.getProperties();
139
140 // merge local
141
142 managerProperties.putAll(properties);
143
144 canalStater.setProperties(managerProperties);
145
146 canalStater.start();
147
148 lastCanalConfig = newCanalConfig;
149
150 }
151
152 }
153
154 } catch (Throwable e) {
155
156 logger.error("scan failed", e);
157
158 }
159
160 }
161
162 }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
163
164 canalStater.setProperties(managerProperties);
165
166 } else {
167
168 canalStater.setProperties(properties);
169
170 }
171
172 canalStater.start();
173
174 //note: 这样用CDL处理和while(true)有点类似
175
176 runningLatch.await();
177
178 executor.shutdownNow();
179
180 } catch (Throwable e) {
181
182 logger.error("## Something goes wrong when starting up the canal Server:", e);
183
184 }
185
186 }
从上面的入口类,我们可以看到canal-server真正的启动逻辑在CanalStarter类的start方法。
这里先对三个对象进行辨析:
start方法主要逻辑如下:
1 public synchronized void start() throws Throwable {
2
3 String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
4
5 /**
6
7 * note
8
9 * 1.如果canal.serverMode不是tcp,加载CanalMQProducer,并且启动CanalMQProducer
10
11 * 回头可以深入研究下ExtensionLoader类的相关实现
12
13 */
14
15 if (!"tcp".equalsIgnoreCase(serverMode)) {
16
17 ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
18
19 canalMQProducer = loader
20
21 .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
22
23 if (canalMQProducer != null) {
24
25 ClassLoader cl = Thread.currentThread().getContextClassLoader();
26
27 Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
28
29 canalMQProducer.init(properties);
30
31 Thread.currentThread().setContextClassLoader(cl);
32
33 }
34
35 }
36
37 //note 如果启动了canalMQProducer,就不使用canalWithNetty(这里的netty是用在哪里的?)
38
39 if (canalMQProducer != null) {
40
41 MQProperties mqProperties = canalMQProducer.getMqProperties();
42
43 // disable netty
44
45 System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
46
47 if (mqProperties.isFlatMessage()) {
48
49 // 设置为raw避免ByteString->Entry的二次解析
50
51 System.setProperty("canal.instance.memory.rawEntry", "false");
52
53 }
54
55 }
56
57 controller = new CanalController(properties);
58
59 //note 2.启动canalController
60
61 controller.start();
62
63 //note 3.注册了一个shutdownHook,系统退出时执行相关逻辑
64
65 shutdownThread = new Thread() {
66
67 public void run() {
68
69 try {
70
71 controller.stop();
72
73 //note 主线程退出
74
75 CanalLauncher.runningLatch.countDown();
76
77 } catch (Throwable e) {
78
79
80 } finally {
81
82 }
83
84 }
85
86 };
87
88 Runtime.getRuntime().addShutdownHook(shutdownThread);
89
90 //note 4.启动canalMQStarter,集群版的话,没有预先配置destinations。
91
92 if (canalMQProducer != null) {
93
94 canalMQStarter = new CanalMQStarter(canalMQProducer);
95
96 String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
97
98 canalMQStarter.start(destinations);
99
100 controller.setCanalMQStarter(canalMQStarter);
101
102 }
103
104 // start canalAdmin
105
106 String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
107
108 //note 5.根据填写的canalAdmin的ip和port,启动canalAdmin,用netty做服务器
109
110 if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
111
112 String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
113
114 String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
115
116 CanalAdminController canalAdmin = new CanalAdminController(this);
117
118 canalAdmin.setUser(user);
119
120 canalAdmin.setPasswd(passwd);
121
122 String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
123
124 CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
125
126 canalAdminWithNetty.setCanalAdmin(canalAdmin);
127
128 canalAdminWithNetty.setPort(Integer.parseInt(port));
129
130 canalAdminWithNetty.setIp(ip);
131
132 canalAdminWithNetty.start();
133
134 this.canalAdmin = canalAdminWithNetty;
135
136 }
137
138 running = true;
139
140 }
前面两个类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。
这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。
3.1 从构造器开始了解
整体初始化的顺序如下:
这里有几个机制要详细介绍一下。
3.1.1 CanalServer两种模式
canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。
在构造器中初始化代码部分如下:
1 // 3.准备canal server
2
3 //note: 核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq
4
5 // 是不需要这个netty的)
6
7 ip = getProperty(properties, CanalConstants.CANAL_IP);
8
9 //省略一部分。。。
10
11 embededCanalServer = CanalServerWithEmbedded.instance();
12
13 embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
14
15 int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
16
17 //省略一部分。。。
18
19 String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
20
21 if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
22
23 canalServer = CanalServerWithNetty.instance();
24
25 canalServer.setIp(ip);
26
27 canalServer.setPort(port);
28
29 }
embededCanalServer:类型为CanalServerWithEmbedded
canalServer:类型为CanalServerWithNetty
二者有什么区别呢?
都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。
关于这两种类型的实现,canal官方文档有以下描述:
说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库进行订阅。
如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。
在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。
因此,在构造器中,我们看到,
用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,
而ip和port被设置到CanalServerWithNetty中。
关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。
3.1.2 ServerRunningMonitor
在CanalController的构造器中,canal会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。
ServerRunningMonitor是做什么的呢?
我们看下它的属性就了解了。它主要用来记录每个instance的运行状态数据的。
1 /**
2
3 * 针对server的running节点控制
4
5 */
6
7 public class ServerRunningMonitor extends AbstractCanalLifeCycle {
8
9 private static final Logger logger = LoggerFactory.getLogger(ServerRunningMonitor.class);
10
11 private ZkClientx zkClient;
12
13 private String destination;
14
15 private IZkDataListener dataListener;
16
17 private BooleanMutex mutex = new BooleanMutex(false);
18
19 private volatile boolean release = false;
20
21 // 当前服务节点状态信息
22
23 private ServerRunningData serverData;
24
25 // 当前实际运行的节点状态信息
26
27 private volatile ServerRunningData activeData;
28
29 private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
30
31 private int delayTime = 5;
32
33 private ServerRunningListener listener;
34
35 public ServerRunningMonitor(ServerRunningData serverData){
36
37 this();
38
39 this.serverData = serverData;
40
41 }
42 //。。。。。
43
44 }
在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener。
ServerRunningListener是个接口,这里采用了匿名内部类的形式构建,实现了各个接口的方法。
主要为instance在当前server上的状态发生变化时调用。比如要在当前server上启动这个instance了,就调用相关启动方法,如果在这个server上关闭instance,就调用相关关闭方法。
具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。
1 new Function<String, ServerRunningMonitor>() {
2
3 public ServerRunningMonitor apply(final String destination) {
4
5 ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
6
7 runningMonitor.setDestination(destination);
8
9 runningMonitor.setListener(new ServerRunningListener() {
10
11 /**
12
13 * note
14
15 * 1.内部调用了embededCanalServer的start(destination)方法。
16
17 * 这里很关键,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的,
18
19 * 这样我们就能理解,为什么之前构造器中会把instanceGenerator设置到embededCanalServer中了。
20
21 * embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。
22
23 *
24
25 * 2.如果投递mq,还会直接调用canalMQStarter来启动一个destination
26
27 */
28
29 public void processActiveEnter() {
30
31 //省略具体内容。。。
32 }
33
34 /**
35
36 * note
37
38 * 1.与开始顺序相反,如果有mqStarter,先停止mqStarter的destination
39
40 * 2.停止embedeCanalServer的destination
41
42 */
43
44 public void processActiveExit() {
45
46 //省略具体内容。。。
47
48 }
49
50 /**
51
52 * note
53
54 * 在Canalinstance启动之前,destination注册到ZK上,创建节点
55
56 * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。
57
58 * 此方法会在processActiveEnter()之前被调用
59
60 */
61
62 public void processStart() {
63
64 //省略具体内容。。。
65
66 }
67
68 /**
69
70 * note
71
72 * 在Canalinstance停止前,把ZK上节点删除掉
73
74 * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。
75
76 * 此方法会在processActiveExit()之前被调用
77
78 */
79
80 public void processStop() {
81
82 //省略具体内容。。。
83 }
84
85 });
86
87 if (zkclientx != null) {
88
89 runningMonitor.setZkClient(zkclientx);
90
91 }
92
93 // 触发创建一下cid节点
94
95 runningMonitor.init();
96
97 return runningMonitor;
98
99 }
100
101 }
3.2 canalController的start方法
具体运行逻辑如下:
这里需要注意,canalServer什么时候为空?
如果用户选择了serverMode为mq,那么就不会启动canalServerWithNetty,采用mqStarter来作为server,直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟canal-client做交互。
所以如果以后想把embeddedCanal嵌入自己的应用,可以考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。
1 public void start() throws Throwable {
2
3 // 创建整个canal的工作节点
4
5 final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
6
7 initCid(path);
8
9 if (zkclientx != null) {
10
11 this.zkclientx.subscribeStateChanges(new IZkStateListener() {
12
13 public void handleStateChanged(KeeperState state) throws Exception {
14
15 }
16
17 public void handleNewSession() throws Exception {
18
19 initCid(path);
20
21 }
22
23 @Override
24
25 public void handleSessionEstablishmentError(Throwable error) throws Exception{
26
27 logger.error("failed to connect to zookeeper", error);
28
29 }
30
31 });
32
33 }
34
35 // 先启动embeded服务
36
37 embededCanalServer.start();
38
39 // 尝试启动一下非lazy状态的通道
40
41 for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
42
43 final String destination = entry.getKey();
44
45 InstanceConfig config = entry.getValue();
46
47 // 创建destination的工作节点
48
49 if (!embededCanalServer.isStart(destination)) {
50
51 // HA机制启动
52
53 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
54
55 if (!config.getLazy() && !runningMonitor.isStart()) {
56
57 runningMonitor.start();
58
59 }
60
61 }
62
63 //note:为每个instance注册一个配置监视器
64
65 if (autoScan) {
66
67 instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
68
69 }
70
71 }
72
73 if (autoScan) {
74
75 //note:启动线程定时去扫描配置
76
77 instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
78
79 //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
80
81 for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
82
83 if (!monitor.isStart()) {
84
85 monitor.start();
86
87 }
88
89 }
90
91 }
92
93 // 启动网络接口
94
95 if (canalServer != null) {
96
97 canalServer.start();
98
99 }
100
101 }
我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。
入口在runningMonitor.start()。
1 public synchronized void start() {
2
3 super.start();
4
5 try {
6
7 /**
8
9 * note
10
11 * 内部会调用ServerRunningListener的processStart()方法
12
13 */
14
15 processStart();
16
17 if (zkClient != null) {
18
19 // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
20
21 String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
22
23 zkClient.subscribeDataChanges(path, dataListener);
24
25 initRunning();
26
27 } else {
28
29 /**
30
31 * note
32
33 * 内部直接调用ServerRunningListener的processActiveEnter()方法
34
35 */
36
37 processActiveEnter();// 没有zk,直接启动
38
39 }
40
41 } catch (Exception e) {
42
43 logger.error("start failed", e);
44
45 // 没有正常启动,重置一下状态,避免干扰下一次start
46
47 stop();
48
49 }
50
51 }
重点关注下HA启动方式,一般 我们都采用这种模式进行。
在集群模式下,可能会有多个canal server共同处理同一个destination,
在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。
同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!
启动的重点还是在initRuning()。
利用zk来保证集群中有且只有 一个instance任务在运行。
1 private void initRunning() {
2 if (!isStart()) {
3 return;
4 }
5
6
7 //note: 还是一样构建一个临时节点的路径:/otter/canal/destinations/{0}/running
8 String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
9 // 序列化
10 byte[] bytes = JsonUtils.marshalToByte(serverData);
11 try {
12 mutex.set(false);
13 /**
14 * note:
15 * 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。
16 * 此时会抛出ZkNodeExistsException,进入catch代码块。
17 */
18 zkClient.create(path, bytes, CreateMode.EPHEMERAL);
19 /**
20 * note:
21 * 如果创建成功,就开始触发启动事件
22 */
23 activeData = serverData;
24 processActiveEnter();// 触发一下事件
25 mutex.set(true);
26 release = false;
27 } catch (ZkNodeExistsException e) {
28 /**
29 * note:
30 * 如果捕获异常,表示创建失败。
31 * 就根据临时节点路径查一下是哪个canal-sever创建了。
32 * 如果没有相关信息,马上重新尝试一下。
33 * 如果确实存在,就把相关信息保存下来
34 */
35 bytes = zkClient.readData(path, true);
36 if (bytes == null) {// 如果不存在节点,立即尝试一次
37 initRunning();
38 } else {
39 activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
40 }
41 } catch (ZkNoNodeException e) {
42 /**
43 * note:
44 * 如果是父节点不存在,那么就尝试创建一下父节点,然后再初始化。
45 */
46 zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
47 initRunning();
48 }
49 }
那运行中的HA是如何实现的呢,我们回头看一下
zkClient.subscribeDataChanges(path, dataListener);
对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,此时需要尝试自己进入running状态。
dataListener是在ServerRunningMonitor的构造方法中初始化的,
包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :
1 public ServerRunningMonitor(){
2 // 创建父节点
3 dataListener = new IZkDataListener() {
4 /**
5 * note:
6 * 当注册节点发生变化时,会自动回调这个方法。
7 * 我们回想一下使用过程中,什么时候可能 改变节点当状态呢?
8 * 大概是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive。
9 * 可以 触发 HA。
10 */
11 public void handleDataChange(String dataPath, Object data) throws Exception {
12 MDC.put("destination", destination);
13 ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
14 if (!isMine(runningData.getAddress())) {
15 mutex.set(false);
16 }
17
18 if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
19 releaseRunning();// 彻底释放mainstem
20 }
21
22 activeData = (ServerRunningData) runningData;
23 }
24
25
26 /**
27 * note:
28 * 如果其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去
29 */
30 public void handleDataDeleted(String dataPath) throws Exception {
31 MDC.put("destination", destination);
32 mutex.set(false);
33 if (!release && activeData != null && isMine(activeData.getAddress())) {
34 // 如果上一次active的状态就是本机,则即时触发一下active抢占
35 initRunning();
36 } else {
37 // 否则就是等待delayTime,避免因网络异常或者zk异常,导致出现频繁的切换操作
38 delayExector.schedule(new Runnable() {
39 public void run() {
40 initRunning();
41 }
42 }, delayTime, TimeUnit.SECONDS);
43 }
44 }
45 };
46 }
当注册节点发生变化时,会自动回调zkListener的handleDataChange方法。
我们回想一下使用过程中,什么时候可能 改变节点当状态呢?
就是在控制台中,对canal-server中正在运行的 instance做”停止”操作时,改变了isActive,可以 触发 HA。
如下图所示
我们现在采用admin做全局的配置控制。
那么每个canalServer是怎么监控配置的变化呢?
还记得上吗cananlController的start方法中对配置监视器的启动吗?
1 if (autoScan) {
2 //note:启动线程定时去扫描配置
3 instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
4 //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
5 for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
6 if (!monitor.isStart()) {
7 monitor.start();
8 }
9 }
10 }
这个就是关键的配置监控。
我们来看deployer模块中的monitor包了。
4.1 InstanceAction
是一个接口,有四个方法,用来获取配置后,对具体instance采取动作。
1 /**
2 * config配置变化后的动作
3 *
4 */
5 public interface InstanceAction {
6
7
8 /**
9 * 启动destination
10 */
11 void start(String destination);
12
13
14 /**
15 * 主动释放destination运行
16 */
17 void release(String destination);
18
19
20 /**
21 * 停止destination
22 */
23 void stop(String destination);
24
25
26 /**
27 * 重载destination,可能需要stop,start操作,或者只是更新下内存配置
28 */
29 void reload(String destination);
30 }
具体实现在canalController的构造器中实现了匿名类。
4.2 InstanceConfigMonitor
这个接口有两个实现,一个是基于spring的,一个基于manager(就是admin)。
我们看下基于manager配置的实现的ManagerInstanceConfigMonitor即可。
原理很简单。
1 /**
2 * 基于manager配置的实现
3 *
4 */
5 public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {
6
7
8 private static final Logger logger = LoggerFactory.getLogger(ManagerInstanceConfigMonitor.class);
9 private long scanIntervalInSecond = 5;
10 private InstanceAction defaultAction = null;
11 /**
12 * note:
13 * 每个instance对应的instanceAction,实际上我们看代码发现都是用的同一个defaultAction
14 */
15 private Map<String, InstanceAction> actions = new MapMaker().makeMap();
16 /**
17 * note:
18 * 每个instance对应的远程配置
19 */
20 private Map<String, PlainCanal> configs = MigrateMap.makeComputingMap(new Function<String, PlainCanal>() {
21 public PlainCanal apply(String destination) {
22 return new PlainCanal();
23 }
24 });
25 /**
26 * note:
27 * 一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
28 */
29 private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
30 new NamedThreadFactory("canal-instance-scan"));
31
32 private volatile boolean isFirst = true;
33 /**
34 * note:
35 * 拉取admin配置的client
36 */
37 private PlainCanalConfigClient configClient;
38 //…
39 }
deployer模块的主要作用:
1)读取canal.properties,确定canal instance的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。
2)确定canal-server的启动方式:独立启动或者集群方式启动
3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA
4)利用InstanceConfigMonitor,采用固定线程定时轮训admin,获取instance的最新配置
5)启动canal server,监听客户端请求
这里还有个非常有意思的问题没有展开说明,那就是CanalStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实现SPI,后面再分析吧。