前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅谈基于 Zookeeper 实现分布式锁对 Maxwell 完成高可用

浅谈基于 Zookeeper 实现分布式锁对 Maxwell 完成高可用

作者头像
857技术社区
发布2022-05-17 16:09:06
5900
发布2022-05-17 16:09:06
举报
文章被收录于专栏:857-Bigdata857-Bigdata

1. 背景

❝麦斯威尔CDC框架使用方法,但后来声称基于筏子的框架实现了很高的可用性,存在MySQL协议进行相关测试试验发现上的问题,然后还是通过性克隆这个框架,通过Zookeeper框架,完成对Maxwell的高可用。 ❞

2.原理

2.1.文字介绍

❝分布式服务通过在代码里约定的路径向动物园管理员中注册自己,注意这里注册需要「临时有序」的子节点,分布式服务根据自己注册完成的子节点的先后顺序,依次监听自己前置位的子等,当 1.「变成子节点的时候」消失,且 2. 自己为当前的 Zookeeper 路径下节点号的最小节点的时候,开启自己的服务端。

  • 应该是为了更好地服务于他人的陪伴
  • 临时的目的是为了当前设备由于停机机,能够从动物园管理员撤掉自己,给服务的“腾位置”

2.2. 图示介绍

3.代码实现

3.1.修改pom文件

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.11.1</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.11.1</version>
</dependency>

3.2.修改框架入口类com.zendesk.maxwell.Maxwell的主要函数

代码语言:javascript
复制
public static void main(String[] args) {
  try {
   Logging.setupLogBridging();
   MaxwellConfig config = new MaxwellConfig(args);

   if ( config.log_level != null ) {
    Logging.setLevel(config.log_level);
   }

   final Maxwell maxwell = new Maxwell(config);

   Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
     maxwell.terminate();
     StaticShutdownCallbackRegistry.invoke();
    }
   });

   LOGGER.info("Starting Maxwell. maxMemory: " + Runtime.getRuntime().maxMemory() + " bufferMemoryUsage: " + config.bufferMemoryUsage);

   /*
   if ( config.haMode ) {
    new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
   } else {
    maxwell.start();
   }
    */
   if ( config.haMode ) {
    CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);
    curatorUtil.highAvailable();
   }
   maxwell.start();

  } catch ( SQLException e ) {
   // catch SQLException explicitly because we likely don't care about the stacktrace
   LOGGER.error("SQLException: " + e.getLocalizedMessage());
   System.exit(1);
  } catch ( URISyntaxException e ) {
   // catch URISyntaxException explicitly as well to provide more information to the user
   LOGGER.error("Syntax issue with URI, check for misconfigured host, port, database, or JDBC options (see RFC 2396)");
   LOGGER.error("URISyntaxException: " + e.getLocalizedMessage());
   System.exit(1);
  } catch ( ServerException e ) {
   LOGGER.error("Maxwell couldn't find the requested binlog, exiting...");
   System.exit(2);
  } catch ( Exception e ) {
   e.printStackTrace();
   System.exit(1);
  }
 }

3.3.新增代码 com.zendesk.maxwell.util.CuratorUtil

代码语言:javascript
复制
package com.zendesk.maxwell.util;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CuratorUtil {
 private String zookeeperServers;
 private int sessionTimeoutMs;
 private int connectionTimeoutMs;
 private int baseSleepTimeMs;
 private int maxRetries;
 private CuratorFramework client;
 private String lockPath = "/maxwell/ha/lock";
 private String leaderPath = "/maxwell/ha/leader";


 public CuratorUtil(String zookeeperServers,int sessionTimeoutMs,int connectionTimeoutMs,int baseSleepTimeMs,int maxRetries){
  this.zookeeperServers = zookeeperServers;
  this.sessionTimeoutMs = sessionTimeoutMs;
  this.connectionTimeoutMs = connectionTimeoutMs;
  this.baseSleepTimeMs = baseSleepTimeMs;
  this.maxRetries = maxRetries;
 }

 /*
  * 构造 zookeeper 客户端,并连接 zookeeper 集群
  */
 public void start(){
  ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);
  client = CuratorFrameworkFactory.newClient(
    this.zookeeperServers,
    this.sessionTimeoutMs,
    this.connectionTimeoutMs,
    retryPolicy
  );
  client.start();
 }

 /*
  * 实现分布式锁
  */
 public void highAvailable(){
  // 1.连接 Zookeeper 客户端
  this.start();
  // 2.向 zookeeper 注册自己
  InterProcessMutex lock = new InterProcessMutex(client, lockPath);
  try {
   // 3.获取锁
   lock.acquire();
   // 4.将自己信息注册到 leader 路径
   client.create()
     .withMode(CreateMode.EPHEMERAL)
     .forPath(leaderPath);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

3.4.修改代码com.zendesk.maxwell.MaxwellConfig

代码语言:javascript
复制
// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;

// 函数 MaxwellOptionParser 新增代码
parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" )
    .withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" )
    .withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" )
    .withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" )
    .withRequiredArg();
parser.accepts( "max_retries", "max retry times" )
    .withRequiredArg();

// 函数 setup 新增代码
this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode){
 if (zookeeperServers == null){
  LOGGER.warn("you must specify --zookeeper because you want to use maxwell in ha mode");
 }
}

4.说明

❝需要修改源代码是基于 1.29.2 完成对源代码的相关版本,使用高版本,按照相同的步骤对源代码进行修改。 ❞

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-01-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 857Hub 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 背景
  • 2.原理
    • 2.1.文字介绍
      • 2.2. 图示介绍
      • 3.代码实现
        • 3.1.修改pom文件
        • 3.2.修改框架入口类com.zendesk.maxwell.Maxwell的主要函数
          • 3.3.新增代码 com.zendesk.maxwell.util.CuratorUtil
            • 3.4.修改代码com.zendesk.maxwell.MaxwellConfig
            • 4.说明
            相关产品与服务
            云数据库 MySQL
            腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档