本文主要研究一下flink的Triggers
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
public boolean canMerge() {
return false;
}
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
public abstract void clear(W window, TriggerContext ctx) throws Exception;
// ------------------------------------------------------------------------
public interface TriggerContext {
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
long getCurrentWatermark();
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
}
public interface OnMergeContext extends TriggerContext {
<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
public enum TriggerResult {
CONTINUE(false, false),
FIRE_AND_PURGE(true, true),
FIRE(true, false),
PURGE(false, true);
// ------------------------------------------------------------------------
private final boolean fire;
private final boolean purge;
TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
}
public boolean isFire() {
return fire;
}
public boolean isPurge() {
return purge;
}
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {}
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
// only register a timer if the watermark is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the watermark is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "EventTimeTrigger()";
}
public static EventTimeTrigger create() {
return new EventTimeTrigger();
}
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private ProcessingTimeTrigger() {}
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
// only register a timer if the time is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the time is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "ProcessingTimeTrigger()";
}
public static ProcessingTimeTrigger create() {
return new ProcessingTimeTrigger();
}
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@Internal
public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
private static final long serialVersionUID = 1L;
@Override
public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
@Override
public void onMerge(GlobalWindow window, OnMergeContext ctx) {
}
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
private final long maxCount;
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
private CountTrigger(long maxCount) {
this.maxCount = maxCount;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(stateDesc).clear();
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(W window, OnMergeContext ctx) throws Exception {
ctx.mergePartitionedState(stateDesc);
}
@Override
public String toString() {
return "CountTrigger(" + maxCount + ")";
}
public static <W extends Window> CountTrigger<W> of(long maxCount) {
return new CountTrigger<>(maxCount);
}
private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
它使用的是自己定义的Sum函数
),在onElement方法里头,当计数大于等于maxCount时,则会清空计数,然后返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;onEventTime、onProcessingTime均返回TriggerResult.CONTINUE;canMerge返回true;onMerge执行的是ctx.mergePartitionedState(stateDesc);clear执行的是ctx.getPartitionedState(stateDesc).clear()flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
private static final long serialVersionUID = 1L;
private Trigger<T, W> nestedTrigger;
private PurgingTrigger(Trigger<T, W> nestedTrigger) {
this.nestedTrigger = nestedTrigger;
}
@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
nestedTrigger.clear(window, ctx);
}
@Override
public boolean canMerge() {
return nestedTrigger.canMerge();
}
@Override
public void onMerge(W window, OnMergeContext ctx) throws Exception {
nestedTrigger.onMerge(window, ctx);
}
@Override
public String toString() {
return "PurgingTrigger(" + nestedTrigger.toString() + ")";
}
public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
return new PurgingTrigger<>(nestedTrigger);
}
@VisibleForTesting
public Trigger<T, W> getNestedTrigger() {
return nestedTrigger;
}
}
fire表示是否要触发window的computation操作;而purge表示是否要清理window的窗口数据
),CONTINUE、FIRE_AND_PURGE、FIRE、PURGE五个枚举