腾讯云
开发者社区
文档
建议反馈
控制台
登录/注册
首页
学习
活动
专区
圈层
工具
MCP广场
文章/答案/技术大牛
搜索
搜索
关闭
发布
文章
问答
(9999+)
视频
沙龙
1
回答
如
何在
使用
并发
时
使用
ConsumerAwareErrorHandler
提交
偏移量
?
apache-kafka
、
kafka-consumer-api
、
spring-kafka
如果出现错误,
ConsumerAwareErrorHandler
接口将提供对ConsumerRecord和Consumer In handle方法的引用。现在,在处理错误之后,我们可以
提交
偏移量
或忽略
偏移量
。但我想知道,当我们
使用
多个线程(ConcurrentKafkaListenerContainerFactory.setConcurrency).
时
,它将如何表现如果由于不同的原因,最新的记录在前一条记录之前出错,我们
提交
了最新记录的
偏移量
,但是前一
浏览 44
提问于2019-01-02
得票数 1
回答已采纳
2
回答
卡夫卡当发生这种情况
时
,
提交
的
偏移量
大于客户端实际处理的最后一条消息的
偏移量
。
apache-kafka
、
confluent-platform
我正在阅读权威指南,
并发
现了这个短语。 不知道这是什么时候发生的?我理解了另一种用例,在这种情况下,
提交
的
偏移量
较小,在这种情况下,
使用
者将不得不放弃
提交
最新的
偏移量
。
浏览 1
提问于2020-06-06
得票数 0
回答已采纳
1
回答
kafka消费者在重新平衡
时
获取不同的
偏移量
apache-kafka
、
apache-camel
我有一个微服务应用程序开发
使用
骆驼-卡夫卡消费卡夫卡的消息。我
使用
4个
并发
实例运行此服务,其中每个实例都有1个
使用
者,这些
使用
者从具有20个分区的主题中消费。在查看日志
时
,注意到当应用程序停止重新平衡
时
,
偏移量
= 12399,但当应用程序开始消费消息
时
,
偏移量
= 12336。currentLeader=LeaderAndEpoch{leader=Optional[testbroker1.local:9093 (id:
浏览 216
提问于2020-11-18
得票数 0
1
回答
使用
Akka Stream和Kafka offset
提交
将事件从Kafka流式传输到Couchbase
scala
、
apache-kafka
、
akka
、
akka-stream
、
alpakka
我正在尝试设计一个Akka Stream,
使用
Alpakka来读取kafka主题中的事件,并将它们放到Couchbase中。logging") .runWith(Sink.seq) 我所谓“不知何故”的意思是,这个流实际上是从主题中读取事件,并将它们作为json文档放到Couchbase中,尽管我不知道如何将消费者
偏移量
提交
给如果我已经清楚地理解了隐藏在Kafka消费者
偏移量
后面的主要思想,在任何故障或重启发生的情况下,流从最后
提交
的
偏移量
读取所有消息,并且由于我们
浏览 21
提问于2019-10-18
得票数 0
1
回答
Kafka with Java:如何重新读取数据
java
、
apache-kafka
、
commit
、
kafka-consumer-api
在我的代码中,没有像预期的那样工作,这意味着“如果kafka崩溃,那么从
提交
的位置重新启动”。但是,当我的DB事务失败
时
,即使我不
使用
commitAsync(),指针也会向前移动。<code>A3</code> 但这需要手动维护分区及其
偏移量
的列表,我猜还有更简单、更优雅的东西?
浏览 31
提问于2018-12-21
得票数 1
回答已采纳
1
回答
重试消费来自Kafka主题的消息
apache-kafka
、
kafka-consumer-api
、
spring-cloud-stream
我正在开发一个模块,在这个模块中,它
使用
来自Kafka主题的消息
并发
布到下游系统。在下游系统不可用的情况下,消费者不要确认Kakfa消息。因此,当我的消费者在下游系统不可用时收到消息
时
,kakfa的
偏移量
将不会被
提交
。但如果我在下游系统启动后收到新消息,并且当我确认该消息
时
,最新的
偏移量
将被
提交
,消费者将永远不会收到没有
偏移量
提交
的主题中的那些消息。 也就是说,假设我的消费者被消耗到
偏移量
4。当下游
浏览 17
提问于2019-08-01
得票数 0
2
回答
在Kafka消费者中实现
并发
parallel-processing
、
apache-kafka
、
kafka-consumer-api
、
consumer
因此,我们尝试使API调用异步,并在其响应中增加
偏移量
。然而,我们看到了一个问题: 通过将API调用设置为异步,我们可以首先获得最后一条记录的响应,并且到那时前一条记录的API调用都还没有启动或完成。如果我们一收到最后一条记录的响应就
提交
偏移量
,那么
偏移量
就会变成最后一条记录。同时,如果
使用
者重新启动或分区重新平衡,我们将不会收到
提交
偏移量
为的最后一条记录之前的任何记录。
浏览 2
提问于2019-06-19
得票数 2
1
回答
Kafka -动态/任意分区
asynchronous
、
apache-kafka
、
job-scheduling
然而,根据我所读到的,当我真的想要无限或动态的分区
时
,似乎需要预先定义分区的数量(理想情况下,每条消息都有自己的分区动态创建)。例如,在单个分区上,我可以让
使用
者读取第一条消息
并发
出异步请求。它提供了一个回调函数,它将该
偏移量
提交
给Kafka。当该请求等待
时
,我的
使用
者将读取下一条消息
并发</em
浏览 3
提问于2015-05-08
得票数 1
回答已采纳
2
回答
不
提交
来自Kafka 10消费者的消费消息
apache-kafka
、
kafka-consumer-api
如果批处理由于任何原因而失败,我需要再次
使用
相同的消息集并重复该过程。因此,对于每个批处理,每个分区的from和to
偏移量
都存储在数据库中。为了实现这一点,我通过将分区分配给读取器来为每个分区创建一个Kafka消费者,基于先前存储的
偏移量
,消费者将查找到该位置并开始读取。我已经关闭了自动
提交
,并且我不
提交
来自消费者的
偏移量
。对于每个批处理,我为每个分区创建一个新的
使用
者,从存储的最后一个
偏移量
读取消息
并发
布到外部系统。在不
提交
浏览 0
提问于2016-11-06
得票数 3
1
回答
如何
使用
spring KafkaMessageListenerContainer.java确保消息不丢失
kafka-consumer-api
、
spring-kafka
一旦我从kafka收到一条消息,我需要运行一个长时间运行的进程(最多需要20秒),只有当这个过程完成
时
,我才需要将一条消息视为成功。
使用
MessageListener但是,我唯一的问题是,如果带有
偏移量
的特定消息(例如15 )首先被成功处理,但是带有14的消息仍在处理,那么会发生什么情况。因此,在这种情况
浏览 1
提问于2016-08-14
得票数 1
1
回答
Kafka有批量消费者吗?
apache-kafka
如果消费者想要处理这些消息并将其
提交
给其他下游消费者,比如Solr或Elastic-Search,这对他们来说可能是一个很大的问题,因为他们更喜欢批量发送消息,而不是一次只有一个。在内存中对这些消息进行批处理也不是一件容易的事,因为Kafka中的
偏移量
只有在批处理已经
提交
时
才需要同步,否则具有未
提交
下游消息的崩溃的kafka消费者(
如
Solr或ES)的
偏移量
将已经更新,从而导致消息松散如果消费者在向下游
提交
消息之后但在更新消息
偏移量
之
浏览 2
提问于2016-03-04
得票数 9
1
回答
Akka streams Kafka消费者进程并行
apache-kafka
、
akka
、
akka-stream
、
alpakka
、
akka-kafka
我正在开发一个
使用
Akka Kafka连接器的Kafka消费者应用程序。我希望消费者并行处理消息。我应该选择哪个消费群体??如
何在
消费者端配置并行度?
浏览 16
提问于2020-05-06
得票数 0
2
回答
如
何在
Spring Kafka中关闭offsets
提交
,以便在本地存储offsets?
apache-kafka
、
spring-kafka
我想
使用
Spring Kafka将事件处理到本地数据库中,并将
偏移量
存储在数据库本身中,
如
“在Kafka之外存储
偏移量
”下的https://kafka.apache.org/25/javadoc/index.htmlSpring Kafka (带有KafkaListener)似乎总是
提交
偏移量
(因此需要group.id),有没有办法完全禁用
提交
,允许在没有组的情况下进行侦听?
浏览 46
提问于2020-07-03
得票数 1
回答已采纳
1
回答
从kafka高级消费者获取
偏移量
java
、
apache-kafka
因为我
使用
的是自定义的
提交
偏移量
属性,所以我想测试一下我的自定义
提交
偏移量
是否工作正常。谁能告诉我如何获得补偿?我遇到了几个kafka工具(
如
getoffsetshell),但它对我的测试没有帮助。
浏览 1
提问于2015-12-07
得票数 1
2
回答
我应该如何为卡夫卡原子消费者设定卡夫卡制作者的属性?
apache-kafka
我正在浏览,它解释了如何通过执行以下操作来确保消息被正确地处理一次: 通过调用consumer.commitAsync()或consumer.commitSync()手动
提交
对Kafka的
偏移量
在此,我将
偏移量
保存到外部数据库。因
浏览 0
提问于2019-10-25
得票数 2
1
回答
无法从zookeeper检索主题/组/分区的kafka
偏移量
apache-zookeeper
、
apache-kafka
我们正在
使用
HL消费者,他们会像预期的那样访问kafka队列中的数据,在重启
时
,他们会从中断的地方重新开始。因此,消费者的行为符合预期。问题是当我们
使用
zkCli.sh
时
,我们在zookeeper中看不到
偏移量
。现在,
使用
者被设置为仅针对一个主题在一个分区中运行。因为当消费者停止并再次重启
时
,消费者的行为
如
预期的那样(即,它从上次运行时停止的
偏移量
中恢复。我们可以通过查看日志来判断这一点,日志给出了它开始的
偏移量
,并
浏览 2
提问于2015-02-04
得票数 1
5
回答
使用
高级
使用
者的Kafka延迟队列实现
java
、
scala
、
messaging
、
apache-kafka
、
kafka-consumer-api
希望
使用
高级
使用
者api实现延迟
使用
者。
提交
每个
偏移量
可能会减慢ZK的速度。如果是,我将
使用
相同的消息两次(
浏览 5
提问于2015-08-02
得票数 28
回答已采纳
1
回答
如何
使用
Camel-kafka
提交
消费者
偏移量
?
java
、
apache-camel
我正在
使用
apache camel集成我的kafka消息。此外,我还
使用
JAVA DSL来
使用
来自kafka端点的消息。
使用
apache kafka API,它知道如何
提交
具有给定属性切换的消费者
偏移量
。kafka.co
浏览 9
提问于2015-09-21
得票数 5
1
回答
批量记录处理后如何
提交
kafka
偏移量
java
、
performance
、
spring-kafka
我正在
使用
spring-kafka和消费卡夫卡主题的批量记录,并通过AbstractMessageListenerContainer.AckMode.BATCH
提交
偏移量
。在我的例子中,处理批处理记录需要时间(大约20秒),
使用
者线程等待批处理过程完成,然后再次执行轮询(在此轮询
时
提交
偏移量
)。(在此过程中,
使用
者线程将一直等待,直到它从ProcessThread获得结果,这会导致性能低下。 有没有办法让ProcessThread负责将offset<e
浏览 6
提问于2018-10-03
得票数 1
1
回答
RetryingBatchErrorHandler -
偏移量
提交
处理
spring-kafka
我正在
使用
spring-kafka 2.3.8,并尝试
使用
RetryingBatchErrorHandler记录恢复的记录并
提交
偏移量
。您将如
何在
恢复程序中
提交
偏移量
?
浏览 5
提问于2021-10-12
得票数 0
点击加载更多
热门
标签
更多标签
云服务器
对象存储
ICP备案
云点播
腾讯会议
活动推荐
运营活动
广告
关闭
领券