本文主要研究一下KafkaCanalConnector的getFlatList
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
public class KafkaCanalConnector implements CanalMQConnector {
protected KafkaConsumer<String, Message> kafkaConsumer;
protected KafkaConsumer<String, String> kafkaConsumer2; // 用于扁平message的数据消费
protected String topic;
protected Integer partition;
protected Properties properties;
protected volatile boolean connected = false;
protected volatile boolean running = false;
protected boolean flatMessage;
private Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>();
//......
public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
waitClientRunning();
if (!running) {
return Lists.newArrayList();
}
List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
if (messages != null && !messages.isEmpty()) {
this.ack();
}
return messages;
}
public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
waitClientRunning();
if (!running) {
return Lists.newArrayList();
}
ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
currentOffsets.clear();
for (TopicPartition topicPartition : records.partitions()) {
currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
}
if (!records.isEmpty()) {
List<FlatMessage> flatMessages = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
String flatMessageJson = record.value();
FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
flatMessages.add(flatMessage);
}
return flatMessages;
}
return Lists.newArrayList();
}
//......
}
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
public class KafkaCanalConnector implements CanalMQConnector {
//......
public void ack() {
waitClientRunning();
if (!running) {
return;
}
if (kafkaConsumer != null) {
kafkaConsumer.commitSync();
}
if (kafkaConsumer2 != null) {
kafkaConsumer2.commitSync();
}
}
//......
}
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
public class KafkaCanalConnector implements CanalMQConnector {
//......
public void rollback() {
waitClientRunning();
if (!running) {
return;
}
// 回滚所有分区
if (kafkaConsumer != null) {
for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
}
}
if (kafkaConsumer2 != null) {
for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
}
}
}
//......
}
KafkaCanalConnector的getFlatList方法先通过getFlatListWithoutAck获取messages,然后执行ack;getFlatListWithoutAck通过kafkaConsumer2.poll方法获取records,然后更新记录的topicPartition到currentOffsets,之后将record转换为flatMessage
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。