腾讯云
开发者社区
文档
建议反馈
控制台
首页
学习
活动
专区
工具
TVP
最新优惠活动
文章/答案/技术大牛
搜索
搜索
关闭
发布
登录/注册
精选内容/技术社群/优惠产品,
尽在小程序
立即前往
文章
问答
(9999+)
视频
沙龙
1
回答
KTable
聚合
转发
相同
的
消息
我使用kafka-streams将
消息
聚合
到一个
KTable
中。在我
的
聚合
逻辑中,我总是像下面这样返回
相同
的
累加器: streamOfInts .aggregate(Accumulator.empty()) {k,v,我
的
预期是--由于
KTable
的
值没有改变--不会有任何值被发送到下游。然而,事实并非如此。
聚合
函数总是
转发
更新。 确
浏览 15
提问于2019-10-18
得票数 0
回答已采纳
1
回答
在Scala中
聚合
KStream
、
、
、
如果我有以下记录{"B",10}那么
聚合
应该是然后,当A变为10时,即添加新
消息
它现在应该计算为 val aggVal:
KTab
浏览 0
提问于2018-10-15
得票数 0
1
回答
Kafka流-如何进行全球度量
聚合
?
、
因此,我需要一个包含多个实例
的
多个
消息
聚合
的
GlobalKTable。现在,我
的
单个实例
KTable
设置如下所示: .groupByKey,而不是所有其他实例接收
的
所有
消息
。
KTable
的
流更新到METRIC_CHANGES_TOPIC,这将更新全局表
浏览 5
提问于2017-06-26
得票数 2
回答已采纳
2
回答
拥有数十亿个唯一密钥
的
KTable
我
的
要求是使用大量数据量
的
kafka流构建实时
聚合
管道.根据估计,可能
的
唯一密钥为30亿到40亿,总
消息
大小为5TB。高级架构是从一个kafka主题中读取,根据特定
的
关键列进行
聚合
,并将
聚合
结果发布到
KTable
()中。
KTable
用于读取以前
的
状态并使用新
的
聚合
结果进行更新。具有数十亿个唯一密钥
的
KTable
可伸缩吗?
浏览 3
提问于2020-07-30
得票数 1
回答已采纳
1
回答
消除
ktable
内部主题和用户主题之间
的
冗余
、
memberService创建一个成员
的
ktable
(通过
聚合
另一个主题上
的
更改
消息
来构建),以供其内部使用。bookService还需要对该
ktable
的
读访问。根据我对
ktable
工作方式
的
理解,memberTableTopic中
的
数据将与
ktable
使用
的
支持内部主题
相同
。有什么好办法来消除这种冗余吗?我
的
bookService应该订阅内
浏览 0
提问于2018-09-13
得票数 0
回答已采纳
5
回答
卡夫卡流-如何为
KTable
设置新
的
密钥
、
、
我是卡夫卡流
的
新手,我使用
的
是版本1.0.0。我想从其中一个值中为
KTable
设置一个新键。然而,在
KTable
中缺少这样
的
方法。唯一
的
方法是将给定
的
KTable
转换为KStream。对这个问题有什么想法吗?它改变了
KTable
设计
的
密钥吗?
浏览 1
提问于2018-04-15
得票数 10
回答已采纳
1
回答
Apache实现一个
KTable
、
、
、
我是卡夫卡流API
的
新手,我正在尝试创建一个
KTable
。我有一个输入主题:s-order-topic,它是json格式
的
消息
,如下所示。COMPLETED", "op_ts":"2019-12-24 13:16:40.316941",我阅读了本主题中
的
消息
,并希望创建一个
K
浏览 0
提问于2020-01-07
得票数 0
回答已采纳
1
回答
如果没有墓碑,
KTable
记录什么时候到期?
、
、
我有一个主题T,它
的
消息
过期retention.ms设置为2天。这个话题有紧凑之处。如果我将该
消息
读入KStream中,然后进一步
聚合
到
KTable
中,KStream和/或
KTable
会遵守这2天
的
期限吗?当
消息
不再出现在主题T中时,该
消息
是否也会自动从KStream或
KTable
中删除?或者,某些内务处理过程需要对这些信息进行墓碑?
浏览 3
提问于2021-08-25
得票数 0
回答已采纳
1
回答
在
聚合
运行之前,我能加入由Kafka #
聚合
调用生成
的
KTable
吗?
我有许多IOT设备,它们通过
消息
向kafka主题报告事件,并且我已经定义了一个
聚合
器来从这些事件中更新设备状态。我想要做
的
是能够将输入流加入到
聚合
器在
聚合
更新状态之前输出
的
KTable
--也就是说,我想,比方说,将一个事件与当前状态进行比较,如果它们匹配某个谓词,则执行一些处理,然后更新状态。我已经尝试过先用StreamsBuilder#addStateStore创建状态存储,但是该方法返回一个StreamsBuilder,而且似乎没有为我提供一种将其转换为
K
浏览 0
提问于2019-06-26
得票数 0
回答已采纳
2
回答
Kafka流连接不相关
的
流
、
我有需要与
ktable
/ changelog主题匹配
的
事件流,但是匹配是通过
ktable
条目的属性上
的
模式匹配完成
的
。所以我不能根据键加入流,因为我还不知道哪一个是匹配
的
。示例:{ [efg]: {id: 'efg', prop: 'another pattern'}
浏览 4
提问于2020-02-04
得票数 4
1
回答
没有减少记录
、
当我发布两个ID
相同
的
记录时,我会收到两个打印语句。我本来以为我是新来
的
,所以我肯定是误会了。
浏览 0
提问于2018-09-26
得票数 1
回答已采纳
1
回答
每次使用KSql查询
KTable
时都会获取所有行
、
、
我想使用Ksql对Kafka进行rest api调用,每次都从
Ktable
中获取所有行。从
KTable
获取所有行
的
最佳方法是什么?当我向
Ktable
正在侦听
的
主题生成
消息
(exp: key = '1')时,我只收到给定Key
的
聚合
。为了从表中读取所有行,我应该对web服务
的
每个请求进行什么查询?
浏览 16
提问于2019-07-26
得票数 0
1
回答
Kafka Streams -主题中
消息
数量不一致
、
假设我们有一个包含1000条
消息
的
Kafka主题。我们在它
的
基础上创建一个流(我们在下文中称之为st ),并执行以下操作: System.out.println(count)当处理“结束”时,它返回一个略大于1000
的
数字。是什么导致了这种奇怪
的
行为?
浏览 0
提问于2018-04-25
得票数 0
1
回答
Kafka
ktable
损坏
消息
处理
、
我们在kafka中使用Ktabke进行
聚合
,它是非常基本
的
用法,并参考了kafka文档。 我只是试图调查一下,如果一些
消息
消费失败,同时
聚合
我们如何将这些
消息
移动到错误主题或dlq。我为KStream找到了类似的东西,但没有找到用于
ktable
的
东西,我也不能简单地将KStream解决方案扩展到
KTable
。KStream Handling bad messages using Kafka's Streams API参考 我
的</em
浏览 11
提问于2019-04-05
得票数 0
1
回答
如果KGroupedStream
的
聚合
器返回null会发生什么?
、
KStream<Integer, Integer> stream;
KTable
null : sum;我
的
溪流上
的
消息
是: 当第二条
消息
到达时,
聚合
器返回null。
KTable
aggregated是否接收(1,null)并为key=1删除其
浏览 1
提问于2018-10-07
得票数 1
回答已采纳
1
回答
Kafka流
KTable
changelog TTL
、
、
假设我将A-Event KStream
聚合
为A-Snapshot
KTable
,将B-Event KStream
聚合
为B-Snapshot
KTable
。A-Snapshot和B-Snapshot都不传递空值(删除事件被
聚合
为快照
的
状态属性)。此时,我们可以假设我们有一个持久化
的
kafka主题和一个rocksDB本地存储库,用于A-
KTable
和B-
KTable
聚合
。然后,我
的
拓扑将与A-
K
浏览 1
提问于2019-05-21
得票数 0
回答已采纳
1
回答
kafka流groupBy
聚合
产生意外值。
、
、
我
的
问题是关于卡夫卡流
Ktable
.groupBy.aggregate。以及由此产生
的
聚合
值。我每天都在努力收集每一分钟
的
事件。我每天对这些事件进行
聚合
,并使用kafka、groupBy和aggregate对进行定位。问题 通常,由于一天中有1440分钟,所以不应该有超过1440个值
的
聚合
。
浏览 5
提问于2019-10-17
得票数 3
1
回答
Kafka GroupTable测试在使用ProcessorTopologyTestDriver时生成额外
的
消息
、
、
、
我编写了一个流,它接收
消息
并发送一个已出现
的
键
的
表。如果出现某种情况,它将显示1
的
计数。这是我
的
生产代码
的
简化版本,以演示该bug。在实时运行中,为收到
的
每条
消息
发送一条
消息
。然而,当我使用ProcessorTopologyTestDriver在单元测试中运行它时,我会得到一个不同
的
行为。如果收到了以前见过
的
密钥,我会收到额外
的
消息
。如果我发送带有"ke
浏览 1
提问于2018-10-19
得票数 2
回答已采纳
1
回答
卡夫卡流:流线程与存储
、
如果为Daily分配了两个并行任务,如果标点符号计划每30分钟运行一次,而在覆盖中,如果我们将所有存储
转发
到接收器1,那么键值存储会被两次提交到接收器,因为两个并行任务共享
相同
的
存储区或每个任务是否都有自己
的
存储区,并且只会发布与分配给它们
的
分区对应
的
数据,这些分区将被保存在各自
的
存储区中?= keyValueIterator.next();context.forward(next.key,next.value);} keyValueIterator.close();
浏览 1
提问于2018-06-06
得票数 1
回答已采纳
1
回答
卡夫卡流API GroupBy行为
、
、
、
、
我是卡夫卡流
的
新手,我试图使用
KTable
函数将一些流数据
聚合
到一个groupBy中。问题如下:{ "current_ts": "2019-12-24 13:16:40.316952", "before24 13:16:40.316941", "table":"
浏览 2
提问于2019-12-31
得票数 1
回答已采纳
点击加载更多
扫码
添加站长 进交流群
领取专属
10元无门槛券
手把手带您无忧上云
相关
资讯
企微如何聚合回复视频号小店的客户消息?视频号小店如何聚合回复
Kafka Streams与Quarkus:实时处理事件
Kafka streams概览
CCIX(六)
一文轻松了解vlan端口模式之Trunk是什么,开发必备通信
热门
标签
更多标签
云服务器
ICP备案
实时音视频
对象存储
即时通信 IM
活动推荐
运营活动
广告
关闭
领券