腾讯云
开发者社区
文档
建议反馈
控制台
登录/注册
首页
学习
活动
专区
圈层
工具
MCP广场
文章/答案/技术大牛
搜索
搜索
关闭
发布
文章
问答
(9999+)
视频
沙龙
1
回答
Flink
KeyedCoProcessFunction
中
的
NPE
、
我在连接
的
流上使用
KeyedCoProcessFunction
,两个流都以id为键,如果键不存在,我也使用MapState并设置一个类型为list
的
值,我也在processElement2
中
检查键
的
存在,所以理想情况下没有
NPE
的
机会,但仍然可以得到它。lookupid"))) extend
浏览 51
提问于2021-11-18
得票数 0
1
回答
连接
的
键控数据流上
的
coProcessFunction与
keyedCoProcessFunction
、
在引用
Flink
代码示例时,我观察到当使用connect操作符连接两个流时,coProcessFunction和
keyedCoProcessFunction
在操作键控流(覆盖processElement1、processElement2和onTimer)时几乎可以互换和相同地扩展,所以当操作键控流时,与
keyedCoProcessFunction
相比,扩展CoProcessFunction来实现键控连接流
的
业务逻辑有什么不同
浏览 127
提问于2020-09-02
得票数 1
回答已采纳
1
回答
Apache
中
的
DataStream和KeyedStream有什么区别?
、
、
我在使用
Flink
连接两个流
的
上下文中查看,并希望了解这两个流之间
的
区别以及影响
Flink
处理它们
的
方式。作为一个相关
的
问题,我还想了解CoProcessFunction与
KeyedCoProcessFunction
有何不同。
浏览 2
提问于2021-02-17
得票数 1
回答已采纳
1
回答
是来自processElement1和processElement2
的
KeyedCoProcessFunction
原子w.r.t。州?或者它们可以同时修改共享状态?
、
我有两个流,一个数据流只包含一个设置ValueState passing=true/false
的
标志,另一个控制流添加了一个用户将被通知给MapState。当passing从false更改为true时,会向MapState
中
尚未收到通知
的
用户发出通知。 下面是处理此逻辑
的
KeyedCoProcessFunction
。class TestKeyedCoProcessFunction extends
KeyedCoProcessFunctio
浏览 1
提问于2021-02-07
得票数 1
回答已采纳
1
回答
使用状态
的
Flink
KeyedCoProcessFunction
、
我使用
KeyedCoProcessFunction
函数来使用来自另一个流
的
数据丰富主数据。代码: packet: PacketData, ctx:
KeyedCoPr
浏览 2
提问于2022-03-28
得票数 1
回答已采纳
1
回答
用MiniCluster测试
flink
作业以使用处理时间触发计时器
、
、
在用MiniClusterWithClientResource测试
flink
作业时,是否有办法控制触发 timer
的
处理时间?我能够测试
KeyedCoProcessFunction
的
两种方法,即processElement().触发计时器回调,例如onTimer().在单元
中
,使用testharness测试并控制处理时间,即://通过直接提高操作者
的
处理时间testHarness.setProcessingTime(300000)来触发处理时间定时器 因此。但是,我现
浏览 2
提问于2020-08-25
得票数 0
1
回答
Apache
Flink
中
的
事件重试机制
、
、
我正在阅读Kafka
的
多个主题,然后进行状态操作,然后再次保存到Kafka。taggedStream.getSideOutput(tag) taggedStream.addSink(targetTopic) 在此流程
中
,延迟流
的
另一端: delayQueue(written by
flink
) => consumerAPP(check retrycounts and timestamps) => anotherQueue(这将再次
浏览 35
提问于2021-04-24
得票数 1
1
回答
如何使用
Flink
MiniCluster触发ProcessTimeTimer
、
我有一个
Flink
KeyedCoProcessFunction
,它在一个更大
的
Flink
流作业中注册处理时间计时器,并且我正在尝试使用
Flink
为整个作业创建单元测试。但是我不能让
KeyedCoProcessFunction
中
的
onTimer()回调触发。 有没有人把这个弄好了?它需要特殊
的
配置吗?切换到事件时间可以很好地工作,所以我想知道这是不是对
Flink
MiniCluster不起作用,或者我
的</
浏览 0
提问于2020-08-26
得票数 3
1
回答
Flink
SQL作业堆空间不足
、
、
即使它在
flink
集群中有足够
的
堆空间(60 it * 3) 此查询是否需要驱逐策略?
浏览 6
提问于2019-09-26
得票数 0
1
回答
使用
KeyedCoProcessFunction
的
Flink
连接流
、
对于1:1连接,我使用
KeyedCoProcessFunction
,我有两个流,查找流(每秒100条记录)和点击流(每秒10000条记录)。在processElement2方法
中
,我在MapState<Long,Row>
中
寻找关键字,如果找到的话,用它来丰富点击流数据,否则将此记录设置为端输出,然后将端输出设置为kafka。对于kakfa
中
的
dlq主题,我连续看到每秒产生1-2条记录,在将其推送到端输出之前,我如何才能在processElement2方法中等待几毫秒
的
查找id
浏览 132
提问于2021-11-11
得票数 0
回答已采纳
1
回答
如何在
flink
中加入两个kafka流(一个在运行,第二个是静态
的
,只有很少
的
记录像一个主表)
、
我想在第二流
的
帮助下充实我
的
第一流,像流动
的
记录一样,像查找一样加入第二流,我想把它永远保存在记忆
中
,就像一张桌子。我可以使用
的
任何代码示例或
flink
API都适用于这个用例。
浏览 5
提问于2022-05-30
得票数 0
1
回答
Flink
流顺序
、
Flink
是否保证流
的
执行顺序?我可以将parallelism设置为1来绕过这个问题,但是我想知道我看到
的
是否是预期
的
?return new JSONObject(e.f1).getString("someOtherKey"); source1.connec
浏览 11
提问于2021-09-06
得票数 0
1
回答
如何在Apache
Flink
中
合并两个DataStreams
、
我正在使用
Flink
来处理我
的
流数据。 我有两个数据源:A和B。dataA = env.addSource(sourceA);DataStream<String> dataB = env.addSource(sourceB); 我使用map处理来自A和B
的
数据我尝试做
的
是将它们合并为Aaaa, Bbbb, Cccc...以生成一个新
的
DataStream<String>对象。 如何做到这一点?
浏览 49
提问于2020-07-23
得票数 0
回答已采纳
1
回答
在
Flink
中加入静态和动态Kafka源
、
今天,我想谈谈一个关于
Flink
的
概念性话题,而不是一个技术性的话题。 在我们
的
例子
中
,我们确实有两个卡夫卡主题A和B,它们需要结合在一起。连接应该始终包括主题A
中
的
所有元素,以及主题B
中
的
所有新元素。实现这一目标有两种可能性:始终创建一个新
的
使用者并从一开始就开始使用主题A,或者一旦使用完就将主题A
中
的
所有元素保持在一个状态内。主题A
中
的
元素最终会丢失,如果窗口
浏览 8
提问于2020-03-19
得票数 0
回答已采纳
1
回答
在RichCoFlatMapFunction
中
更新外部数据库
、
} } }在这里,在flatmap1方法
中
,我想更新一个数据库我可以在flatmap1
中
执行该操作吗?我之所以这样问,是因为查询db然后更新DB需要等待一些时间。
浏览 1
提问于2021-01-22
得票数 0
1
回答
Apache
flink
从late window访问键控状态
、
我正在编写一个
Flink
应用程序,它使用kafka主题中
的
时间序列数据。时间序列数据包含指标名称、标记键值对、时间戳和值等组件。我已经创建了一个滚动窗口,用于根据指标键(指标名称、键值对和时间戳
的
组合)聚合数据。我正在考虑使用
flink
的
"allowedLateness“功能将延迟指标发送到不同
的
流。我计划在主"Aggregate the data“操作符
中
添加一个"MapState”,它将键作为度量关键字,将值作为到达主窗口
的</
浏览 17
提问于2021-08-09
得票数 0
回答已采纳
1
回答
对intervalJoin感到困惑
、
、
我试图想出一个解决方案,其中包括在连接操作之后应用一些逻辑,从多个EventB
中
从streamB中选择一个事件。它类似于一个约简函数,但它只返回一个元素,而不是增量地执行它。因此,最终结果将是一个(EventA,EventB)对,而不是一个1 EventA和多个EventB
的
交叉乘积。假设像上面这样
的
连接操作,它用4 EventA生成了1个EventB,成功地加入并收集在MyJoinFunction
中
。现在我要做
的
是,立即访问这些值,并执行一些逻辑来正确地将EventA与匹配,这正是一个 EventB。因
浏览 2
提问于2021-03-25
得票数 0
回答已采纳
1
回答
如何使用
Flink
实现不同数据源之间
的
流连接?
、
、
我
的
数据来自两个不同
的
Kafka主题,由不同
的
代理提供服务,每个主题具有不同数量
的
分区。一个流有关于正在提供
的
广告
的
事件,另一个流有点击:ad_clicks: ad_id, ip, cTime 流程函数
的
文档中有一节介绍了如何使用CoProcessFunction或
KeyedCoProcessFunction
实现,但我不确定如何设置。我还想知道
Flink
的
是否
浏览 2
提问于2021-07-15
得票数 2
2
回答
在
Flink
中
序列化复杂模型
的
最佳实践
、
由于Model类(来自第三方库)不能由
Flink
自动序列化,所以我使用两个变量,如下所示: class MyUDF extends
KeyedCoProcessFunction
[String, ModelDefmodelsBytes是真正
的
(键控)状态,它包含相同
的
模型,但作为一个字节块,这样检查点才能正常工作。总体解决方案很简单(只需要在恢复/保存模型时在模型上调用fromBytes/toBytes ),但我不知道这是否是一种常见
的
最佳实践。对于本质上相同
的
事情,有
浏览 3
提问于2019-12-04
得票数 1
回答已采纳
1
回答
在Apache中加入DataStreams
、
我在Apache中有两个DataStreams来自
Flink
训练 .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))测试是可以
的
,但是在
中
我发现了与RichCoFlatMapFunction
的
连接,您能帮我理解一下: 哪种类型
浏览 3
提问于2021-05-19
得票数 0
点击加载更多
热门
标签
更多标签
云服务器
对象存储
ICP备案
云点播
实时音视频
活动推荐
运营活动
广告
关闭
领券