前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊CanalEventSink

聊聊CanalEventSink

原创
作者头像
code4it
修改2020-04-15 10:27:59
4170
修改2020-04-15 10:27:59
举报

本文主要研究一下CanalEventSink

CanalEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventSink.java

public interface CanalEventSink<T> extends CanalLifeCycle {
​
    /**
     * 提交数据
     * 
     * @param event
     * @param remoteAddress
     * @param destination
     * @throws CanalSinkException
     * @throws InterruptedException
     */
    boolean sink(T event, InetSocketAddress remoteAddress, String destination) throws CanalSinkException,
                                                                              InterruptedException;
​
    /**
     * 中断消费,比如解析模块发生了切换,想临时中断当前的merge请求,清理对应的上下文状态,可见{@linkplain GroupEventSink}
     */
    void interrupt();
​
}
  • CanalEventSink继承了CanalLifeCycle,它定义了sink、interrupt接口

AbstractCanalEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventSink.java

public abstract class AbstractCanalEventSink<T> extends AbstractCanalLifeCycle implements CanalEventSink<T> {
​
    protected CanalEventFilter                  filter;
    protected List<CanalEventDownStreamHandler> handlers = new ArrayList<CanalEventDownStreamHandler>();
​
    public void setFilter(CanalEventFilter filter) {
        this.filter = filter;
    }
​
    public void addHandler(CanalEventDownStreamHandler handler) {
        this.handlers.add(handler);
    }
​
    public CanalEventDownStreamHandler getHandler(int index) {
        return this.handlers.get(index);
    }
​
    public void addHandler(CanalEventDownStreamHandler handler, int index) {
        this.handlers.add(index, handler);
    }
​
    public void removeHandler(int index) {
        this.handlers.remove(index);
    }
​
    public void removeHandler(CanalEventDownStreamHandler handler) {
        this.handlers.remove(handler);
    }
​
    public CanalEventFilter getFilter() {
        return filter;
    }
​
    public List<CanalEventDownStreamHandler> getHandlers() {
        return handlers;
    }
​
    public void interrupt() {
        // do nothing
    }
​
}
  • AbstractCanalEventSink继承了AbstractCanalLifeCycle,声明实现了CanalEventSink接口;它定义了filter及handlers两个属性

EntryEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {
​
    private static final Logger    logger                        = LoggerFactory.getLogger(EntryEventSink.class);
    private static final int       maxFullTimes                  = 10;
    private CanalEventStore<Event> eventStore;
    protected boolean              filterTransactionEntry        = false;                                        // 是否需要尽可能过滤事务头/尾
    protected boolean              filterEmtryTransactionEntry   = true;                                         // 是否需要过滤空的事务头/尾
    protected long                 emptyTransactionInterval      = 5 * 1000;                                     // 空的事务输出的频率
    protected long                 emptyTransctionThresold       = 8192;                                         // 超过8192个事务头,输出一个
​
    protected volatile long        lastTransactionTimestamp      = 0L;
    protected AtomicLong           lastTransactionCount          = new AtomicLong(0L);
    protected volatile long        lastEmptyTransactionTimestamp = 0L;
    protected AtomicLong           lastEmptyTransactionCount     = new AtomicLong(0L);
    protected AtomicLong           eventsSinkBlockingTime        = new AtomicLong(0L);
    protected boolean              raw;
​
    public EntryEventSink(){
        addHandler(new HeartBeatEntryEventHandler());
    }
​
    public void start() {
        super.start();
        Assert.notNull(eventStore);
​
        if (eventStore instanceof MemoryEventStoreWithBuffer) {
            this.raw = ((MemoryEventStoreWithBuffer) eventStore).isRaw();
        }
​
        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (!handler.isStart()) {
                handler.start();
            }
        }
    }
​
    public void stop() {
        super.stop();
​
        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (handler.isStart()) {
                handler.stop();
            }
        }
    }
​
    public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination) {
        return sinkData(entrys, remoteAddress);
    }
​
    //......
}
  • EntryEventSink继承了AbstractCanalEventSink,声明实现了CanalEventSink,其start方法会遍历handlers,挨个执行handler.start方法;其stop方法会遍历handlers,挨个执行handler.stop方法;其sink方法执行的是sinkData方法

sinkData

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {
​
    //......
​
    private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress)
                                                                                            throws InterruptedException {
        boolean hasRowData = false;
        boolean hasHeartBeat = false;
        List<Event> events = new ArrayList<Event>();
        for (CanalEntry.Entry entry : entrys) {
            if (!doFilter(entry)) {
                continue;
            }
​
            if (filterTransactionEntry
                && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
                long currentTimestamp = entry.getHeader().getExecuteTime();
                // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常
                if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
                    && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
                    continue;
                } else {
                    lastTransactionCount.set(0L);
                    lastTransactionTimestamp = currentTimestamp;
                }
            }
​
            hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
            hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);
            events.add(event);
        }
​
        if (hasRowData || hasHeartBeat) {
            // 存在row记录 或者 存在heartbeat记录,直接跳给后续处理
            return doSink(events);
        } else {
            // 需要过滤的数据
            if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) {
                long currentTimestamp = events.get(0).getExecuteTime();
                // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常
                if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval
                    || lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {
                    lastEmptyTransactionCount.set(0L);
                    lastEmptyTransactionTimestamp = currentTimestamp;
                    return doSink(events);
                }
            }
​
            // 直接返回true,忽略空的事务头和尾
            return true;
        }
    }
​
    protected boolean doSink(List<Event> events) {
        for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
            events = handler.before(events);
        }
        long blockingStart = 0L;
        int fullTimes = 0;
        do {
            if (eventStore.tryPut(events)) {
                if (fullTimes > 0) {
                    eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
                }
                for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                    events = handler.after(events);
                }
                return true;
            } else {
                if (fullTimes == 0) {
                    blockingStart = System.nanoTime();
                }
                applyWait(++fullTimes);
                if (fullTimes % 100 == 0) {
                    long nextStart = System.nanoTime();
                    eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
                    blockingStart = nextStart;
                }
            }
​
            for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                events = handler.retry(events);
            }
​
        } while (running && !Thread.interrupted());
        return false;
    }
​
    //......
​
}
  • sinkData方法遍历entrys,通过doFilter过滤掉一些entry,之后将entry转换为Event添加到events中,之后执行doSink方法;doSink方法遍历handlers,挨个执行handler.before(events),之后执行eventStore.tryPut(events),然后遍历handlers,挨个执行handler.after(events);若tryPut不成功,则遍历handlers,挨个执行handler.retry(events)

小结

CanalEventSink继承了CanalLifeCycle,它定义了sink、interrupt接口;AbstractCanalEventSink继承了AbstractCanalLifeCycle,声明实现了CanalEventSink接口;它定义了filter及handlers两个属性;EntryEventSink继承了AbstractCanalEventSink,声明实现了CanalEventSink,其start方法会遍历handlers,挨个执行handler.start方法;其stop方法会遍历handlers,挨个执行handler.stop方法;其sink方法执行的是sinkData方法

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CanalEventSink
  • AbstractCanalEventSink
  • EntryEventSink
  • sinkData
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档