从上个星期开始,我用scala中的Flink构建了一个DataStream程序。但我的行为很奇怪,弗林克比我想象的要多。
我的processFunction中有一个4 processFunction的元组( INT,long),我用它在不同的时间范围内得到不同的唯一计数器,并且我预计大部分内存都被这个列表使用了。
但事实并非如此。所以我打印了JVM的他的生活。我很惊讶这么多的记忆被使用了。
num #instances #bytes class name
----------------------------------------------
1:
我想在我的Flink作业(Scala)中跨Flink的并行任务共享一个私有变量。我的代码是这样的:
object myJob extends flinkJob {
private val myVariable = someValue
def run(params) : Unit = {
//Stream processing
//myVariable is used here in the ProcessFunction
}
}
当我使用一些并行化运行这个作业时,是否会在所有的myVariable任务之间共享一个“Flink”副本?如果不是,如何确保在所有并行任务中只使用和维护
我有一个用例,其中我正在接收包含不同信息集的事件流,并希望对它们执行聚合。对于这些聚合中的每一个,都需要多个滚动窗口,例如: Daily、Weekly、Monthly、Yearly等。
聚合最初是所见计数的基本添加,但稍后可能是跨这些事件的一些分析/联接处理。因此,如果一个事件A每天出现一次,另一个事件B每周出现一次,结果将如下所示:
Daily
A: 1
B: 1 (Only for the day it was received)
Weekly
A: 7
B: 1
Monthly
A: 30 (30 day month)
B: 4
我使用getSideOutput创建了一个端输出流,在使用getSideOutput处理之前,预处理流中存在元素,但当调用getSideOutput方法时,不会发出任何元素。 代码如下 DataStream<String> asyncTable =
join3
.flatMap(new ExtractList())
.process( // detect code using for test
new ProcessFunction<String, String>() {
我的Flink应用程序从一个kafka源读取数据,将其映射到一个对象并写入另一个kafka主题。如果我使用MapFunction进行转换,一切都很好,但一旦我使用了extends ProcessFunction or RichFlatMapFunction类的实现,接收器就不会被调用(写入卡夫卡主题代码不会被执行)。我之所以使用ProcessFunction或RichFlatMapFunction,是因为我需要RuntimeConext()来读写ValueState。在这种情况下,我应该如何实现对接收器的调用?
env.addSource(新FlinkKafkaConsumer<>
在Flink作业中,我希望在构建后24小时删除内存中的状态。我检查了这个并设置了状态生存时间(Ttl),但正如本文中提到的,状态删除是惰性/被动的,这可能会导致内存泄漏。
例如,23小时57分钟后,我收到了key的最后一条消息('USA','Male',2018),在那之后就没有关于这个key的消息了。然后我将不能调用这个键的函数和状态的ttl ('USA','Male',2018),那么它将永远保存在内存中。
这篇文章提到了使用计时器:The idea is to register a timer with the TTL pe
我是一个新的emgu cv c#。我想创建一个简单的相机只从我的笔记本电脑相机和其他相机设备连接到我的笔记本电脑的相机捕捉。我不想视频捕捉只有一个简单的照片capture.with一开始和一个捕获button.and将保存,特别是location.helped将是可欣赏的。
namespace camera
{
public partial class cameracaps : Form
{
Capture capturecam=null;
bool capturingprocess=false;
Image<Bgr,Byte>imgOrg;
Ima