A Look at the work Method of BinlogConnectorReplicator

Original article by code4it
Last modified: 2020-05-06 15:28:00
Collected in the column: 码匠的流水账

This article walks through the work method of Maxwell's BinlogConnectorReplicator and the classes it relies on.

StoppableTask

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/util/StoppableTask.java

public interface StoppableTask {
    void requestStop() throws Exception;
    void awaitStop(Long timeout) throws TimeoutException;
}
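  • The StoppableTask interface defines two methods: requestStop, which asks a task to stop, and awaitStop, which waits for it to finish and throws TimeoutException if it does not stop within the given timeout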
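
To make the request-then-wait contract concrete, here is a minimal sketch of a task that implements an interface of the same shape. The names StoppableTaskDemo and LatchBackedTask are hypothetical, the interface is copied locally so the sketch compiles on its own, and treating the timeout as milliseconds is an assumption, since the excerpt above does not state the unit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StoppableTaskDemo {
    // Local copy of the interface shape shown above, so the sketch is self-contained.
    interface StoppableTask {
        void requestStop() throws Exception;
        void awaitStop(Long timeout) throws TimeoutException;
    }

    // Hypothetical task: a worker thread loops until a stop is requested.
    static class LatchBackedTask implements StoppableTask {
        private volatile boolean stopRequested = false;
        private final CountDownLatch stopped = new CountDownLatch(1);
        private final Thread worker = new Thread(this::loop);

        private void loop() {
            try {
                while (!stopRequested) {
                    try {
                        Thread.sleep(10); // stand-in for real work
                    } catch (InterruptedException e) {
                        // woken up by requestStop(); the while condition re-checks the flag
                    }
                }
            } finally {
                stopped.countDown(); // signal that the loop has exited
            }
        }

        void start() { worker.start(); }

        @Override
        public void requestStop() {
            stopRequested = true; // phase 1: ask the loop to finish
            worker.interrupt();   // wake the worker if it is sleeping
        }

        @Override
        public void awaitStop(Long timeout) throws TimeoutException {
            // phase 2: block until the loop has exited (timeout assumed to be milliseconds)
            try {
                if (!stopped.await(timeout, TimeUnit.MILLISECONDS))
                    throw new TimeoutException("task did not stop within " + timeout + "ms");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new TimeoutException("interrupted while waiting for stop");
            }
        }

        boolean isStopped() { return stopped.getCount() == 0; }
    }

    public static void main(String[] args) throws Exception {
        LatchBackedTask task = new LatchBackedTask();
        task.start();
        task.requestStop();
        task.awaitStop(1000L);
        System.out.println("stopped = " + task.isStopped());
    }
}
```

Splitting the stop into a request and a wait lets a caller signal many tasks first and only then block on each of them.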

RunLoopProcess

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/util/RunLoopProcess.java

abstract public class RunLoopProcess implements StoppableTask {
    protected volatile StoppableTaskState taskState;
    private Thread thread;

    public RunLoopProcess() {
        this.taskState = new StoppableTaskState(this.getClass().getName());
    }

    public void requestStop() {
        this.taskState.requestStop();

        // wake the run-loop thread in case it is blocked inside work()
        interrupt();
    }

    public void interrupt() {
        if ( this.thread != null )
            this.thread.interrupt();
    }

    public void awaitStop(Long timeout) throws TimeoutException {
        this.taskState.awaitStop(thread, timeout);
    }

    public void runLoop() throws Exception {
        // remember the calling thread so interrupt() and awaitStop() can reach it
        this.thread = Thread.currentThread();
        this.beforeStart();

        try {
            while (this.taskState.isRunning()) {
                work();
            }
        } finally {
            // runs on normal exit and when work() throws
            this.beforeStop();
            this.taskState.stopped();
        }
    }

    protected abstract void work() throws Exception;
    protected void beforeStart() throws Exception { }
    protected void beforeStop() throws Exception { }
}
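  • RunLoopProcess implements the StoppableTask interface. Its runLoop method records the current thread, calls beforeStart(), and then calls work() in a while loop until taskState.isRunning() returns false; the finally block calls beforeStop() and marks the task state as stopped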
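
The run-loop pattern described above can be sketched in a few lines. This is a simplified, single-threaded stand-in and not Maxwell's class: SimpleRunLoop and CountingLoop are hypothetical names, and an AtomicBoolean replaces StoppableTaskState, whose source is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class RunLoopDemo {
    // Hypothetical stand-in for RunLoopProcess with the same hook methods.
    static abstract class SimpleRunLoop {
        private final AtomicBoolean running = new AtomicBoolean(true);

        public void requestStop() { running.set(false); }

        public void runLoop() throws Exception {
            beforeStart();
            try {
                while (running.get()) {
                    work();
                }
            } finally {
                beforeStop(); // runs on normal exit and when work() throws
            }
        }

        protected abstract void work() throws Exception;
        protected void beforeStart() throws Exception { }
        protected void beforeStop() throws Exception { }
    }

    // Hypothetical subclass: does three units of work, then asks the loop to stop.
    static class CountingLoop extends SimpleRunLoop {
        final List<String> events = new ArrayList<>();
        private int count = 0;

        @Override protected void beforeStart() { events.add("beforeStart"); }

        @Override protected void work() {
            events.add("work" + (++count));
            if (count == 3) requestStop();
        }

        @Override protected void beforeStop() { events.add("beforeStop"); }
    }

    public static void main(String[] args) throws Exception {
        CountingLoop loop = new CountingLoop();
        loop.runLoop();
        System.out.println(loop.events);
        // prints [beforeStart, work1, work2, work3, beforeStop]
    }
}
```

Subclasses only supply work() and, optionally, the two hooks; the loop and the cleanup order stay in one place.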

BinlogConnectorReplicator

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java

public class BinlogConnectorReplicator extends RunLoopProcess implements Replicator {

    //......

    public void work() throws Exception {
        RowMap row = null;
        try {
            row = getRow();
        } catch ( InterruptedException e ) {
            // interrupted, e.g. by requestStop(); row stays null and work() returns below
        }

        if ( row == null )
            return;

        rowCounter.inc();
        rowMeter.mark();

        if ( scripting != null && !isMaxwellRow(row))
            scripting.invoke(row);

        processRow(row);
    }

    protected void processRow(RowMap row) throws Exception {
        if ( row instanceof HeartbeatRowMap) {
            // heartbeat rows are always pushed
            producer.push(row);
            if (stopAtHeartbeat != null) {
                long thisHeartbeat = row.getPosition().getLastHeartbeatRead();
                if (thisHeartbeat >= stopAtHeartbeat) {
                    LOGGER.info("received final heartbeat " + thisHeartbeat + "; stopping replicator");
                    // terminate runLoop
                    this.taskState.stopped();
                }
            }
        } else if ( !shouldSkipRow(row) )
            producer.push(row);
    }

    //......

}
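  • BinlogConnectorReplicator extends the abstract class RunLoopProcess and implements the Replicator interface. Its work method fetches a RowMap via getRow() and returns early if it is null; otherwise it updates rowCounter and rowMeter, invokes the scripting hook for non-Maxwell rows, and calls processRow(row). processRow always pushes a HeartbeatRowMap to the producer and calls taskState.stopped() once the heartbeat reaches stopAtHeartbeat; any other row goes to producer.push(row) unless shouldSkipRow(row) returns true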
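
The branching in work and processRow can be illustrated with a small stand-alone sketch. Everything here is hypothetical: Row, HeartbeatRow, and MiniReplicator are simplified stand-ins, an in-memory queue replaces getRow(), a list replaces the producer, and the skip rule (a table named "ignored") is invented for the demonstration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class ReplicatorWorkDemo {
    // Hypothetical row types standing in for RowMap and HeartbeatRowMap.
    static class Row {
        final String table;
        Row(String table) { this.table = table; }
    }

    static class HeartbeatRow extends Row {
        final long heartbeat;
        HeartbeatRow(long heartbeat) { super("heartbeats"); this.heartbeat = heartbeat; }
    }

    static class MiniReplicator {
        final Queue<Row> source = new ArrayDeque<>(); // stand-in for the binlog event source
        final List<Row> pushed = new ArrayList<>();   // stand-in for producer.push(row)
        final Long stopAtHeartbeat;
        boolean running = true;

        MiniReplicator(Long stopAtHeartbeat) { this.stopAtHeartbeat = stopAtHeartbeat; }

        // Assumed filter for this sketch only: skip rows from a table named "ignored".
        boolean shouldSkipRow(Row row) { return "ignored".equals(row.table); }

        void work() {
            Row row = source.poll(); // null when nothing is available
            if (row == null) return;
            processRow(row);
        }

        void processRow(Row row) {
            if (row instanceof HeartbeatRow) {
                pushed.add(row); // heartbeats are always forwarded
                long thisHeartbeat = ((HeartbeatRow) row).heartbeat;
                if (stopAtHeartbeat != null && thisHeartbeat >= stopAtHeartbeat)
                    running = false; // final heartbeat reached: end the loop
            } else if (!shouldSkipRow(row)) {
                pushed.add(row);
            }
        }

        void runLoop() {
            // the empty-source check keeps this sketch from spinning forever
            while (running && !source.isEmpty()) work();
        }
    }

    public static void main(String[] args) {
        MiniReplicator r = new MiniReplicator(2L);
        r.source.add(new Row("users"));
        r.source.add(new Row("ignored"));
        r.source.add(new HeartbeatRow(1));
        r.source.add(new HeartbeatRow(2));
        r.source.add(new Row("orders")); // never read: the loop stops at heartbeat 2
        r.runLoop();
        System.out.println(r.pushed.size() + " rows pushed, " + r.source.size() + " left unread");
        // prints "3 rows pushed, 1 left unread"
    }
}
```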

AbstractProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/AbstractProducer.java

public abstract class AbstractProducer {
    protected final MaxwellContext context;
    protected final MaxwellOutputConfig outputConfig;
    protected final Counter succeededMessageCount;
    protected final Meter succeededMessageMeter;
    protected final Counter failedMessageCount;
    protected final Meter failedMessageMeter;
    protected final Timer messagePublishTimer;
    protected final Timer messageLatencyTimer;
    protected final Counter messageLatencySloViolationCount;

    public AbstractProducer(MaxwellContext context) {
        this.context = context;
        this.outputConfig = context.getConfig().outputConfig;

        Metrics metrics = context.getMetrics();
        MetricRegistry metricRegistry = metrics.getRegistry();

        this.succeededMessageCount = metricRegistry.counter(metrics.metricName("messages", "succeeded"));
        this.succeededMessageMeter = metricRegistry.meter(metrics.metricName("messages", "succeeded", "meter"));
        this.failedMessageCount = metricRegistry.counter(metrics.metricName("messages", "failed"));
        this.failedMessageMeter = metricRegistry.meter(metrics.metricName("messages", "failed", "meter"));
        this.messagePublishTimer = metricRegistry.timer(metrics.metricName("message", "publish", "time"));
        this.messageLatencyTimer = metricRegistry.timer(metrics.metricName("message", "publish", "age"));
        this.messageLatencySloViolationCount = metricRegistry.counter(metrics.metricName("message", "publish", "age", "slo_violation"));
    }

    abstract public void push(RowMap r) throws Exception;

    public StoppableTask getStoppableTask() {
        return null;
    }

    public Meter getFailedMessageMeter() {
        return this.failedMessageMeter;
    }

    public MaxwellDiagnostic getDiagnostic() {
        return null;
    }
}
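  • AbstractProducer declares the abstract push method for subclasses to implement; its constructor registers counters, meters, and timers for succeeded and failed messages, publish time, and message age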

StdoutProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java

public class StdoutProducer extends AbstractProducer {
    public StdoutProducer(MaxwellContext context) {
        super(context);
    }

    @Override
    public void push(RowMap r) throws Exception {
        String output = r.toJSON(outputConfig);

        if ( output != null && r.shouldOutput(outputConfig) )
            System.out.println(output);

        this.context.setPosition(r);
    }
}
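  • StdoutProducer extends AbstractProducer. Its push method serializes the row with r.toJSON(outputConfig), prints it with System.out.println(output) if the output is non-null and r.shouldOutput(outputConfig) returns true, and then always calls context.setPosition(r)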
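
One detail of push worth seeing in isolation is that the position advances even when a row is not emitted. The sketch below shows this with hypothetical stand-ins: Row, Context, Producer, and ListProducer are invented for illustration and only mirror the shape of RowMap, MaxwellContext, AbstractProducer, and StdoutProducer; the numeric offset is an assumption about what a position might look like.

```java
import java.util.ArrayList;
import java.util.List;

class ProducerDemo {
    // Hypothetical row: a serialized payload, an offset, and an "output this row" flag.
    static class Row {
        final String json;
        final long offset;
        final boolean output;
        Row(String json, long offset, boolean output) {
            this.json = json;
            this.offset = offset;
            this.output = output;
        }
    }

    // Hypothetical stand-in for the position tracking exposed by the context.
    static class Context {
        long position = -1;
        void setPosition(Row r) { position = r.offset; }
    }

    // Same shape as the abstract producer: subclasses only implement push.
    static abstract class Producer {
        protected final Context context;
        Producer(Context context) { this.context = context; }
        abstract void push(Row r) throws Exception;
    }

    // Collects output in memory instead of printing it, which makes it easy to inspect.
    static class ListProducer extends Producer {
        final List<String> lines = new ArrayList<>();
        ListProducer(Context context) { super(context); }

        @Override
        void push(Row r) {
            if (r.json != null && r.output)
                lines.add(r.json);
            context.setPosition(r); // the position advances even when nothing is emitted
        }
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        ListProducer producer = new ListProducer(ctx);
        producer.push(new Row("{\"id\":1}", 100, true));
        producer.push(new Row("{\"id\":2}", 200, false)); // filtered out, still advances the position
        System.out.println(producer.lines + " position=" + ctx.position);
        // prints [{"id":1}] position=200
    }
}
```

Advancing the position for filtered rows means a restart would not re-read events that were already deliberately skipped.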

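Summary

The StoppableTask interface defines the requestStop and awaitStop methods. RunLoopProcess implements StoppableTask; its runLoop method calls work() in a while loop until taskState.isRunning() returns false. BinlogConnectorReplicator extends the abstract class RunLoopProcess and implements Replicator; its work method fetches a RowMap via getRow() and, if the row is not null, calls processRow(row), which hands the row to producer.push(row) unless a non-heartbeat row is filtered out by shouldSkipRow(row).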

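Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com to have it removed.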