学习
实践
活动
专区
工具
TVP
写文章

Kafka核心API——Consumer消费者

Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了。 因此,本文将介绍Consumer API的使用,使用APIKafka中消费消息,让应用成为一个消费者角色。 0.0.1:9092"); // 指定group.id,Kafka中的消费者需要在消费者组里 props.setProperty(ConsumerConfig.GROUP_ID_CONFIG 中,当消费者消费数据后,需要提交数据的offset来告知服务端成功消费了哪些数据。 若消费者处理数据失败时,只要不提交相应的offset,就可以在下一次重新进行消费。 和数据库的事务一样,Kafka消费者提交offset的方式也有两种,分别是自动提交和手动提交。

56120
  • 广告
    关闭

    热门业务场景教学

    个人网站、项目部署、开发环境、游戏服务器、图床、渲染训练等免费搭建教程,多款云服务器20元起。

  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    Kafka 新版消费者 API(一):订阅主题

    ; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords 重要性:低 说明:我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。 而 feth.max.wait.ms 则用于指定 broker 的等待时间,默认是如果没有足够的数据流入Kafka消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。 如果 fetch.max.wait.ms 被设为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据 Kafka 有两个默认的分配策略。 Range:该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。

    1.9K20

    Kafka消费者

    Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。 一旦消费者订阅了主题,轮询就会处理所有的细节,包括消费者群组协调、分区再均衡、发送心跳和获取数据,开发者只需要使用一组简单的 API 来处理从分区返回的数据。轮询不只是获取数据那么简单。 KafkaConsumer API 提供了很多种方式来提交偏移量:自动提交偏移量、手动提交偏移量。 再均衡监听器在【分区再均衡前后】、【消费者开始读取消息之前】、【消费者停止读取消息之后】我们可以通过消费者 API 执行一些应用程序代码,在调用 kafkaConsumer 的 subscribe() 权威指南》第 4 章:Kafka 消费者——从 Kafka 读取数据

    1420

    Kafka 新版消费者 API(四):优雅的退出消费者程序、多线程消费者以及独立消费者

    优雅的退出消费者程序 package com.bonc.rdpe.kafka110.consumer; import java.util.Arrays; import java.util.Properties ,线程的数量受限于分区数,当消费者线程的数量大于分区数时,就有一部分消费线程一直处于空闲状态 多线程消费者的线程实现类代码如下: package com.bonc.rdpe.kafka110.thread 独立消费者 有时候你可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。 一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。 以下是独立消费者的示例代码: package com.bonc.rdpe.kafka110.consumer; import java.util.ArrayList; import java.util.List

    2.2K40

    Kafka消费者架构

    消费者将记住他们上次离开时的偏移量 消费者组每个分区都有自己的偏移量 Kafka消费者分担负载 Kafka消费者将消费在一个消费者组内的消费者实例上所划分的分区。 消费者组中的每个消费者都是分区的“公平共享”的独家消费者。这就是Kafka如何在消费者组中对消费者进行负载平衡。消费者组内的消费者成员资格由Kafka协议动态处理。 Kafka消费者故障转移 消费者在成功处理记录之后通知Kafka Broker,从而将偏移量提前。 Kafka消费者可以消费哪些记录?消费者无法读取未复制的数据。Kafka消费者只能消费分区之外的“高水印”偏移量的消息。 管理故障切换(每个进程运行X个消费者线程)也更简单,因为您可以允许Kafka首当其冲的工作。 Kafka消费者回顾 什么是消费者组?

    67290

    kafka 消费者详解

    前言 读完本文,你将了解到如下知识点: kafka消费者消费者组 如何正确使用 kafka consumer 常用的 kafka consumer 配置 消费者消费者组 什么是消费者? 顾名思义,消费者就是从kafka集群消费数据的客户端, 如下图,展示了一个消费者从一个topic中消费数据的模型 ? 图1 单个消费者模型存在的问题? 这个时候kafka会进行 分区再均衡, 来为这个分区分配消费者,分区再均衡 期间该 Topic 是不可用的, 并且作为一个 被消费者, 分区数的改动将影响到每一个消费者组 , 所以在创建 topic 至此,消费者都知道自己的消费的分区, 分区过程结束, 当发生 分区再均衡 的时候, leader 将会重复分配过程 实践——kafka 消费者的使用 咱们以 java api 为例,下面是一个简单的 PartitionAssignor 根据给定的消费者和主题, 决定哪些分区应该被分配给哪个消费者Kafka 有两个默认的分配策略。

    53610

    Kafka 独立消费者

    针对以上问题,Kafka 的提供了独立消费者模式,可以消费者可以指定分区进行消费,如果只用一个 topic,每个消息源启动一个生产者,分别发往不同的分区,消费者指定消费相关的分区即可,用如下图所示: ? 但是 Kafka 独立消费者也有它的限定场景: 1、 Kafka 独立消费者模式下,Kafka 集群并不会维护消费者的消费偏移量,需要每个消费者维护监听分区的消费偏移量,因此,独立消费者模式与 group 2、group 模式的重平衡机制在消费者异常时可将其监听的分区重分配给其它正常的消费者,使得这些分区不会停止被监听消费,但是独立消费者由于是手动进行监听指定分区,因此独立消费者发生异常时,并不会将其监听的分区进行重分配 因此,在该模式下,独立消费者需要实现高可用,例如独立消费者使用 K8s Deployment 进行部署。 下面将演示如何使用 Kafka#assgin 方法手动订阅指定分区进行消费: public static void main(String[] args) { Properties kafkaProperties

    88031

    初始 Kafka Consumer 消费者

    温馨提示:整个 Kafka 专栏基于 kafka-2.2.1 版本。 那如果其中一个消费者宕机或新增一个消费者,那队列能动态调整吗? 答案是会重新再次平衡,例如如果新增一个消费者 c3,则c1,c2,c3都会负责2个分区的消息消费,分区重平衡会在后续文章中重点介绍。 kafka 对 poll loop 行为的控制参数 Kafka 提供了如下两个参数来控制 poll 的行为: max.poll.interval.ms 允许 两次调用 poll 方法的最大间隔,即设置每一批任务最大的处理时间 void close() 关闭消费者。 void close(Duration timeout) 关闭消费者。 void wakeup() 唤醒消费者。 int defaultApiTimeoutMs 为所有可能阻塞的API设置一个默认的超时时间。 List< PartitionAssignor> assignors 分区分配算法(分区负载算法)。

    70620

    Kafka系列3:深入理解Kafka消费者

    本篇单独聊聊Kafka消费者,包括如下内容: 消费者消费者组 如何创建消费者 如何消费消息 消费者配置 提交和偏移量 再均衡 结束消费 消费者消费者组 概念 Kafka消费者对象订阅主题并接收Kafka Kafka消费者消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。 代码样例: consumer.subscribe(Collections.singletonList(topic)); 轮询消费 消息轮询是消费者API的核心,消费者通过轮询 API(poll) 向服务器定时请求数据 基于这个原因,Kafka 也提供了手动提交偏移量的 API,使得用户可以更为灵活的提交偏移量。 手动提交: 用户可以通过将 enable.auto.commit 设为 false,然后手动提交偏移量。 而按照 Kafka API,手动提交偏移量又可以分为同步提交和异步提交。同步提交:通过调用 consumer.commitSync() 来进行同步提交,不传递任何参数时提交的是当前轮询的最大偏移量。

    34040

    Kafka系列3:深入理解Kafka消费者

    本篇单独聊聊Kafka消费者,包括如下内容: 消费者消费者组 如何创建消费者 如何消费消息 消费者配置 提交和偏移量 再均衡 结束消费 消费者消费者组 概念 Kafka消费者对象订阅主题并接收Kafka Kafka消费者消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。 代码样例: consumer.subscribe(Collections.singletonList(topic)); 轮询消费 消息轮询是消费者API的核心,消费者通过轮询 API(poll) 向服务器定时请求数据 基于这个原因,Kafka 也提供了手动提交偏移量的 API,使得用户可以更为灵活的提交偏移量。 而按照 Kafka API,手动提交偏移量又可以分为同步提交和异步提交。

    41920

    Kafka核心API——Stream API

    Kafka Stream概念及初识高层架构图 Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。 Kafka Stream的基本概念: Kafka Stream是处理分析存储在Kafka数据的客户端程序库(lib) 由于Kafka Streams是Kafka的一个lib,所以实现的程序不依赖单独的环境 因此,我们在使用Stream API前需要先创建两个Topic,一个作为输入,一个作为输出。 :2181 --replication-factor 1 --partitions 1 --topic output-topic 由于之前依赖的kafka-clients包中没有Stream API,所以需要另外引入 > <version>2.5.0</version> </dependency> 接下来以一个经典的词频统计为例,演示一下Stream API的使用。

    65720

    Kafka核心API——AdminClient API

    五类Kafka客户端作用和区别 在上文中介绍了如何搭建一个Kafka服务,那么在开发中我们要如何去访问、集成Kafka呢?这就需要使用到本文将要介绍的Kafka客户端API。 这些客户端通过APIKafka进行集成,Kafka的五类客户端API类型如下: AdminClient API:允许管理和检测Topic、broker以及其他Kafka实例,与Kafka自带的脚本命令作用类似 Producer API:发布消息到1个或多个Topic,也就是生产者或者说发布方需要用到的API Consumer API:订阅1个或多个Topic,并处理产生的消息,也就是消费者或者说订阅方需要用到的 API Stream API:高效地将输入流转换到输出流,通常应用在一些流处理场景 Connector API:从一些源系统或应用程序拉取数据到Kafka,如上图中的DB ---- 创建工程 在接下来的篇章中将会演示 的组件都会使用到这两个API,因为通过这两个API可以获取到Topic自身和周边的详细信息 ---- 创建Topic 使用createTopics方法可以创建Topic,传入的参数也与kafka-topics.sh

    34910

    关注

    腾讯云开发者公众号
    10元无门槛代金券
    洞察腾讯核心技术
    剖析业界实践案例
    腾讯云开发者公众号二维码

    相关产品

    • 消息队列 CKafka

      消息队列 CKafka

      CKafka(Cloud Kafka)是一个分布式的、高吞吐量、高可扩展性的消息系统,100%兼容开源 Kafka API(0.9版本)。Ckafka 基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。Ckafka 具有数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合等场景。

    相关资讯

    热门标签

    活动推荐

    扫码关注腾讯云开发者

    领取腾讯云代金券