所有的数据都天然带有时间的概念,必然发生在某一个时间点。把事件按照时间顺序排列起来,就形成了一个事件流,也叫作数据流。「无界数据」是持续产生的数据,所以必须持续地处理无界数据流。「有界数据」,就是在一个确定的时间范围内的数据流,有开始有结束,一旦确定了就不会再改变。
运行时架构图
处理数据API
顺序输出模式(先收到的数据元素先输出,后续数据元素的异步函数调用无论是否先完成,都需要等待)
无序输出模式(先处理完的数据元素先输出,不保证消息顺序)
用于处理乱序事件,而正确地处理乱序事件,通常用Watermark机制结合窗口来实现
一个MemorySegment对应着一个32KB大小的内存块。这块内存既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirectByteBuffer)
实际申请的是堆外内存,通过RocksDB的Block Cache和WriterBufferManager参数来限制,RocksDB使用的内存量
状态管理需要考虑的因素:
结构
TwoPhaseCommitSinkFunction
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});