一、consumer概览
consumer客户端也有很多编程语言实现,本章还是以JAVA语言讲解为主
新版本把上文提到的消费者客户端位移状态信息提交到kafka服务端broker内的一个内部不对外显示的topic(名为__consumer_offsets)上
__consumer_offsets的每条消息格式大致如下
__consumer_offsets 内的数据:每当更新group_topic_partition分区里的位移时,会写入一条最新的offset消息,同事对该topic进行压实操作,就是为每个消息key只保存含有最新offset的消息。为什么这样?kafka设计初始就这样,不修改只增加。这样也可以控制住该内部topic的日志大小
为了避免多个consumer同时操作带来的写入并发问题,kafka提供了该topic(__consumer_offsets) 50个分区,对每个group.id进行哈希求模负载均衡到不同分区里
假设一个consumer group,有20个consumer实例,该group订阅了100个分区的topic,那么consumer group平均为每个consumer分配五个分区,即每个consumer实例负责读取5个分区的数据。这个分配过程叫rebalance(消费者组重平衡)
二、构建consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); //必填
props.put("group.id", "test"); //必填
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //必填
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//必填
props.put("auto.offset.reset", "earliset");//从最早的消息开始读取
props.put("session.timeout.ms", "10000");//coordinator协调者检测失败的时间,检测消费者组中那个consumer崩溃从而快速开启rebalance避免造成更大的消费滞后(rebalance位移值丢失),默认时间10秒
props.put("max.poll.interval.ms", "10000");//单个consumer场景使用,消费者需要消费消息最大超时时间
props.put("auto.offset.reset", "earliset | latest | none"); //只有consumer消费的消息位移不在当前消息日志的合理区间范围或者位移越界了,上面两种情况该属性才生效
props.put("enable.auto.commit", "true"); //是否自动提交位移
props.put("fetch.max.bytes", ""); //指定consumer端单词获取数据的最大字节数
props.put("max.poll.records", "");//单次poll返回的最大消息数,1的话就是返回一条消息,这个太极端了
props.put("heartbeat.interval.ms", ""); //就是消费者组在rebalance的时候,其他成员知道它需要重新加入group,这个过程就是这里设置的超时时间。heartbeat.interval.ms
props.put("connections.max.idle.ms", "-1");//kafka定期地关闭空闲的连向broker的socket连接,默认9分钟。-1表示不要关闭空闲的链接
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("foo", "bar"));//订阅topic
// consumer.subscribe(Pattern.compile("kafka.*"),new NoOpConsumerRebalanceListenner()); //重复执行subscribe会覆盖上
面那句
//-----------
//消费者组订阅topic如下:
//consumer.subscribe(Arrays.asList("test1","test2"));
//独立消费者订阅如下:
//TopicPartition tp1=new TopicPartition("topic-name",0);
//TopicPartition tp2=new TopicPartition("topic-name",1);
//consumer.assign(Arrays.asList(tp1,tp2));
//订阅是延迟生效,当前订阅下次poll的时候才会生效
//poll使用方法
consumer需要定期执行其他子任务,推荐consumer.poll(较小超时时间)+运行标识布尔变量的方式
consumer不需要定期执行子任务,推荐consumer.poll(MAX_VALUE)+捕获WakeupException的方式
//------------
try{
while (true(运行标识布尔变量)) {
ConsumerRecords records = consumer.poll(1000); //超时1000ms,循环获取封装在ConsumerRecord的topic消息。拿到足够多的数据就立即返回,没有足够多的数据可供返回,consumer会处于阻塞状态,这时候阻塞超时时间为1000ms。所有的消费逻辑都爱poll方法内一次调用中被执行(包含coordinator的协调、消费者组的rebalance以及数据的获取),多个poll才有select I/O轮询调用
for (ConsumerRecord record : records)
}
// }catch(WakeupException e){ //该场景推荐consumer.poll(MAX_VALUE)+捕获WakeupException的方式
}finally{
consumer.close();//关闭consumer并最多等待30秒,默认30秒。不仅清除consumer创建的各种socket资源,还会通知消费者组coordinator主动离组从而更快地开启新一轮rebalance
}
三、位移
手动提交位移的时候如果选择异步提交,这个时候是跟poll方法一个线程,都是用户主线程,在poll被阻塞的时候异步提交
四、rebalance
Kafka集群会选一个broker为组协调者(group coordinator),coordinator负责对组的状态进行管理,它主要职责就是对新成员到达时促成组内成员达成新的分区分配方案,即coordinator负责对组执行rebalance操作
range分区策略
round-robin分区策略
sticky分区策略
rebalance监听器有一个主要的接口回调类ConsumerRebalanceListener,里面有两个方法onPartitionsRevoked和onPartitionAssigned。在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用,而rebalance完成后会调用onPartitionAssigned方法
五、多线程
生产者是线程安全
消费者线程不安全
每个线程维护一个kafkaConsumer
单个kafkaConsumer实例+多worker线程
领取专属 10元无门槛券
私享最新 技术干货