前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq-mysql的Replicator

聊聊rocketmq-mysql的Replicator

原创
作者头像
code4it
修改2020-05-26 09:43:09
7360
修改2020-05-26 09:43:09
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下rocketmq-mysql的Replicator

Replicator

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java

代码语言:javascript
复制
public class Replicator {
​
    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
​
    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
​
    private Config config;
​
    private EventProcessor eventProcessor;
​
    private RocketMQProducer rocketMQProducer;
​
    private Object lock = new Object();
    private BinlogPosition nextBinlogPosition;
    private long nextQueueOffset;
    private long xid;
​
    public static void main(String[] args) {
​
        Replicator replicator = new Replicator();
        replicator.start();
    }
​
    public void start() {
​
        try {
            config = new Config();
            config.load();
​
            rocketMQProducer = new RocketMQProducer(config);
            rocketMQProducer.start();
​
            BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this);
            binlogPositionLogThread.start();
​
            eventProcessor = new EventProcessor(this);
            eventProcessor.start();
​
        } catch (Exception e) {
            LOGGER.error("Start error.", e);
            System.exit(1);
        }
    }
​
    public void commit(Transaction transaction, boolean isComplete) {
​
        String json = transaction.toJson();
​
        for (int i = 0; i < 3; i++) {
            try {
                if (isComplete) {
                    long offset = rocketMQProducer.push(json);
​
                    synchronized (lock) {
                        xid = transaction.getXid();
                        nextBinlogPosition = transaction.getNextBinlogPosition();
                        nextQueueOffset = offset;
                    }
​
                } else {
                    rocketMQProducer.push(json);
                }
                break;
​
            } catch (Exception e) {
                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
            }
        }
    }
​
    public void logPosition() {
​
        String binlogFilename = null;
        long xid = 0L;
        long nextPosition = 0L;
        long nextOffset = 0L;
​
        synchronized (lock) {
            if (nextBinlogPosition != null) {
                xid = this.xid;
                binlogFilename = nextBinlogPosition.getBinlogFilename();
                nextPosition = nextBinlogPosition.getPosition();
                nextOffset = nextQueueOffset;
            }
        }
​
        if (binlogFilename != null) {
            POSITION_LOGGER.info("XID: {},   BINLOG_FILE: {},   NEXT_POSITION: {},   NEXT_OFFSET: {}",
                xid, binlogFilename, nextPosition, nextOffset);
        }
​
    }
​
    public Config getConfig() {
        return config;
    }
​
    public BinlogPosition getNextBinlogPosition() {
        return nextBinlogPosition;
    }
​
}
  • Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset

RocketMQProducer

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java

代码语言:javascript
复制
public class RocketMQProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);
​
    private DefaultMQProducer producer;
    private Config config;
​
    public RocketMQProducer(Config config) {
        this.config = config;
    }
​
    public void start() throws MQClientException {
        producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");
        producer.setNamesrvAddr(config.mqNamesrvAddr);
        producer.start();
    }
​
    public long push(String json) throws Exception {
        LOGGER.debug(json);
​
        Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));
        SendResult sendResult = producer.send(message);
​
        return sendResult.getQueueOffset();
    }
}
  • RocketMQProducer的start方法创建DefaultMQProducer并执行其start方法;其push方法则通过producer.send(message)发送消息,并返回sendResult.getQueueOffset()

BinlogPositionLogThread

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java

代码语言:javascript
复制
public class BinlogPositionLogThread extends Thread {
    private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class);
​
    private Replicator replicator;
​
    public BinlogPositionLogThread(Replicator replicator) {
        this.replicator = replicator;
        setDaemon(true);
    }
​
    @Override
    public void run() {
​
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("Offset thread interrupted.", e);
            }
​
            replicator.logPosition();
        }
    }
}
  • BinlogPositionLogThread会定时执行replicator.logPosition()来打印position信息

小结

Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Replicator
  • RocketMQProducer
  • BinlogPositionLogThread
  • 小结
  • doc
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档