腾讯云
开发者社区
文档
建议反馈
控制台
登录/注册
首页
学习
活动
专区
工具
TVP
最新优惠活动
文章/答案/技术大牛
搜索
搜索
关闭
发布
精选内容/技术社群/优惠产品,
尽在小程序
立即前往
文章
问答
(8661)
视频
沙龙
1
回答
如
何在
mule
中
动态
设置
kafka
偏移
值
,
以便
我们
可以
从
特定
偏移量
开始
处理
、
、
、
我想要
动态
设置
KAFKA
偏移量
,这样我的
处理
就
可以
从
相同的
偏移量
开始
,使用mulesoft...举个例子:
kafka
中
总共有150条消息,我想从
偏移
值
51
开始
处理
,直到最后..在配置
中
,
我们
可以
从
开始
/
从
最新<
浏览 37
提问于2021-08-03
得票数 0
回答已采纳
2
回答
有没有办法获取
kafka
主题的最早可用
偏移量
、
、
、
、
我有一个要求,在系统读取数据之前,我要存储
偏移
信息。因此,下次系统
开始
从
kafka
读取数据时,我需要将系统
中
的旧
偏移量
之间的数据读取到最新
偏移量
。但旧的
偏移量
可能由于
kafka
保留策略而无效。那么,如果
我们
在
kafka
消费者中指定较旧的
偏移量
,会有什么行为呢?另外,有没有办法获得
特定
主题/
偏移量
的最旧
偏移
值</
浏览 60
提问于2020-09-09
得票数 0
1
回答
通过外部触发器寻找
偏移量
这个实现帮助我侦听
特定
的主题,并使用手动ack
处理
消息。现在我需要构建以下功能:让
我们
假设,对于某个环境异常或通过这个主题输入的一些坏数据,我需要在
特定
偏移量
之间重放某个主题上的数据。如果我能够检索这些
偏移量
之间的消息并提供提要,这将是理想的,因为它是一个重播主题,这样新的使用者就
可以
处理
这些消息,从而保持原始主题上的
偏移
完整。我不确定这是否可能 另外,假设我带着一个崩溃时间戳,能不能反思一下这个话题,找出与崩溃对应的
偏移
浏览 2
提问于2018-11-24
得票数 1
回答已采纳
2
回答
我应该如何为卡夫卡原子消费者设定卡夫卡制作者的属性?
我正在浏览,它解释了如何通过执行以下操作来确保消息被正确地
处理
一次: 通过调用consumer.commitAsync()或consumer.commitSync()手动提交对
Kafka
浏览 0
提问于2019-10-25
得票数 2
1
回答
Kafka
consumerGroup丢失了所有分区的已提交
偏移量
信息,并从一
开始
就
开始
使用
偏移量
。
、
、
我一直在使用
kafka
主题的事件,在我的应用程序
中
处理
同样的事件已经有一段时间了。该主题有20个分区,我将
kafka
并发
设置
为10,因为我使用来自
kafka
主题的事件和我的应用程序的2个副本。我将提交模式
设置
为手动立即,因此,一旦应用程序确保事件得到
处理
,我希望提交分区
偏移量
。一切都很好,直到卡夫卡服务器的一个或多个节点被关闭并重新启动的一天。
我们
使用卡夫卡经纪人的3个节点。例如,如果1在主题的
特定
浏览 9
提问于2021-01-30
得票数 2
1
回答
Spark Streaming
中
驱动使用直接
Kafka
API查询分区
偏移量
的频率是多少?
、
是针对每个批
处理
间隔查询
偏移量
还是以不同的频率查询
偏移量
?
浏览 1
提问于2015-11-17
得票数 1
1
回答
如何为消费者
设置
卡夫卡补偿?
、
假设我的主题中已经有10个数据,现在我
开始
编写Flink,消费者将使用第11个数据。 任何帮助都是非常感谢的。
浏览 1
提问于2018-10-31
得票数 2
回答已采纳
1
回答
当您重新启动火花作业时,如果它在输入给
kafka
的数据
中
遇到了意外的格式,会发生什么?
、
、
不一致可能是数据格式问题或垃圾字符,而这些问题可能是无法
处理
的。在这种情况下,
我们
如何解决这个问题?有什么方法
可以
让
我们
进入卡夫卡主题并手动修改数据吗?如果
我们
不修复数据问题并重新启动星火作业,它将读取导致失败的老行,因为
我们
还没有提交检查点。那
我们
怎么摆脱这个循环呢。如何解决卡夫卡主题中的数据问题,以恢复中止的火花作业?
浏览 0
提问于2020-05-12
得票数 3
回答已采纳
3
回答
Apache
Kafka
:消费者状态
、
、
我阅读了
Kafka
网站上的文档,但在尝试实现一个完整的最小示例(生产者->
kafka
->消费者)之后,我不太清楚“消费者状态”是如何
处理
的。一些信息 卡夫卡经纪公司
浏览 5
提问于2013-02-07
得票数 8
回答已采纳
2
回答
是否有一种方法
可以
使用卡夫卡流(而不是通过KafkaConsumer)
从
每次在java
开始
读取消息?
、
每个源表CDC以Avro格式发送到各自的主题(使用
Kafka
模式注册表和
Kafka
服务器)
我们
正在编写java代码来使用avro模式
中
的消息,使用AvroSerde反序列化它并将它们加入,然后发送到不同的主题,
以便
外部系统
可以
使用它们。(直到
我们
确信代码工作正常,并且
可以
重新
开始
接收实时数据)。 在KafkaConsumer对象
中
,
我们
可以
选择使用seekToBeginning方法
浏览 4
提问于2020-08-04
得票数 1
2
回答
Apache
Kafka
消费者组和简单消费者
我刚接触
Kafka
,到目前为止,我对消费者的理解是,基本上有两种类型的实现。2) 高级抽象最重要的部分是当
Kafka
不关心
处理
偏移量
时使用它,而简单的使用者对
偏移量
管理提供了更好的控制。让我困惑的是,如果我想在多线程环境
中
运行消费者,又想控制我使用的消费者组,这是否意味着我必须
从
存储在zookeeper
中
的最后一个
偏移量
开始
读取?这是我唯一的选择吗。
浏览 1
提问于2013-08-01
得票数 7
1
回答
卡夫卡消费者-如何识别
偏移
跳过/缺失
偏移量
?
、
、
、
在这种情况下,信息将在
Kafka
中
删除(由于上述保留期),然后再被消费者捡起和
处理
。在默认配置
中
,使用者将跳过丢失的记录,选择最早可用的
偏移量
。这导致了目标端的不一致。问题:注意:如果使用者只是抛出一个异常/停止/etc,以防它找不到它给定的
偏移量
的消息,
我们
就
可以
了。
我们
试着走远了.
我们
尝
浏览 8
提问于2022-02-01
得票数 0
回答已采纳
2
回答
卡夫卡流在生成主题时不会增加1的
偏移量
。
、
、
、
、
然而,我发现,
我们
的
Kafka
流应用程序并不保证向接收器主题生成记录,而对于每个生成的记录,
偏移量
将增加1。我有一个场景,在
处理
所有数据之前,可能会收到记录。当记录被流应用程序不匹配
处理
时,它们会移动到死信主题,而不是继续往下流。发布新数据时,
我们
将死信主题中的最新消息转储回流应用程序的源主题,
以便
使用新数据进行重新
处理
。死信
处理
器: 在运行应用程序
开始
时,记录每个分区的结束
浏览 0
提问于2019-02-11
得票数 5
回答已采纳
4
回答
Java:当我
开始
从
kafka
主题读取时,如何
从
当前
偏移量
读取
、
每当消费者
开始
阅读一个主题时,它就会
从
主题的开头
开始
阅读,并且需要相当长的时间才能赶上最新的事件。
我们
如何确保消费者
从
当前
偏移量
读取数据?
浏览 0
提问于2016-07-29
得票数 0
4
回答
星火结构流查询总是以auto.offset.rest=earliest
开始
,即使
设置
了auto.offset.reset=latest
、
、
、
我有一个奇怪的问题,试图读取数据
从
卡夫卡使用火花结构化流。我的用例是能够
从
最大的/最新的
偏移量
中
读取主题。我正在使用新的使用者apis,正如文档所示,我只需要将auto.offset.reset
设置
为“最新”,或者将startingOffsets
设置
为“最新”,以确保我的Spark作业
从
卡夫卡
中
每个分区可用的最新
偏移量
开始
消费我还知道,
设置
auto.offset.reset只在第一次启动新
浏览 1
提问于2018-02-14
得票数 4
5
回答
Kafka
Consumer:如
何在
Python
中
从
最后一条消息
开始
消费
、
、
、
我使用的是
Kafka
0.8.1和
Kafka
python-0.9.0。在我的
设置
中
,我有2个
kafka
代理
设置
。当我运行我的
kafka
消费者时,我
可以
看到它从队列
中
检索消息,并跟踪两个代理的
偏移量
。我的问题是,当我重新启动消费者时,它会从头
开始
使用消息。我所期望的是,在重启时,消费者将从它死之前停止的地方
开始
消费消息。我确实试着跟踪Redis
中
的消息
偏移
浏览 3
提问于2014-07-10
得票数 8
5
回答
来自
Kafka
的Spark流有错误numRecords不能为负
、
、
、
这是一种奇怪的错误,因为我仍然将数据推送到
kafka
,并使用来自
kafka
的消息,而Exception in thread "main" java.lang.IllegalArgumentException我有一个服务器是主和代理运行mesos上,我
设置
了3个经纪人卡夫卡那样。然后,我在该集群上运行spark-job。under-replicated-partitions: 0 is-active-controller: 0 然后,我运行spark-
浏览 0
提问于2016-01-25
得票数 11
1
回答
从
较早的
偏移量
Apache Storm读取
、
、
我不想读数据,既不从最新的
偏移量
,也不想从
开始
。有没有办法
从
可配置的
偏移量
中
读取存储在动物园管理员
中
的
偏移量
。风暴为
我们
提供了
从
最新的
偏移量
或从一
开始
阅读的方法。我不想要那个案子。用例:
偏移量
0已部署拓扑。
浏览 2
提问于2016-04-26
得票数 0
回答已采纳
1
回答
重构卡夫卡信息的可能原因
、
、
、
昨天,我
从
日志中发现,卡夫卡在卡夫卡小组协调员发起了一次小组再平衡之后,正在重新消费一些信息。这些消息是两天前使用的(
从
日志
中
确认)。config.Version = version在声明消息之前,
我们
正在
处理
混乱,因此似乎
我们
至少使用了一次
kafka</em
浏览 0
提问于2019-07-02
得票数 0
回答已采纳
2
回答
如何
从
Camel
中
事务性地轮询
Kafka
?
、
、
事件主题被轮询时,如果我的pollEnrich
中
的api.url不可用,则没有检索到业务对象,事件丢失。因此,我需要实现一个事务逻辑,
以便
能够回滚我的路由中的初始
kafka
轮询,
以便
同一事件
可以
被轮询多次,直到api.url将等待的业务对象发送给我。我尝试了几种方法,首先将我的org.apache.camel:camel-
kafka
版本更新到2.22.0,
以便
能够进行手动提交。然后,我尝试实现一个基本的错误
处理
程序(配置了maximumRedelive
浏览 1
提问于2018-08-14
得票数 4
点击加载更多
扫码
添加站长 进交流群
领取专属
10元无门槛券
手把手带您无忧上云
相关
资讯
消息中间件—Kafka数据存储(一)
Kafka 提供哪些日志清理策略?
Kafka消费者API
Python操作分布式流处理系统Kafka
Spark Streaming与Kafka 整合的改进
热门
标签
更多标签
云服务器
ICP备案
实时音视频
对象存储
云直播
活动推荐
运营活动
广告
关闭
领券