前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊KafkaCanalConnector的getFlatList

聊聊KafkaCanalConnector的getFlatList

原创
作者头像
code4it
修改2020-04-10 10:16:22
8640
修改2020-04-10 10:16:22
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下KafkaCanalConnector的getFlatList

getFlatList

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

代码语言:javascript
复制
public class KafkaCanalConnector implements CanalMQConnector {
​
    protected KafkaConsumer<String, Message> kafkaConsumer;
    protected KafkaConsumer<String, String>  kafkaConsumer2;                            // 用于扁平message的数据消费
    protected String                         topic;
    protected Integer                        partition;
    protected Properties                     properties;
    protected volatile boolean               connected      = false;
    protected volatile boolean               running        = false;
    protected boolean                        flatMessage;
​
    private Map<Integer, Long>               currentOffsets = new ConcurrentHashMap<>();
​
    //......
​
    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return Lists.newArrayList();
        }
​
        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }
​
    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return Lists.newArrayList();
        }
​
        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
​
        currentOffsets.clear();
        for (TopicPartition topicPartition : records.partitions()) {
            currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
        }
​
        if (!records.isEmpty()) {
            List<FlatMessage> flatMessages = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                String flatMessageJson = record.value();
                FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
                flatMessages.add(flatMessage);
            }
​
            return flatMessages;
        }
        return Lists.newArrayList();
    }
​
    //......
}
  • KafkaCanalConnector的getFlatList方法先通过getFlatListWithoutAck获取messages,然后执行ack;getFlatListWithoutAck通过kafkaConsumer2.poll方法获取records,然后更新记录的topicPartition到currentOffsets,之后将record转换为flatMessage

ack

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

代码语言:javascript
复制
public class KafkaCanalConnector implements CanalMQConnector {
​
    //......
​
    public void ack() {
        waitClientRunning();
        if (!running) {
            return;
        }
​
        if (kafkaConsumer != null) {
            kafkaConsumer.commitSync();
        }
        if (kafkaConsumer2 != null) {
            kafkaConsumer2.commitSync();
        }
    }
​
    //......
}    
  • ack方法主要是执行kafkaConsumer.commitSync()及kafkaConsumer2.commitSync()

rollback

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

代码语言:javascript
复制
public class KafkaCanalConnector implements CanalMQConnector {
​
    //......
​
    public void rollback() {
        waitClientRunning();
        if (!running) {
            return;
        }
        // 回滚所有分区
        if (kafkaConsumer != null) {
            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
            }
        }
        if (kafkaConsumer2 != null) {
            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
            }
        }
    }
​
    //......
}      
  • rollback方法则根据currentOffsets的值通过kafkaConsumer的seek方法进行回退

小结

KafkaCanalConnector的getFlatList方法先通过getFlatListWithoutAck获取messages,然后执行ack;getFlatListWithoutAck通过kafkaConsumer2.poll方法获取records,然后更新记录的topicPartition到currentOffsets,之后将record转换为flatMessage

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • getFlatList
  • ack
  • rollback
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档