首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Spark Streaming消费Kafka数据的两种方案

欢迎您关注《大数据成神之路》 下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考。...Receivers 的实现使用到 Kafka 高级消费者 API。...这个是 Spark 内存控制的第一步,填充 currentBuffer 是阻塞的,消费 Kafka 的线程直接做填充。...当作业需要处理的数据来临时,Spark 通过调用 Kafka 的低级消费者 API 读取一定范围的数据。这个特性目前还处于试验阶段,而且仅仅在 Scala 和 Java 语言中提供相应的 API。...如果采用 Receiver-based Approach,消费 Kafka数据处理是被分开的,这样就很不好做容错机制,比如系统宕掉了。

3.3K42
您找到你想要的搜索结果了吗?
是的
没有找到

kafka查看消费数据

但是对于最新版本,kafka-run-class.sh 已经不能使用,必须使用另外一个脚本才行,它就是kafka-consumer-groups.sh 普通版 查看所有组 要想查询消费数据,必须要指定组...指定自己的分组 自己消费的topic会显示kafka总共有多少数据,以及已经被消费了多少条 标记解释: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG...也就是说,消费数据没有积压的情况!...注意:以kafkaspout类作为消费者去读kafka数据,相当于直接从kafka server上取文件,没有消费者组的概念 每次读的数据存在自己zk的offet中,所以不能通过上述命令查看 ACL版查看... kafka-1.default.svc.cluster.local:9092 --group usercenter 如果需要使用shell脚本,来检测kafka消费数据,有没有积压。

6.4K10

ClickHouse系列--消费kafka数据

kafka_topic_list :Kafka 主题列表,多个主题用逗号分隔. kafka_group_name :消费者组. kafka_format – Message format....- 再次查看数据,发现数据为空 cdh04 :) select count(*) from kafka_user_behavior; SELECT count(*) FROM kafka_user_behavior...Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。...; -- 查询,多次查询,已经被查询的数据依然会被输出 cdh04 :) select * from kafka_user_behavior; Note: Kafka消费表不能直接作为结果表使用。...Kafka消费表只是用来消费Kafka数据,没有真正的存储所有数据。 这里还有一个疑问: 在众多资料中,kafka示例消息都是最简单的json格式,如果消息格式是复杂类型呢?是否支持?

97830

kafka丢失和重复消费数据

Kafka作为当下流行的高并发消息中间件,大量用于数据采集,实时处理等场景,我们在享受他的高并发,高可靠时,还是不得不面对可能存在的问题,最常见的就是丢包,重发问题。...当消费消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。 底层根本原因:已经消费数据,但是offset没提交。...配置问题:设置了offset自动提交 解决办法:至少发一次+去重操作(幂等性) 问题场景: 设置offset为自动提交,正在消费数据,kill消费者线程; 设置offset为自动提交,关闭kafka时,...,导致一个周期内,offset还未提交;继而重复消费,但是业务逻辑可能采用发送kafka或者其他无法回滚的方式; 重复消费最常见的原因:re-balance问题,通常会遇到消费数据,处理很耗时,导致超过了...以上就是大数据中的kafka丢失和重复消费数据的详细内容

1.2K20

如何管理Spark Streaming消费Kafka的偏移量(三)

前面的文章已经介绍了在spark streaming集成kafka时,如何处理其偏移量的问题,由于spark streaming自带的checkpoint弊端非常明显,所以一些对数据一致性要求比较高的项目里面...在spark streaming1.3之后的版本支持direct kafka stream,这种策略更加完善,放弃了原来使用Kafka的高级API自动保存数据的偏移量,之后的版本采用Simple API...本篇文章,会再介绍下,如何手动管理kafka的offset,并给出具体的代码加以分析: 版本: apache spark streaming2.1 apache kafka 0.9.0.0 手动管理offset...的注意点: (1)第一次项目启动的时候,因为zk里面没有偏移量,所以使用KafkaUtils直接创建InputStream,默认是从最新的偏移量开始消费,这一点可以控制。...(2)如果非第一次启动,zk里面已经存在偏移量,所以我们读取zk的偏移量,并把它传入到KafkaUtils中,从上次结束时的偏移量开始消费处理。

1.1K60

如何管理Spark Streaming消费Kafka的偏移量(二)

上篇文章,讨论了在spark streaming中管理消费kafka的偏移量的方式,本篇就接着聊聊上次说升级失败的案例。...事情发生一个月前,由于当时我们想提高spark streaming程序的并行处理性能,于是需要增加kafka分区个数,,这里需要说下,在新版本spark streaming和kafka的集成中,按照官网的建议...spark streaming的executors的数量要和kafka的partition的个数保持相等,这样每一个executor处理一个kafka partition的数据,效率是最高的。...接下来我们便增加了kafka分区的数量,同时修改了spark streaming的executors的个数和kafka的分区个数一一对应,然后就启动了流程序,结果出现了比较诡异的问题,表现如下: 造几条测试数据打入...,让其从最早的数据开始消费处理,这样以来因为旧的分区被删除,只有新分区有数据,所以相当于是把丢失的那部分数据给修复了。

1.1K40

如何管理Spark Streaming消费Kafka的偏移量(一)

本篇我们先从理论的角度聊聊在Spark Streaming集成Kafka时的offset状态如何管理。...spark streaming 版本 2.1 kafka 版本0.9.0.0 在这之前,先重述下spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的offset...直接创建InputStream流,默认是从最新的偏移量消费,如果是第一次其实最新和最旧的偏移量时相等的都是0,然后在以后的每个批次中都会把最新的offset给存储到外部存储系统中,不断的做更新。...场景三: 对正在运行的一个spark streaming+kafka的流式项目,我们在程序运行期间增加了kafka的分区个数,请注意:这个时候新增的分区是不能被正在运行的流式项目感应到的,如果想要程序能够识别新增的分区...,这样就会丢失一部分数据

1.6K70

kafka消费异常

背景开发过程中碰到了一个问题,某个top一直在消费,而一直存在,偏移量不增不减就在那。这个小组里面有6个topic,其余5个都消费很快,只有这个topicC出现了阻塞。...导致超时未上报给kafka服务端,服务端认为消费失败了,不更新offset。但是根据日志提示:offset提交请求失败,因为消费者已经不是一个活跃的组内了。为啥既然不是活跃的组内,还能消费消息呢?...难道服务端只禁止了不活跃的消费者提交offse,而不禁止消费?解决方法方法肯定是将客户端topicC消费中的业务逻辑改为异步处理,及时上报。解决了这个问题。offset恢复正常。...但是不知道这个提示与消费的矛盾具体是什么原理。

22520

kafka :聊聊如何高效的消费数据

前言 之前写过一篇《从源码分析如何优雅的使用 Kafka 生产者》 ,有生产者自然也就有消费者。 建议对 Kakfa 还比较陌生的朋友可以先看看。...也用 Kafka 消费过日均过亿的消息(不得不佩服 Kakfa 的设计),本文将借助我使用 Kakfa 消费数据的经验来聊聊如何高效的消费数据。...先来谈谈最简单的单线程消费,如下图所示: 由于数据散列在三个不同分区,所以单个线程需要遍历三个分区将数据拉取下来。...消费组自平衡 这个 Kafka 已经帮我做好了,它会来做消费组里的 Rebalance。 比如上面的情况,3 个分区却有 4 个消费实例;最终肯定只有三个实例能取到消息。...---- 当我关掉进程2,再发送10条数据时会发现所有数据又被进程1里的三个线程消费了。 通过这些测试相信大家已经可以看到消费组的优势了。

90130

kafka重复消费解决方案_kafka重复消费原因

这篇博客呢,就跟大家一起聊一下 kafka 消费者如何消费的?如何避免重复消费?...二、消费消费流程 消费流程: 从zk获取要消费的partition 的leader的位置 以及 offset位置 拉数据,这里拉数据是直接从broker的pagecash拉取,零拷贝 ,所以很快。...如果pagecash数据不全,就会从磁盘中拉取,并发送 消费完成后,可以手动提交offset,也可以自动提交offset。 消费策略有哪些?...其中用到了Direct Memory Access 技术,可以把数据直接从内核空间传递到网卡设备,kafka中把数据直接从磁盘复制到 pagecash,给消费者读取,如图: 零拷贝其实不是没有拷贝...我们知道kafka,整个topic有多个partition,每个partition内的消息是有顺序的。 五、如何延迟消费kafka是无状态的,没有延迟的功能。

1.8K10

Kafka 生产与消费

当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。...也正是由于这个高水位延迟一轮,在一些情况下,kafka会出现丢数据和主备数据不一致的情况,0.11开始,使用leader epoch来代替高水位。 3....首先想出来的: 生产者重做导致重复写入消息----生产保证幂等性 消费者重复消费---消灭重复消费,或者业务接口保证幂等性重复消费也没问题 由于业务接口是否幂等,不是kafka能保证的,所以kafka这里提供的...exactly once是有限制的,消费者的下游也必须是kafka。...所以一下讨论的,没特殊说明,消费者的下游系统都是kafka(注:使用kafka conector,它对部分系统做了适配,实现了exactly once)。 生产者幂等性好做,没啥问题。

1.1K51

kafka消费

consumer group 当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力...Kafka 当前只能允许增加一个主题的分区数。...我们有时候可以看到ILLEGAL_GENERATION的错误,就是kafka在抱怨这件事情。...1.GroupCoordinator broker端的,每个kafka server都有一个实例,管理部分的consumer group和它们的offset,对于 consumer group 而言,是根据其...消费的两种方式 1.consumer.assign assign方法由用户直接手动consumer实例消费哪些具体分区,根据api上述描述,assign的consumer不会拥有kafka的group

93110

kafka消费入门

基本概念Topic 主题消费组 (一个topic可以有多个topic)消费者(一个消费者必须属于一个消费组,一个topic可以有多个消费者)分区消费者的分区消息,是可以自己选择的,有分区器消费的必要处理...消费者poll做的事情offset位移提交分区中的offset消费中的offset消费者的位移存储在__consumer_offsets中也可以指定位移消费自动提交要解决的问题重复消费(手动提交处理)消息丢失...(手动提交处理)kafka的再均衡问题:再均衡期间,消费者无法读取到消息(可能会发生重复消费消费者拦截器拦截三种行为onConsumonCommitclose消费者类KafkaConsumer是非线程安全的多线程处理每个线程一个...KafkaConsumer实例多个消费者线程消费同一个分区一个消费者,多线程处理消息重要的参数fetch.min(max).bytes一次拉取的消息的数量fetch.max.wait.ms消息时间max.partition.fetch.byts...每个分区返回给consumer最大数据max.pool.records最大小橡树

15500
领券