前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊CanalEventDownStreamHandler

聊聊CanalEventDownStreamHandler

原创
作者头像
code4it
修改2020-04-16 09:42:10
2540
修改2020-04-16 09:42:10
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下CanalEventDownStreamHandler

CanalEventDownStreamHandler

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventDownStreamHandler.java

代码语言:javascript
复制
public interface CanalEventDownStreamHandler<T> extends CanalLifeCycle {
​
    /**
     * 提交到store之前做一下处理,允许替换Event
     */
    public T before(T events);
​
    /**
     * store处于full后,retry时处理做一下处理
     */
    public T retry(T events);
​
    /**
     * 提交store成功后做一下处理
     */
    public T after(T events);
}
  • CanalEventDownStreamHandler继承了CanalLifeCycle接口,它定义了before、retry、after方法

AbstractCanalEventDownStreamHandler

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventDownStreamHandler.java

代码语言:javascript
复制
public class AbstractCanalEventDownStreamHandler<T> extends AbstractCanalLifeCycle implements CanalEventDownStreamHandler<T> {
​
    public T before(T events) {
        return events;
    }
​
    public T retry(T events) {
        return events;
    }
​
    public T after(T events) {
        return events;
    }
​
}
  • AbstractCanalEventDownStreamHandler继承了AbstractCanalLifeCycle,实现了CanalEventDownStreamHandler接口,before、retry、after方法默认返回入参的events

HeartBeatEntryEventHandler

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.java

代码语言:javascript
复制
public class HeartBeatEntryEventHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {
​
    public List<Event> before(List<Event> events) {
        boolean existHeartBeat = false;
        for (Event event : events) {
            if (event.getEntryType() == EntryType.HEARTBEAT) {
                existHeartBeat = true;
            }
        }
​
        if (!existHeartBeat) {
            return events;
        } else {
            // 目前heartbeat和其他事件是分离的,保险一点还是做一下检查处理
            List<Event> result = new ArrayList<Event>();
            for (Event event : events) {
                if (event.getEntryType() != EntryType.HEARTBEAT) {
                    result.add(event);
                }
            }
​
            return result;
        }
    }
​
}
  • HeartBeatEntryEventHandler继承了AbstractCanalEventDownStreamHandler,其before方法遍历events判断是否有EntryType.HEARTBEAT类型的event,如果没有则立即返回events,如果有则返回非heartbeat的事件

PrometheusCanalEventDownStreamHandler

canal-1.1.4/prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java

代码语言:javascript
复制
public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {
​
    private final AtomicLong latestExecuteTime  = new AtomicLong(System.currentTimeMillis());
    private final AtomicLong transactionCounter = new AtomicLong(0L);
​
    @Override
    public List<Event> before(List<Event> events) {
        long localExecTime = 0L;
        if (events != null && !events.isEmpty()) {
            for (Event e : events) {
                EntryType type = e.getEntryType();
                if (type == null) continue;
                switch (type) {
                    case TRANSACTIONBEGIN: {
                        long exec = e.getExecuteTime();
                        if (exec > 0) localExecTime = exec;
                        break;
                    }
                    case ROWDATA: {
                        long exec = e.getExecuteTime();
                        if (exec > 0) localExecTime = exec;
                        break;
                    }
                    case TRANSACTIONEND: {
                        long exec = e.getExecuteTime();
                        if (exec > 0) localExecTime = exec;
                        transactionCounter.incrementAndGet();
                        break;
                    }
                    case HEARTBEAT:
                        CanalEntry.EventType eventType = e.getEventType();
                        if (eventType == CanalEntry.EventType.MHEARTBEAT) {
                            localExecTime = System.currentTimeMillis();
                        }
                        break;
                    default:
                        break;
                }
            }
            if (localExecTime > 0) {
                latestExecuteTime.lazySet(localExecTime);
            }
        }
        return events;
    }
​
    @Override
    public void start() {
​
        super.start();
    }
​
    @Override
    public void stop() {
        super.stop();
    }
​
    public AtomicLong getLatestExecuteTime() {
        return latestExecuteTime;
    }
​
    public AtomicLong getTransactionCounter() {
        return transactionCounter;
    }
​
}
  • PrometheusCanalEventDownStreamHandler继承了AbstractCanalEventDownStreamHandler,其before方法遍历events,然后根据不同的EntryType来更新localExecTime、transactionCounter、latestExecuteTime

小结

CanalEventDownStreamHandler继承了CanalLifeCycle接口,它定义了before、retry、after方法;AbstractCanalEventDownStreamHandler继承了AbstractCanalLifeCycle,实现了CanalEventDownStreamHandler接口,before、retry、after方法默认返回入参的events;目前有HeartBeatEntryEventHandler及PrometheusCanalEventDownStreamHandler继承了AbstractCanalEventDownStreamHandler

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CanalEventDownStreamHandler
  • AbstractCanalEventDownStreamHandler
  • HeartBeatEntryEventHandler
  • PrometheusCanalEventDownStreamHandler
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档