前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka 保证分区数据顺序

kafka 保证分区数据顺序

原创
作者头像
用户6327222
发布2022-04-19 17:28:24
7860
发布2022-04-19 17:28:24
举报
文章被收录于专栏:chong_baochong_bao

Producer

kafka 发送消息
代码语言:javascript
复制
KafkaTemplate
​
// 相同的key会发送到同一分区上
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
   ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
   return doSend(producerRecord);
}

// 指定分区
@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {
    return send(this.defaultTopic, partition, key, data);
}
​
kafka 发送消息-默认分区实现
代码语言:javascript
复制
默认分片
org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition
​
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    } 
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

Consumer

kafka 监听获取消息
代码语言:javascript
复制
@KafkaListener(topics = "topic1", groupId = "fooGroup")
public void listen(String kms) {
   logger.info("Received: " + kms);
   if (kms.startsWith("fail")) {
      throw new RuntimeException("failed");
   }
}
​
@KafkaListener(topics = "topic1.DLT", groupId = "fooGroup")
public void dltListen(String kms) {
   logger.info("Received from DLT: " + kms);
}

kafka 监听获取消息实现
代码语言:javascript
复制
获取分片
org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#seekPartitions
​
private void seekPartitions(Collection<TopicPartition> partitions, boolean idle) {
   this.consumerSeekAwareListener.registerSeekCallback(this);
   Map<TopicPartition, Long> current = new HashMap<>();
   // 获取分片数据
   for (TopicPartition topicPartition : partitions) {
      current.put(topicPartition, ListenerConsumer.this.consumer.position(topicPartition));
   }
   if (idle) {
      this.consumerSeekAwareListener.onIdleContainer(current, this.seekCallback);
   }
   else {
      this.consumerSeekAwareListener.onPartitionsAssigned(current, this.seekCallback);
   }
}

kafka 监听私信队列
私信队列配置
代码语言:javascript
复制
@Configuration
public class KafkaErrorHandlerConfig {
    public KafkaErrorHandlerConfig() {
    }
​
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer, 
        ConsumerFactory<Object, Object> kafkaConsumerFactory, 
        KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory();
        configurer.configure(factory, kafkaConsumerFactory);
        // dead-letter after 3 tries
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2L)));
        return factory;
    }
}

默认私信队列
代码语言:javascript
复制
DeadLetterPublishingRecoverer
​
// 默认私信队列名称
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
   DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Producer
    • kafka 发送消息
      • kafka 发送消息-默认分区实现
      • Consumer
        • kafka 监听获取消息
          • kafka 监听获取消息实现
            • kafka 监听私信队列
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档