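When using the evictor operator, the official Flink documentation includes this statement:
Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last.
Roughly, this means that although an evictor can remove elements from the beginning of the window, there is no guarantee that those elements are the first or the last ones to arrive in the window. Why?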
env.addSource(consumer).uid("orderAndRegisterUserIdSource")
    // Every record gets the same constant key "a", so the whole stream lands in one key group.
    .keyBy(new KeySelector<String, String>() {
        @Override
        public String getKey(String value) throws Exception {
            return "a";
        }
    })
    .timeWindow(Time.seconds(1000))
    // The evictor only logs when it is called; it does not remove any element.
    .evictor(new Evictor<String, TimeWindow>() {
        @Override
        public void evictBefore(Iterable<TimestampedValue<String>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
            System.out.println("evictBefore");
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<String>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
            System.out.println("evictAfter");
        }
    })
    .trigger(new CountAndTimeTrigger(2L))
    .process(new ProcessWindowFunctionImp()).uid("process");
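We usually write code like this. Yet when we print the first element inside evictBefore or evictAfter, it surprisingly does turn out to be the first (or the last) element that entered the window.
Let's look at the source code together, starting with EvictingWindowOperator: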
public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

    //if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window : elementWindows) {

            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window,
                    new MergingWindowSet.MergeFunction<W>() {
                        @Override
                        public void merge(W mergeResult,
                                Collection<W> mergedWindows, W stateWindowResult,
                                Collection<W> mergedStateWindows) throws Exception {

                            if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                                throw new UnsupportedOperationException("The end timestamp of an " +
                                        "event-time window cannot become earlier than the current watermark " +
                                        "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                        " window: " + mergeResult);
                            } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                                throw new UnsupportedOperationException("The end timestamp of a " +
                                        "processing-time window cannot become earlier than the current processing time " +
                                        "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                        " window: " + mergeResult);
                            }

                            triggerContext.key = key;
                            triggerContext.window = mergeResult;

                            triggerContext.onMerge(mergedWindows);

                            for (W m : mergedWindows) {
                                triggerContext.window = m;
                                triggerContext.clear();
                                deleteCleanupTimer(m);
                            }

                            // merge the merged state windows into the newly resulting state window
                            evictingWindowState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                        }
                    });

            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }

            evictingWindowState.setCurrentNamespace(stateWindow);
            evictingWindowState.add(element);

            triggerContext.key = key;
            triggerContext.window = actualWindow;
            evictorContext.key = key;
            evictorContext.window = actualWindow;

            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
                if (contents == null) {
                    // if we have no state, there is nothing to do
                    continue;
                }
                emitWindowContents(actualWindow, contents, evictingWindowState);
            }

            if (triggerResult.isPurge()) {
                evictingWindowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }

        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        for (W window : elementWindows) {

            // check if the window is already inactive
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;

            evictingWindowState.setCurrentNamespace(window);
            evictingWindowState.add(element);

            triggerContext.key = key;
            triggerContext.window = window;
            evictorContext.key = key;
            evictorContext.window = window;

            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
                if (contents == null) {
                    // if we have no state, there is nothing to do
                    continue;
                }
                emitWindowContents(window, contents, evictingWindowState);
            }

            if (triggerResult.isPurge()) {
                evictingWindowState.clear();
            }
            registerCleanupTimer(window);
        }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null){
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}
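The evictingWindowState here is a keyed list state; when the RocksDB state backend is used, it is a RocksDBListState.
As explained in the post "Flink key state 为何仅与 key 有关" (why Flink keyed state is tied only to the key), evictingWindowState.get returns only the values stored for the current key, within the window namespace that setCurrentNamespace selected just before.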
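To make the scoping concrete, here is a minimal sketch in plain Java with no Flink dependency. ToyListState, its methods, the keys "a" and "b", and the namespace "window-1" are all hypothetical names invented for illustration; this only models the idea that a read sees the list of the current (key, namespace) pair, and it is not how Flink's state backends are implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedListStateSketch {

    /** Toy stand-in for a keyed list state: values are scoped by (key, namespace). */
    static final class ToyListState {
        private final Map<String, Map<String, List<String>>> store = new HashMap<>();
        private String currentKey;
        private String currentNamespace;

        void setCurrentKey(String key) { currentKey = key; }
        void setCurrentNamespace(String namespace) { currentNamespace = namespace; }

        /** Appends to the list of the current (key, namespace) pair. */
        void add(String value) {
            store.computeIfAbsent(currentKey, k -> new HashMap<>())
                 .computeIfAbsent(currentNamespace, n -> new ArrayList<>())
                 .add(value);
        }

        /** Returns only the current (key, namespace) list, or null if nothing was stored. */
        List<String> get() {
            Map<String, List<String>> byNamespace = store.get(currentKey);
            return byNamespace == null ? null : byNamespace.get(currentNamespace);
        }
    }

    /** Replays three arrivals into one window, then reads the state as the given key. */
    public static List<String> readAfterArrivals(String key) {
        ToyListState state = new ToyListState();
        state.setCurrentNamespace("window-1");
        // Arrival order in the window: b-1, a-1, b-2.
        String[][] arrivals = {{"b", "b-1"}, {"a", "a-1"}, {"b", "b-2"}};
        for (String[] arrival : arrivals) {
            state.setCurrentKey(arrival[0]);
            state.add(arrival[1]);
        }
        state.setCurrentKey(key);
        return state.get();
    }

    public static void main(String[] args) {
        System.out.println("a -> " + readAfterArrivals("a"));
        System.out.println("b -> " + readAfterArrivals("b"));
    }
}
```

Reading as key "a" yields only a-1, even though b-1 reached the same window earlier. A key that never received an element yields null, which is the case the `contents == null` check in processElement guards against.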
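When the window fires, what is passed to emitWindowContents is likewise only the values of the current key (for a single key, the order can be preserved). So the evictor also processes only the current key's values, not the values of the whole window. The first element the evictor handles is therefore not necessarily the first or the last one to arrive in the window. The window itself does not guarantee element order either (that is, the order in which elements entered the window).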
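The reasoning above can be replayed with a small simulation, again in plain Java without Flink. It is a sketch under stated assumptions: the run method, the per-key count rule that fires every fireEvery elements without purging, and the sample arrivals are all hypothetical, and the recorded string merely stands in for what an evictBefore call would observe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EvictorViewSketch {

    /**
     * Replays (key, value) arrivals that all fall into one window and returns,
     * for every firing, "key:firstElementSeenByEvictor:size".
     */
    public static List<String> run(String[][] arrivals, int fireEvery) {
        // Toy keyed list state: one list per key for the single window.
        Map<String, List<String>> statePerKey = new HashMap<>();
        List<String> firings = new ArrayList<>();

        for (String[] arrival : arrivals) {
            String key = arrival[0];
            String value = arrival[1];

            // Mirrors evictingWindowState.add(element) under the current key.
            List<String> contents = statePerKey.computeIfAbsent(key, k -> new ArrayList<>());
            contents.add(value);

            // Hypothetical count rule: fire (without purge) every fireEvery elements of this key.
            if (contents.size() % fireEvery == 0) {
                // Stand-in for evictBefore: it only ever receives this key's elements.
                firings.add(key + ":" + contents.get(0) + ":" + contents.size());
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // b-1 is the first element to reach the window overall.
        String[][] arrivals = {{"b", "b-1"}, {"a", "a-1"}, {"a", "a-2"}, {"b", "b-2"}};
        for (String firing : run(arrivals, 2)) {
            System.out.println(firing);
        }
    }
}
```

With these arrivals the first firing belongs to key "a" and its first element is a-1, although b-1 entered the window before it. In the job shown earlier every record is mapped to the constant key "a", so the per-key view and the whole-window view coincide, which would explain why the first printed element looked like the first arrival.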