在了解了消费者与消费组之间的概念之后,我们就可以着手进行消费者客户端的开发了。在 Kafka 的历史中,消费者客户端同生产者客户端一样也经历了两个大版本:第一个是于 Kafka 开源之初使用 Scala 语言编写的客户端,我们可以称之为旧消费者客户端或 Scala 消费者客户端;第二个是从 Kafka 0.9.x 版本开始推出的使用 Java 编写的客户端,我们可以称之为新消费者客户端或 Java 消费者客户端,它弥补了旧客户端中存在的诸多设计缺陷。
正常的消费逻辑需要具备以下几个步骤:
在创建真正的消费者实例之前需要做相应的参数配置,比如上一节中的设置消费者所属的消费组的名称、连接地址等。参照上面代码中的 initConfig() 方法,在 Kafka 消费者客户端 KafkaConsumer 中有4个参数是必填的。
initConfig() 方法里还设置了一个参数 client.id,这个参数用来设定 KafkaConsumer 对应的客户端id,默认值也为“”。如果客户端不设置,则 KafkaConsumer 会自动生成一个非空字符串,内容形式如“consumer-1”、“consumer-2”,即字符串“consumer-”与数字的拼接。
KafkaConsumer 中的参数众多,远非示例 initConfig() 方法中的那样只有5个,开发人员可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。
在创建好消费者之后,我们就需要为该消费者订阅相关的主题了。一个消费者可以订阅一个或多个主题,代码我们使用 subscribe() 方法订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe 的几个重载方法如下:
对于消费者使用集合的方式来订阅主题而言,比较容易理解,订阅了什么主题就消费什么主题中的消息。如果前后两次订阅了不同的主题,那么消费者以最后一次的为准。
如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。在 Kafka 和其他系统之间进行数据复制时,这种正则表达式的方式就显得很常见。正则表达式的方式订阅的示例如下:
消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题,还可以直接订阅某些主题的特定分区,在 KafkaConsumer 中还提供了一个 assign() 方法来实现这些功能,此方法的具体定义如下:
这里补充说明一下 TopicPartition 类,在 Kafka 的客户端中,它用来表示分区,这个类的部分内容如下所示。
TopicPartition 类只有2个属性:topic 和 partition,分别代表分区所属的主题和自身的分区编号,这个类可以和我们通常所说的主题—分区的概念映射起来。
有读者会有疑问:如果我们事先并不知道主题中有多少个分区怎么办?KafkaConsumer 中的 partitionsFor() 方法可以用来查询指定主题的元数据信息,partitionsFor() 方法的具体定义如下:
这里会有个疑问:会有疑问:如果我们事先并不知道主题中有多少个分区怎么办?KafkaConsumer 中的 partitionsFor() 方法可以用来查询指定主题的元数据信息,partitionsFor() 方法的具体定义如下:
其中 PartitionInfo 类型即为主题的分区元数据信息
PartitionInfo 类中的属性 topic 表示主题名称,partition 代表分区编号,leader 代表分区的 leader 副本所在的位置,replicas 代表分区的 AR 集合,inSyncReplicas 代表分区的 ISR 集合,offlineReplicas 代表分区的 OSR 集合。通过 partitionFor() 方法的协助,我们可以通过 assign() 方法来实现订阅主题(全部分区)的功能,示例参考如下
既然有订阅,那么就有取消订阅,可以使用 KafkaConsumer 中的 unsubscribe() 方法来取消主题的订阅。这个方法既可以取消通过 subscribe(Collection) 方式实现的订阅,也可以取消通过 subscribe(Pattern) 方式实现的订阅,还可以取消通过 assign(Collection) 方式实现的订阅。示例代码如下:
如果没有订阅任何主题或分区,那么再继续执行消费程序的时候会报出 IllegalStateException 异常:
集合订阅的方式 subscribe(Collection)、正则表达式订阅的方式 subscribe(Pattern) 和指定分区的订阅方式 assign(Collection) 分表代表了三种不同的订阅状态:AUTO_TOPICS、AUTO_PATTERN 和 USER_ASSIGNED(如果没有订阅,那么订阅状态为 NONE)。然而这三种状态是互斥的,在一个消费者中只能使用其中的一种,否则会报出 IllegalStateException 异常:
通过 subscribe() 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过 assign() 方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从 assign() 方法的参数中就可以看出端倪,两种类型的 subscribe() 都有 ConsumerRebalanceListener 类型参数的方法,而 assign() 方法却没有。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。