腾讯云
开发者社区
文档
建议反馈
控制台
登录/注册
首页
学习
活动
专区
圈层
工具
MCP广场
文章/答案/技术大牛
搜索
搜索
关闭
发布
文章
问答
(2360)
视频
沙龙
1
回答
如
何在
kafka
流
中
处理
给定
时间
范围内
的
key
对应
的
最新
记录
?
、
、
、
、
说明:我只想
处理
密钥
的
最新
唯一事件。我有KafkaStreams kstreams。假设我在kafkaStreams
中
获得了以下事件: {id= "DELHI", event1},{id= "DELHI", event3},{id= "MUMBAI", event5} 现在,我想对它们进行分组(比如在10分钟内),这样我就只有
浏览 0
提问于2020-10-23
得票数 0
2
回答
延迟
处理
事件
的
可行解决方案是什么?
、
给定
系统,它使用来自
Kafka
的
事件流来分析存储在数据库
中
的
一些
记录
。也许还有另一种更方便和可伸缩
的</em
浏览 5
提问于2019-11-14
得票数 1
1
回答
Google如何确定各种来源
的
水印?
、
、
我只是回顾了,以了解Google是如何
处理
水印
的
,它只是提到了非常模糊
的
内容: 我发现了一些东西,表明如果你
的
来源是谷歌PubSub,它已经有一个水印将被提取,但如果源是其他东西呢?例如,一个卡夫卡主题(我认为它本质上没有水印,所以我看不出像这样
的
东西会如何应用)。它是在最后几分钟确定最大滞后,如果是的话,有多少(肯定不会永远如此,因
浏览 6
提问于2022-05-25
得票数 0
3
回答
为什么要使用KStream或KTable?
、
或者它有更多
的
功能。然而,在这方面,我找不到一个好
的
例子。我在一个很好
的
解释工作逻辑
的
来源
中
也看不到它。你能解释一个很好
的
ktable和kstream
的
例子吗?我能做些什么?
浏览 0
提问于2020-03-01
得票数 0
1
回答
如何更改
记录
的
时间
戳?
、
我使用FluentD (最后一个稳定版本)向
Kafka
发送消息。但是FluentD使用
的
是旧
的
KafkaProducer,因此
记录
时间
戳总是设置为-1。因此,当信息到达卡夫卡时,我必须使用WallclockTimestampExtractor将
记录
的
时间
戳设置为
时间
点。 “
时间<
浏览 2
提问于2017-10-12
得票数 6
回答已采纳
1
回答
星火卡夫卡
流
-发送原始
时间
戳而不是当前
时间
戳
、
我正在使用火花结构化
流
发送
记录
到一个卡夫卡主题。
kafka
主题是用config - message.timestamp.type=CreateTime创建
的
。这样做是为了使目标卡夫卡主题
记录
具有与原始
记录
相同
的
时间
戳。我的卡夫卡流媒体代码: kafkaRecords.selectExpr("CAST(
key
AS STRING)", "CAST(value AS BINARY)","C
浏览 0
提问于2018-11-09
得票数 0
1
回答
如何将地图发送给卡夫卡主题,使ProducerRecord键与相应
的
地图键相同
、
、
、
我正在使用星火
流
和数据被发送给卡夫卡。我要给卡夫卡发地图。假设我有一个20
的
Map (在
流
批持续
时间
中它可能增长到1000 )元素,如下所示: KafkaProducer.send(record);我
的
Kafka
主题是有10个分区。如
何在</
浏览 0
提问于2018-10-02
得票数 1
1
回答
卡夫卡
流
WindowStore取录顺序
、
WindowStore和ReadOnlyWindowStore方法fetch(K
key
, Instant from, Instant to)
的
Kafka
流
2.2.0文档声明: 对于每个键,迭代器保证窗口
的
顺序,从最老
的
/最早
的
可用窗口开始到
最新
/
最新
的
窗口。其他
的
提取方法都没有声明这一点(除了不推荐
的
fetch(K
key
, long fr
浏览 0
提问于2019-05-10
得票数 0
回答已采纳
1
回答
将火花流连接到
流
集输入
、
、
我想知道是否有可能提供输入来激发来自StreamSets
的
流
。我注意到在StreamSets连接器目的地中不支持火花
流
。 我将探讨是否有其他方法将它们连接到一个示例POC。
浏览 3
提问于2016-07-06
得票数 3
回答已采纳
1
回答
吡火花:如何使用KafkaUtils执行结构化
流
、
、
、
我正在使用SparkSession.readStream进行结构化
流
处理
,并将其写入蜂窝表,但它似乎不允许我使用基于
时间
的
微批,也就是说,我需要一批5秒
的
时间
。所有的消息都应该形成一个5秒
的
批
处理
,并且批
处理
数据应该被写入到hive表
中
。 现在,当他们被发布到
Kafka
主题时,它会读取这些信息,每条信息都是表
的
一条
记录
。foreachBatch(hive_writ
浏览 12
提问于2022-04-25
得票数 0
回答已采纳
1
回答
Kafka
流
:窗口时对旧数据
的
再
处理
、
有一个
Kafka
流
应用程序,它通过流连接执行窗口(使用原始事件
时间
,而不是挂钟
时间
),例如1天。如果打开这个拓扑,并从一开始就重新
处理
数据(就像在lambda风格
的
体系结构
中
那样),那么这个窗口会将旧数据保存在那里吗?例如:如果今天是2022-01-09,而我正在接收2021-03-01
的
数据,这个旧数据会进入表吗,还是会从一开始就被拒绝? 在这种情况下-可以采取什么策略来重新
处理
这些数据?使用
Kafka
Strea
浏览 6
提问于2022-01-09
得票数 0
2
回答
kafka
流
中
的
不均匀分区分配
、
、
我正经历着卡夫卡
流
的
奇怪
的
任务行为。我有三个节点的卡夫卡
流
集群。我
的
流
非常简单,有一个源主题(24个分区,所有
kafka
代理都运行在
kafka
流
节点以外
的
其他机器上),我们
的
流
图只接收消息,按键对它们进行分组,执行一些筛选,并存储所有用于接收主题
的
内容。然而,每当我做滚动更新我
的
kafka
流
(通过关闭总是只有一
浏览 3
提问于2021-10-09
得票数 1
1
回答
是否可以用卡夫卡连接来“插入”卡夫卡
的
信息?
、
、
、
、
我使用
的
是合流3.3.0。我使用jdbc-source-connector将消息插入到我
的
Oracle表
中
的
Kafka
中
。这个很好用。 我想检查一下是否可以“重新插入”。我
的
意思是,如果我有一个学生表,有3列id(数字)、name(varchar2)和last_modified(
时间
戳)。每当我插入新行时,它都会被推送到
Kafka
(使用timestamp+auto增量字段)。但是当我更新行时,应该更新
Kafka
<e
浏览 1
提问于2018-08-01
得票数 0
1
回答
如何从Apache
中
的
数据库
中
查找和更新
记录
的
状态?
、
、
我正在开发一个数据
流
应用程序,我正在研究在这个项目中使用Apache
的
可能性。其主要原因是它支持很好
的
高级
流
结构,非常类似于Java 8
的
streaming。我将接收与数据库
中
特定
记录
相
对应
的
事件,我希望能够
处理
这些事件(来自消息代理(
如
RabbitMQ或
Kafka
) ),并最终更新数据库
中
的
记录
,并将
处理
浏览 1
提问于2016-08-10
得票数 13
回答已采纳
2
回答
具有到增量湖
的
多个相同密钥
的
流
写入
、
、
我正在通过spark structured向delta写入数据
流
。每个
流
批次包含
key
- value (还包含作为一列
的
时间
戳)。delta lake不支持在源(蒸汽批)上使用多个相同
的
键进行更新,所以我只想用
最新
的
时间
戳
记录
来更新delta lake。我该怎么做呢?这是我正在尝试
的
代码片段: def upsertToDelta(microBatchOutputDF: DataFrame, bat
浏览 27
提问于2020-06-19
得票数 2
回答已采纳
1
回答
Kafka
Streams重新平衡行为
我理解重新平衡可以在您
的
流上
的
任何
时间
和任何点发生。当它这样做时,由于没有为
给定
的
偏移量提交
最新
的
偏移量,可能会发生事件
的
重新
处理
。
Kafka
Streams是否允许在重新平衡发生之前完成任何动态
处理
?我
的
意思是,您
的
应用程序正在使用一个
记录
(在您
的
流程方法
中
),一个重新平衡事件发生了。一个具体
的</e
浏览 35
提问于2020-01-14
得票数 1
1
回答
我们能从一个合流的卡夫卡主题中选择一个特定
的
记录
行吗?
、
、
在我
的
本地汇合平台中,我有一个主题名为"FOO_02",我手动向它插入了一些
记录
,因此,我可以根据以下命令从一开始就打印它: 我可以做这样
的
事情:我只想把COL1 = 1
的
记录
拉出来吗?类似于我们可以使用where条件执行select语句来从普通数据库(
如
db2 )中提取数据。我尝试了以下命令,但是我相信它只会得到新
的
数据,因为我得到了这个命令
的</e
浏览 1
提问于2022-03-17
得票数 0
回答已采纳
1
回答
StreamsException:提取
的
时间
戳值为负值,是不允许
的
、
我
的
Kafka
应用程序对每条消息做了一些转换逻辑,并将其转发到一个新
的
主题。应用程序
中
没有基于
时间
的
聚合/
处理
,因此不需要使用任何自定义
时间
戳提取器。在从所有StreamThreads抛出这个异常(总共10个)之后,应用程序被冻结了,因为在流上有几个小时没有进一步
的
进展。在那之后也没有任何例外。当我重新启动应用程序时,它只开始
处理
新来
的
消息。现在
的
问题是,这些消息之间(抛
浏览 0
提问于2016-12-27
得票数 4
回答已采纳
1
回答
在
Kafka
中
执行批
处理
验证并发送到相应
的
主题。
、
以下批次格式存储在
Kafka
主题中: 在这种情况下,B1批
处理
BS到
浏览 5
提问于2021-12-06
得票数 5
1
回答
在pyspark
中
随
时间
窗口删除重复项
、
我有一个从
kafka
主题中读取
的
spark
流
数据帧,我想在每次解析新
记录
时删除过去5分钟
的
重复数据。我知道dropDuplicates(["uid"])函数,但我不确定如
何在
特定
的
历史
时间
间隔内检查重复项。我
的
理解是: df = df.dropDuplicates(["uid"]) 要么
处理
当前(微)批
处理
读取
的
数据,要么
处理</em
浏览 31
提问于2020-04-21
得票数 0
回答已采纳
点击加载更多
热门
标签
更多标签
云服务器
ICP备案
腾讯会议
云直播
对象存储
活动推荐
运营活动
广告
关闭
领券