本文主要研究一下flink的Sliding Window
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@PublicEvolving
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final long slide;
private final long offset;
protected SlidingEventTimeWindows(long size, long slide, long offset) {
if (offset < 0 || offset >= slide || size <= 0) {
throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
}
this.size = size;
this.slide = slide;
this.offset = offset;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
public long getSize() {
return size;
}
public long getSlide() {
return slide;
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
}
public static SlidingEventTimeWindows of(Time size, Time slide) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
offset.toMilliseconds() % slide.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final long offset;
private final long slide;
private SlidingProcessingTimeWindows(long size, long slide, long offset) {
if (offset < 0 || offset >= slide || size <= 0) {
throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
}
this.size = size;
this.slide = slide;
this.offset = offset;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
timestamp = context.getCurrentProcessingTime();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
}
public long getSize() {
return size;
}
public long getSlide() {
return slide;
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
@Override
public String toString() {
return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
}
public static SlidingProcessingTimeWindows of(Time size, Time slide) {
return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}
public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
offset.toMilliseconds() % slide.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return false;
}
}
与SlidingEventTimeWindows不同的是SlidingProcessingTimeWindows的这个方法里头使用context.getCurrentProcessingTime()值重置了timestamp
),然后以为start + size > timestamp为循环条件,每次对start减去slide,挨个计算TimeWindow(start, start + size);getDefaultTrigger方法返回的是ProcessingTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回的为false