首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊KafkaCanalConnector的getFlatList

聊聊KafkaCanalConnector的getFlatList

作者头像
code4it
发布2020-04-14 14:37:04
6930
发布2020-04-14 14:37:04
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下KafkaCanalConnector的getFlatList

getFlatList

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

public class KafkaCanalConnector implements CanalMQConnector {

    protected KafkaConsumer<String, Message> kafkaConsumer;
    protected KafkaConsumer<String, String>  kafkaConsumer2;                            // 用于扁平message的数据消费
    protected String                         topic;
    protected Integer                        partition;
    protected Properties                     properties;
    protected volatile boolean               connected      = false;
    protected volatile boolean               running        = false;
    protected boolean                        flatMessage;

    private Map<Integer, Long>               currentOffsets = new ConcurrentHashMap<>();

    //......

    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return Lists.newArrayList();
        }

        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return Lists.newArrayList();
        }

        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));

        currentOffsets.clear();
        for (TopicPartition topicPartition : records.partitions()) {
            currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
        }

        if (!records.isEmpty()) {
            List<FlatMessage> flatMessages = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                String flatMessageJson = record.value();
                FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
                flatMessages.add(flatMessage);
            }

            return flatMessages;
        }
        return Lists.newArrayList();
    }

    //......
}
  • KafkaCanalConnector的getFlatList方法先通过getFlatListWithoutAck获取messages,然后执行ack;getFlatListWithoutAck通过kafkaConsumer2.poll方法获取records,然后更新记录的topicPartition到currentOffsets,之后将record转换为flatMessage

ack

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

public class KafkaCanalConnector implements CanalMQConnector {

    //......

    public void ack() {
        waitClientRunning();
        if (!running) {
            return;
        }

        if (kafkaConsumer != null) {
            kafkaConsumer.commitSync();
        }
        if (kafkaConsumer2 != null) {
            kafkaConsumer2.commitSync();
        }
    }

    //......
}    
  • ack方法主要是执行kafkaConsumer.commitSync()及kafkaConsumer2.commitSync()

rollback

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

public class KafkaCanalConnector implements CanalMQConnector {

    //......

    public void rollback() {
        waitClientRunning();
        if (!running) {
            return;
        }
        // 回滚所有分区
        if (kafkaConsumer != null) {
            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
            }
        }
        if (kafkaConsumer2 != null) {
            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
            }
        }
    }

    //......
}      
  • rollback方法则根据currentOffsets的值通过kafkaConsumer的seek方法进行回退

小结

KafkaCanalConnector的getFlatList方法先通过getFlatListWithoutAck获取messages,然后执行ack;getFlatListWithoutAck通过kafkaConsumer2.poll方法获取records,然后更新记录的topicPartition到currentOffsets,之后将record转换为flatMessage

doc

  • KafkaCanalConnector
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • getFlatList
  • ack
  • rollback
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档