Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了。 因此,本文将介绍Consumer API的使用,使用API从Kafka中消费消息,让应用成为一个消费者角色。 0.0.1:9092"); // 指定group.id,Kafka中的消费者需要在消费者组里 props.setProperty(ConsumerConfig.GROUP_ID_CONFIG 中,当消费者消费数据后,需要提交数据的offset来告知服务端成功消费了哪些数据。 若消费者处理数据失败时,只要不提交相应的offset,就可以在下一次重新进行消费。 和数据库的事务一样,Kafka消费者提交offset的方式也有两种,分别是自动提交和手动提交。
Kafka 消费者总共有 3 种 API,新版 API、旧版高级 API、旧版低级 API,新版 API 是在 kafka 0.9 版本后增加的,推荐使用新版 API,但由于旧版低级 API 可以对消息进行更加灵活的控制 ,所有在实际开发中使用的也较多,本文讨论消费者旧版低级 API 的基本使用。 ; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.ErrorMapping 旧版消费者API——低级API * @Author YangYunhe * @Date 2018-06-26 13:16:29 */ public class SimpleConsumerTest (),获取最开始的消费偏移量,不一定是0,因为segment会删除 * kafka.api.OffsetRequest.LatestTime(),获取最新的消费偏移量
个人网站、项目部署、开发环境、游戏服务器、图床、渲染训练等免费搭建教程,多款云服务器20元起。
; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords 重要性:低 说明:我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。 而 feth.max.wait.ms 则用于指定 broker 的等待时间,默认是如果没有足够的数据流入Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。 如果 fetch.max.wait.ms 被设为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据 Kafka 有两个默认的分配策略。 Range:该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。
自动提交 最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。 消费者每次获取新数据时都会先把上一次poll()方法返回的最大偏移量提交上去。 但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。 消费者 API 提供了再均衡监听器,以下程序可以做到 kafka 消费数据的 Exactly Once 语义: package com.bonc.rdpe.kafka110.consumer; import ; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords
consumer group 当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力 ,消费者组中的每个消费者只处理每个Topic的一部分的消息,每个消费者对应一个线程。 Kafka 当前只能允许增加一个主题的分区数。 我们有时候可以看到ILLEGAL_GENERATION的错误,就是kafka在抱怨这件事情。 消费的两种方式 1.consumer.assign assign方法由用户直接手动consumer实例消费哪些具体分区,根据api上述描述,assign的consumer不会拥有kafka的group
应用从Kafka中读取数据需要使用KafkaConsumer订阅主题,然后接收这些主题的消息。在我们深入这些API之前,先来看下几个比较重要的概念。 .*"); 拉取循环 消费数据的API和处理方式很简单,我们只需要循环不断拉取消息即可。 在正常情况下,消费者会发送分区的提交信息到Kafka,Kafka进行记录。当消费者宕机或者新消费者加入时,Kafka会进行重平衡,这会导致消费者负责之前并不属于它的分区。 另外一个解决办法是,使用异步提交的API。 Kafka的API允许我们在消费者新增分区或者失去分区时进行处理,我们只需要在调用subscribe()方法时传入ConsumerRebalanceListener对象,该对象有两个方法: public
Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。 一旦消费者订阅了主题,轮询就会处理所有的细节,包括消费者群组协调、分区再均衡、发送心跳和获取数据,开发者只需要使用一组简单的 API 来处理从分区返回的数据。轮询不只是获取数据那么简单。 KafkaConsumer API 提供了很多种方式来提交偏移量:自动提交偏移量、手动提交偏移量。 再均衡监听器在【分区再均衡前后】、【消费者开始读取消息之前】、【消费者停止读取消息之后】我们可以通过消费者 API 执行一些应用程序代码,在调用 kafkaConsumer 的 subscribe() 权威指南》第 4 章:Kafka 消费者——从 Kafka 读取数据
Kafka 消费者 1. Kafka 消费方式 2 Kafka 消费者工作流程 2.1 消费者总体工作流程 2.2 消费者组原理 Consumer Group(CG):消费者组,由多个consumer组成。 消费者 API 3.1 独立消费者案例(订阅主题) 1)需求: 创建一个独立消费者,消费 first 主题中数据。 注意在消费者 API 代码中必须配置消费者组 id。 Kafka可以同时使用多个分区分配策略。 -参数名称 -描述 heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 因 此Kafka还提供了手动提交offset的API。 手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。
优雅的退出消费者程序 package com.bonc.rdpe.kafka110.consumer; import java.util.Arrays; import java.util.Properties ,线程的数量受限于分区数,当消费者线程的数量大于分区数时,就有一部分消费线程一直处于空闲状态 多线程消费者的线程实现类代码如下: package com.bonc.rdpe.kafka110.thread 独立消费者 有时候你可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。 一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。 以下是独立消费者的示例代码: package com.bonc.rdpe.kafka110.consumer; import java.util.ArrayList; import java.util.List
消费者将记住他们上次离开时的偏移量 消费者组每个分区都有自己的偏移量 Kafka消费者分担负载 Kafka消费者将消费在一个消费者组内的消费者实例上所划分的分区。 消费者组中的每个消费者都是分区的“公平共享”的独家消费者。这就是Kafka如何在消费者组中对消费者进行负载平衡。消费者组内的消费者成员资格由Kafka协议动态处理。 Kafka消费者故障转移 消费者在成功处理记录之后通知Kafka Broker,从而将偏移量提前。 Kafka消费者可以消费哪些记录?消费者无法读取未复制的数据。Kafka消费者只能消费分区之外的“高水印”偏移量的消息。 管理故障切换(每个进程运行X个消费者线程)也更简单,因为您可以允许Kafka首当其冲的工作。 Kafka消费者回顾 什么是消费者组?
简介 消费者组是 Kafka 独有的概念,消费者组是 Kafka 提供的可扩展且具有容错性的消费者机制。 有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的Group ID。 Group ID是一个字符串,在一个Kafka集群中,它标识唯一的一个Consumer Group。 消费者组作用 传统的消息队列模型的缺陷在于消息一旦被消费,就会从队列中被删除,而且只能被下游的一个Consumer消费。 在新版本的Consumer Group中,采用了将位移保存在Kafka内部主题的方法。
前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用 kafka consumer 常用的 kafka consumer 配置 消费者 和 消费者组 什么是消费者? 顾名思义,消费者就是从kafka集群消费数据的客户端, 如下图,展示了一个消费者从一个topic中消费数据的模型 ? 图1 单个消费者模型存在的问题? 这个时候kafka会进行 分区再均衡, 来为这个分区分配消费者,分区再均衡 期间该 Topic 是不可用的, 并且作为一个 被消费者, 分区数的改动将影响到每一个消费者组 , 所以在创建 topic 至此,消费者都知道自己的消费的分区, 分区过程结束, 当发生 分区再均衡 的时候, leader 将会重复分配过程 实践——kafka 消费者的使用 咱们以 java api 为例,下面是一个简单的 PartitionAssignor 根据给定的消费者和主题, 决定哪些分区应该被分配给哪个消费者。 Kafka 有两个默认的分配策略。
针对以上问题,Kafka 的提供了独立消费者模式,可以消费者可以指定分区进行消费,如果只用一个 topic,每个消息源启动一个生产者,分别发往不同的分区,消费者指定消费相关的分区即可,用如下图所示: ? 但是 Kafka 独立消费者也有它的限定场景: 1、 Kafka 独立消费者模式下,Kafka 集群并不会维护消费者的消费偏移量,需要每个消费者维护监听分区的消费偏移量,因此,独立消费者模式与 group 2、group 模式的重平衡机制在消费者异常时可将其监听的分区重分配给其它正常的消费者,使得这些分区不会停止被监听消费,但是独立消费者由于是手动进行监听指定分区,因此独立消费者发生异常时,并不会将其监听的分区进行重分配 因此,在该模式下,独立消费者需要实现高可用,例如独立消费者使用 K8s Deployment 进行部署。 下面将演示如何使用 Kafka#assgin 方法手动订阅指定分区进行消费: public static void main(String[] args) { Properties kafkaProperties
这种特性决定了kafka可以消费历史消息,而且按照消息的顺序消费指定消息,而不是只能消费队头的消息。 kafka早期的版本把消费者组和partition的offset直接维护在ZK中,但是读写的性能消耗太大了。 /kafka-topics.sh --topic __connsumer_offsets --describe --zookeeper localhost:2181 看起来这些分区副本在3个Broker /kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost --formatter "kafka.coordinator.group.GroupMetadataManager /kafka-topic.sh --create --zookeeper localhost:2181 --partition 5 --replication-factor 1 --topic test
温馨提示:整个 Kafka 专栏基于 kafka-2.2.1 版本。 那如果其中一个消费者宕机或新增一个消费者,那队列能动态调整吗? 答案是会重新再次平衡,例如如果新增一个消费者 c3,则c1,c2,c3都会负责2个分区的消息消费,分区重平衡会在后续文章中重点介绍。 kafka 对 poll loop 行为的控制参数 Kafka 提供了如下两个参数来控制 poll 的行为: max.poll.interval.ms 允许 两次调用 poll 方法的最大间隔,即设置每一批任务最大的处理时间 void close() 关闭消费者。 void close(Duration timeout) 关闭消费者。 void wakeup() 唤醒消费者。 int defaultApiTimeoutMs 为所有可能阻塞的API设置一个默认的超时时间。 List< PartitionAssignor> assignors 分区分配算法(分区负载算法)。
本篇单独聊聊Kafka的消费者,包括如下内容: 消费者和消费者组 如何创建消费者 如何消费消息 消费者配置 提交和偏移量 再均衡 结束消费 消费者和消费者组 概念 Kafka消费者对象订阅主题并接收Kafka Kafka消费者是消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。 代码样例: consumer.subscribe(Collections.singletonList(topic)); 轮询消费 消息轮询是消费者API的核心,消费者通过轮询 API(poll) 向服务器定时请求数据 基于这个原因,Kafka 也提供了手动提交偏移量的 API,使得用户可以更为灵活的提交偏移量。 手动提交: 用户可以通过将 enable.auto.commit 设为 false,然后手动提交偏移量。 而按照 Kafka API,手动提交偏移量又可以分为同步提交和异步提交。同步提交:通过调用 consumer.commitSync() 来进行同步提交,不传递任何参数时提交的是当前轮询的最大偏移量。
本篇单独聊聊Kafka的消费者,包括如下内容: 消费者和消费者组 如何创建消费者 如何消费消息 消费者配置 提交和偏移量 再均衡 结束消费 消费者和消费者组 概念 Kafka消费者对象订阅主题并接收Kafka Kafka消费者是消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。 代码样例: consumer.subscribe(Collections.singletonList(topic)); 轮询消费 消息轮询是消费者API的核心,消费者通过轮询 API(poll) 向服务器定时请求数据 基于这个原因,Kafka 也提供了手动提交偏移量的 API,使得用户可以更为灵活的提交偏移量。 而按照 Kafka API,手动提交偏移量又可以分为同步提交和异步提交。
Kafka Connect基本概念介绍 Kafka Connect是一个用于将数据流输入和输出Kafka的框架。 因此,失败的task不会被框架自动重新启动,应该通过REST API重新启动。 ? root@txy-server2 /usr/local/src]# wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc ---- Kafka Connect Source和MySQL集成 首先我们要知道rest服务提供了一些API去操作connector,如下表: ? 首先,我们需要调用Rest API新增一个Sink类型的connector。
Kafka Stream概念及初识高层架构图 Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。 Kafka Stream的基本概念: Kafka Stream是处理分析存储在Kafka数据的客户端程序库(lib) 由于Kafka Streams是Kafka的一个lib,所以实现的程序不依赖单独的环境 因此,我们在使用Stream API前需要先创建两个Topic,一个作为输入,一个作为输出。 :2181 --replication-factor 1 --partitions 1 --topic output-topic 由于之前依赖的kafka-clients包中没有Stream API,所以需要另外引入 > <version>2.5.0</version> </dependency> 接下来以一个经典的词频统计为例,演示一下Stream API的使用。
五类Kafka客户端作用和区别 在上文中介绍了如何搭建一个Kafka服务,那么在开发中我们要如何去访问、集成Kafka呢?这就需要使用到本文将要介绍的Kafka客户端API。 这些客户端通过API与Kafka进行集成,Kafka的五类客户端API类型如下: AdminClient API:允许管理和检测Topic、broker以及其他Kafka实例,与Kafka自带的脚本命令作用类似 Producer API:发布消息到1个或多个Topic,也就是生产者或者说发布方需要用到的API Consumer API:订阅1个或多个Topic,并处理产生的消息,也就是消费者或者说订阅方需要用到的 API Stream API:高效地将输入流转换到输出流,通常应用在一些流处理场景 Connector API:从一些源系统或应用程序拉取数据到Kafka,如上图中的DB ---- 创建工程 在接下来的篇章中将会演示 的组件都会使用到这两个API,因为通过这两个API可以获取到Topic自身和周边的详细信息 ---- 创建Topic 使用createTopics方法可以创建Topic,传入的参数也与kafka-topics.sh
CKafka(Cloud Kafka)是一个分布式的、高吞吐量、高可扩展性的消息系统,100%兼容开源 Kafka API(0.9版本)。Ckafka 基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。Ckafka 具有数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合等场景。
扫码关注腾讯云开发者
领取腾讯云代金券