前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka消费者 之 如何进行消息消费

Kafka消费者 之 如何进行消息消费

作者头像
create17
发布2019-07-16 15:52:56
3.4K0
发布2019-07-16 15:52:56
举报

每一个成功人士的背后,必定曾经做出过勇敢而又孤独的决定。

放弃不难,但坚持很酷~由于消费者模块的知识涉及太多,所以决定先按模块来整理知识,最后再进行知识模块汇总。

一、消息消费

1、poll()

Kafka 中的消费是基于拉模式的,即消费者主动向服务端发起请求来拉取消息。Kakfa 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll() 方法,而 poll() 方法返回的是所订阅主题(或分区)上的一组消息。一旦消费者订阅了主题(或分区),轮询就会处理所有细节,包括群组协调、分区再均衡、发送心跳和获取数据。

对于 poll() 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么 poll() 方法返回为空的消息集合。

poll() 方法的具体定义如下:

代码语言:javascript
复制
public ConsumerRecords<K, V> poll(long timeout)

注意到 poll() 方法里还有一个超时时间参数 timeout ,用来控制 poll() 方法的阻塞时间。在 Kafka 2.0.0之前的版本中,timeout 参数类型为 long ;Kafka 2.0.0之后的版本中,timeout 参数的类型为 Duration ,它是 JDK8 中新增的一个与时间相关的模型。

代码语言:javascript
复制
public ConsumerRecords<K, V> poll(final Duration timeout)

poll(long) 方法中 timeout 的时间单位固定为毫秒,而poll(Duration) 方法可以根据 Duration 中的 ofMillis()、ofSeconds()、ofMinutes()、ofHours() 等多种不同的方法指定不同的时间单位,灵活性更强。

timeout 的设置取决于应用程序对响应速度的要求,比如需要多长时间内将控制权移交给执行轮询的应用线程。如果直接将 timeout 设置为 0 ,这样 poll() 方法会立刻返回,而不管是否已经拉到了消息。如果知道这个原理的话,在写消费程序过程中,如果第一次没有拉取到数据,第二次才拉取到数据也就不足为奇了。

consumer.poll() 拉取数据的最大值由 max.poll.records 配置约束,默认值为 500 。

2、ConsumerRecord

消费者消费到的每条消息的类型为 ConsumerRecord(注意与 ConsumerRecords 的区别),这个和生产者发送的消息类型 ProducerRecord 相对应,不过 ConsumerRecord 中的内容更加丰富,具体的结构参考如下代码:

代码语言:javascript
复制
public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = -1L;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;
    // 省略若干方法
}

topic 和 partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。offset 表示消息在所属分区的偏移量。timestamp 表示时间戳,与此对应的 timestampType 表示时间戳的类型。timestampType 有两种类型:CreateTime 和 LogAppendTime ,分别代表 消息创建的时间戳 和 消息追加到日志的时间戳 。headers 表示消息的头部内容。key 和 value 分别表示消息的键和消息的值,一般业务应用要读取的就是 value ,serializedKeySize 和 serializedValueSize 分别表示 key 和 value 经过序列化之后的大小,如果key为空,则 serializedKeysize 值为 -1。同样,如果 value 为空,则 serializedValueSize 的值也会为 -1 。checksum 是 CRC32 的校验值。

我们在消息消费时可以直接对 ConsumerRecord 中感兴趣的字段进行具体的业务逻辑处理。

3、iterator()

poll() 方法的返回值类型是 ConsumerRecords ,它用来表示一次拉取操作所获得的消息集,内部包含了若干 ConsumerRecord ,它提供了一个 iterator() 方法来循环遍历消息集内部的消息,示例如下:

代码语言:javascript
复制
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
    ConsumerRecord<String, String> record = iterator.next();
    System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
    System.out.println("key = " + record.key() + ", value = " + record.value());
}
4、records(TopicPartition)

我们还可以按照分区来进行消费,这一点很有用,在手动提交位移时尤为明显。ConsumerRecords 类提供了一个 records(TopicPartition) 方法来获取消息集中指定分区的消息。此方法的定义如下:

代码语言:javascript
复制
public List<ConsumerRecord<K, V>> records(TopicPartition partition)

可以使用 records(TopicPartition) 来代替 iterator() 的消费逻辑,示例如下:

代码语言:javascript
复制
// records(TopicPartition)
for(TopicPartition tp : records.partitions()){
    // tp: topic-demo-0、topic-demo-1、topic-demo-2、topic-demo-3
    // 指定获取某一主题的某一分区:tp = new TopicPartition(TOPIC, 0)
    for(ConsumerRecord<String, String> record : records.records(tp)){
        System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
        System.out.println("key = " + record.key() + ", value = " + record.value());
    }
}
5、records(String topic)

在 ConsumerRecords 类中还提供了按照主题维度来进行消费的方法,这个方法是 records(TopicPartition) 的重载方法,具体定义如下:

代码语言:javascript
复制
public Iterable<ConsumerRecord<K, V>> records(String topic)

比如消费者消费了 topic-demo 和 topic-test 两个主题,我们可以通过 records(String topic) 只获取某一主题的消息,示例如下,只获取 topic-demo 主题的消息:

代码语言:javascript
复制
// records(String topicName)
for(ConsumerRecord<String, String> record : records.records("topic-demo")){
    System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
    System.out.println("key = " + record.key() + ", value = " + record.value());
}

二、总结

本文主要讲解了消费者如何从订阅的主题或分区中拉取数据的,使用的 poll() 方法。拉取到之后,又顺势讲解了 ConsumerRecord 内部结构,以及自带的 iterator() 方法,遍历得到每一个 ConsumerRecord 。最后讲解了 records() 方法的两种使用,一种是指定分区来消费,另一种是指定主题来消费。

在外观上来看,poll() 方法只是拉取了一下数据,但就其内部逻辑而言并不简单,它涉及消息位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容,在后面的学习中会陆续介绍这些内容。

另外本文涉及到的源码已上传至:github,链接如下:

https://github.com/841809077/hdpproject/blob/master/src/main/java/com/hdp/project/kafka/consumer/MessageConsumer.java

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据实战演练 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、消息消费
    • 1、poll()
      • 2、ConsumerRecord
        • 3、iterator()
          • 4、records(TopicPartition)
            • 5、records(String topic)
            • 二、总结
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档