前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka消费者 之 如何订阅主题或分区

Kafka消费者 之 如何订阅主题或分区

作者头像
create17
发布2019-07-17 16:58:34
2K0
发布2019-07-17 16:58:34
举报

每一个成功人士的背后,必定曾经做出过勇敢而又孤独的决定。

放弃不难,但坚持很酷~

代码语言:javascript
复制
一、消费者配置在创建真正消费者实例之前,需要做相应的参数配置,比如设置消费者所属的消费者组名称、broker 链接地址、反序列化的配置等。private static final String BROKERLIST = "node71.xdata:6667,node72.xdata:6667,node73.xdata:6667";
private static final String TOPIC = "topic-demo";
private static final String GROUPID = "group.demo.1";
private static final String CLIENTID = "consumer.client.id.1";

private static Properties initConfig() {
    Properties props = new Properties();
    // kafka集群所需的broker地址清单
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERLIST);
    // 设定kafkaConsumer对应的客户端id
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENTID);
    // 消费者从broker端获取的消息格式都是byte[]数组类型,key和value需要进行反序列化。
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 指定一个全新的group.id并且将auto.offset.reset设置为earliest可拉取该主题内所有消息记录。
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // 关闭offset自动提交
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}
更多消费者配置可参考官网:https://kafka.apache.org/documentation/#consumerconfigs二、订阅主题与分区1、订阅主题消费者可使用 subscribe() 方法订阅一个主题。对于这个方法而言,即可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe() 的几个重载方法如下:public void subscribe(Collection<String> topics)public void subscribe(Pattern pattern)public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)示例如下:// 订阅多个主题
kafkaConsumer.subscribe(Arrays.asList("test1","test2","..."));
2、订阅分区消费者还可以直接订阅某些主题的特定分区,在KafkaConsumer中提供了一个 assign() 方法来实现这些功能,此方法的具体定义如下:public  void  assign(Collection<TopicPartition>  partitions)
该方法只接受一个参数 partitions ,用来指定需要订阅的分区集合。补充说明一下 TopicPartition 类,在 Kafka 的客户端中,它用来表示分区,该类的部分内容如下图所示:TopicPartition 类只有两个属性:topic 和 partition ,分别代表分区所属的主题和自身的分区编号,这个类可以和我们通常所说的主题-分区的概念映射起来。比如需要订阅 test 主题分区编号为 0 的分区,示例如下:
kafkaConsumer.assign(Arrays.asList(new TopicPartition("test", 0)));
Kafka 提供了一个计算主题分区的方法:partitionsFor() ,该方法可以查询指定主题的元数据信息。partitionsFor() 方法的具体定义如下:public List<PartitionInfo> partitionsFor(String topic)
其中 PartitionInfo 类即为主题的分区元数据信息,此类的主要结构如下:现在,通过 partitionFor() 方法的协助,我们可以通过 assign() 方法来实现订阅主题(全部分区)的功能,示例代码参考如下:
3、如何取消订阅
既然有订阅,那么就有取消订阅。可以使用 KafkaConsumer 中的 unsubscribe() 方法来取消主题的订阅。这个方法即可以取消通过 subscribe(Collection) 方式实现的订阅,也可以通过取消  subscribe(Pattern) 方式实现的订阅,还可以取消通过 assign(Collection) 方式实现的订阅。示例代码如下:consumer.unsubscribe();
除了使用 来取消订阅,还可以将 subscribe(Collection) 或  assign(Collection) 中的集合参数设置为空集合,作用等同于 unsubscribe() 方法,示例中三行代码效果相同:consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>());
consumer.assgin(new ArrayList<TopicPartition>())
二、小结通过 subscribe() 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移,而通过 assign() 方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从 assign() 方法的参数中就可以看出端倪,两种类型的 subscribe() 都有 ConsumerRebalanceListener 类型参数的方法,而 assign() 方法却没有。三、推荐阅读》

另外本文涉及到的源码已上传至:github,链接如下:

https://github.com/841809077/hdpproject/blob/master/src/main/java/com/hdp/project/kafka/consumer/KafkaConsumerAnalysis.java

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据实战演练 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档