
A Look at Storm's WindowedBoltExecutor

code4it
Published 2018-10-25 13:55:18
Included in the column: 码匠的流水账

This article takes a look at Storm's WindowedBoltExecutor.

WindowedBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

/**
 * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
 */
public class WindowedBoltExecutor implements IRichBolt {
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
    private final IWindowedBolt bolt;
    // package level for unit tests
    transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
    private transient WindowedOutputCollector windowedOutputCollector;
    private transient WindowLifecycleListener<Tuple> listener;
    private transient WindowManager<Tuple> windowManager;
    private transient int maxLagMs;
    private TimestampExtractor timestampExtractor;
    private transient String lateTupleStream;
    private transient TriggerPolicy<Tuple, ?> triggerPolicy;
    private transient EvictionPolicy<Tuple, ?> evictionPolicy;
    private transient Duration windowLengthDuration;

    public WindowedBoltExecutor(IWindowedBolt bolt) {
        this.bolt = bolt;
        timestampExtractor = bolt.getTimestampExtractor();
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        doPrepare(topoConf, context, collector, new ConcurrentLinkedQueue<>(), false);
    }

    // NOTE: the queue has to be thread safe.
    protected void doPrepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
                             Collection<Event<Tuple>> queue, boolean stateful) {
        Objects.requireNonNull(topoConf);
        Objects.requireNonNull(context);
        Objects.requireNonNull(collector);
        Objects.requireNonNull(queue);
        this.windowedOutputCollector = new WindowedOutputCollector(collector);
        bolt.prepare(topoConf, context, windowedOutputCollector);
        this.listener = newWindowLifecycleListener();
        this.windowManager = initWindowManager(listener, topoConf, context, queue, stateful);
        start();
        LOG.info("Initialized window manager {} ", windowManager);
    }

    @Override
    public void execute(Tuple input) {
        if (isTupleTs()) {
            long ts = timestampExtractor.extractTimestamp(input);
            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                windowManager.add(input, ts);
            } else {
                if (lateTupleStream != null) {
                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                } else {
                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                }
                windowedOutputCollector.ack(input);
            }
        } else {
            windowManager.add(input);
        }
    }

    @Override
    public void cleanup() {
        if (waterMarkEventGenerator != null) {
            waterMarkEventGenerator.shutdown();
        }
        windowManager.shutdown();
        bolt.cleanup();
    }

    // for unit tests
    WindowManager<Tuple> getWindowManager() {
        return windowManager;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String lateTupleStream = (String) getComponentConfiguration().get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
        if (lateTupleStream != null) {
            declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
        }
        bolt.declareOutputFields(declarer);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return bolt.getComponentConfiguration();
    }

    //......
}
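  • WindowedBoltExecutor implements the IRichBolt interface. In prepare it initializes windowedOutputCollector, listener, and windowManager, and calls bolt.prepare; in cleanup it shuts down waterMarkEventGenerator and windowManager and calls bolt.cleanup. When TopologyBuilder.setBolt receives an IWindowedBolt implementation, it wraps it in a WindowedBoltExecutor, which takes its place in the topology.
  • declareOutputFields delegates to bolt.declareOutputFields(declarer), after first declaring the late tuple stream if one is configured; getComponentConfiguration returns bolt.getComponentConfiguration().
  • execute mainly adds the tuple to windowManager; a tuple that is not admitted to the window (a late tuple) is acked immediately.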
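
The branch in execute can be illustrated with a small stand-alone sketch. It is not Storm code: the class and its lists are hypothetical, timestamps stand in for tuples, and the lateness rule (a tuple is late when its timestamp is earlier than the last watermark) is an assumption taken from the WaterMarkEventGenerator javadoc quoted later in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the executor's execute() branch; not Storm code.
class LateTupleRoutingSketch {
    final List<Long> windowed = new ArrayList<>(); // timestamps handed to the window
    final List<Long> late = new ArrayList<>();     // timestamps treated as late
    final List<Long> acked = new ArrayList<>();    // timestamps acked immediately
    private long lastWatermarkTs = Long.MIN_VALUE; // advanced by a watermark generator

    void onWatermark(long ts) {
        lastWatermarkTs = Math.max(lastWatermarkTs, ts);
    }

    // Same shape as execute(): window the tuple, or route it as late and ack it.
    void execute(long tupleTs) {
        if (tupleTs >= lastWatermarkTs) { // assumed lateness rule, see lead-in
            windowed.add(tupleTs);        // acked later, when it expires from the window
        } else {
            late.add(tupleTs);            // would be emitted to the late stream if configured
            acked.add(tupleTs);           // acked right away in either case
        }
    }
}
```

With a watermark at 150, a tuple stamped 120 ends up in late and acked, while tuples stamped 150 or later are windowed.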

WindowedOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    /**
     * Creates an {@link OutputCollector} wrapper that automatically anchors the tuples to inputTuples while emitting.
     */
    private static class WindowedOutputCollector extends OutputCollector {
        private List<Tuple> inputTuples;

        WindowedOutputCollector(IOutputCollector delegate) {
            super(delegate);
        }

        void setContext(List<Tuple> inputTuples) {
            this.inputTuples = inputTuples;
        }

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple) {
            return emit(streamId, inputTuples, tuple);
        }

        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
            emitDirect(taskId, streamId, inputTuples, tuple);
        }
    }
  • WindowedOutputCollector extends OutputCollector and overrides emit and emitDirect so that emitted tuples are anchored to inputTuples by default.
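
The idea of anchoring by default can be sketched without Storm on the classpath. Everything below is a hypothetical stand-in: strings play the role of tuples, and the emitted list records what a real collector would send downstream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical collector that anchors every emit to the current window's tuples.
class AnchoringCollectorSketch {
    final List<String> emitted = new ArrayList<>();
    private List<String> inputTuples = new ArrayList<>();

    // Called before the window is handed to the bolt (cf. setContext above).
    void setContext(List<String> inputTuples) {
        this.inputTuples = inputTuples;
    }

    // Explicit-anchor variant: records the value together with its anchors.
    void emit(List<String> anchors, String value) {
        emitted.add(value + " <- " + anchors);
    }

    // Anchor-free variant: falls back to the tuples of the current window.
    void emit(String value) {
        emit(inputTuples, value);
    }
}
```

A bolt that calls emit("sum=3") after setContext([t1, t2]) therefore produces an output anchored to both t1 and t2 without naming them.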

WindowLifecycleListener

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java

/**
 * A callback for expiry, activation of events tracked by the {@link WindowManager}
 *
 * @param <T> The type of Event in the window (e.g. Tuple).
 */
public interface WindowLifecycleListener<T> {
    /**
     * Called on expiry of events from the window due to {@link EvictionPolicy}
     *
     * @param events the expired events
     */
    void onExpiry(List<T> events);

    /**
     * Called on activation of the window due to the {@link TriggerPolicy}
     *
     * @param events        the list of current events in the window.
     * @param newEvents     the newly added events since last activation.
     * @param expired       the expired events since last activation.
     * @param referenceTime the reference (event or processing) time that resulted in activation
     */
    default void onActivation(List<T> events, List<T> newEvents, List<T> expired, Long referenceTime) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Called on activation of the window due to the {@link TriggerPolicy}. This is typically invoked when the windows are persisted in
     * state and is huge to be loaded entirely in memory.
     *
     * @param eventsIt      a supplier of iterator over the list of current events in the window
     * @param newEventsIt   a supplier of iterator over the newly added events since the last activation
     * @param expiredIt     a supplier of iterator over the expired events since the last activation
     * @param referenceTime the reference (event or processing) time that resulted in activation
     */
    default void onActivation(Supplier<Iterator<T>> eventsIt, Supplier<Iterator<T>> newEventsIt, Supplier<Iterator<T>> expiredIt,
                              Long referenceTime) {
        throw new UnsupportedOperationException("Not implemented");
    }
}
  • WindowLifecycleListener defines two kinds of callbacks: onExpiry and onActivation.
  • They are triggered by the EvictionPolicy and the TriggerPolicy respectively.

EvictionPolicy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java

/**
 * Eviction policy tracks events and decides whether an event should be evicted from the window or not.
 *
 * @param <T> the type of event that is tracked.
 */
public interface EvictionPolicy<T, S> {
    /**
     * Decides if an event should be expired from the window, processed in the current window or kept for later processing.
     *
     * @param event the input event
     * @return the {@link org.apache.storm.windowing.EvictionPolicy.Action} to be taken based on the input event
     */
    Action evict(Event<T> event);

    /**
     * Tracks the event to later decide whether {@link EvictionPolicy#evict(Event)} should evict it or not.
     *
     * @param event the input event to be tracked
     */
    void track(Event<T> event);

    /**
     * Returns the current context that is part of this eviction policy.
     *
     * @return the eviction context
     */
    EvictionContext getContext();

    /**
     * Sets a context in the eviction policy that can be used while evicting the events. E.g. For TimeEvictionPolicy, this could be used to
     * set the reference timestamp.
     *
     * @param context the eviction context
     */
    void setContext(EvictionContext context);

    /**
     * Resets the eviction policy.
     */
    void reset();

    /**
     * Return runtime state to be checkpointed by the framework for restoring the eviction policy in case of failures.
     *
     * @return the state
     */
    S getState();

    /**
     * Restore the eviction policy from the state that was earlier checkpointed by the framework.
     *
     * @param state the state
     */
    void restoreState(S state);

    /**
     * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
     */
    public enum Action {
        /**
         * expire the event and remove it from the queue.
         */
        EXPIRE,
        /**
         * process the event in the current window of events.
         */
        PROCESS,
        /**
         * don't include in the current window but keep the event in the queue for evaluating as a part of future windows.
         */
        KEEP,
        /**
         * stop processing the queue, there cannot be anymore events satisfying the eviction policy.
         */
        STOP
    }
}
  • EvictionPolicy tracks events and decides whether an event should be removed from the window.
  • EvictionPolicy has four implementations: CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy, and WatermarkTimeEvictionPolicy.
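
To make the track/evict contract concrete, here is a minimal count-based sketch. It is a simplification written for this article, not the CountEvictionPolicy source: it is single-threaded, drops the Event parameter, and uses only two of the four actions.

```java
// Hypothetical count-based eviction: keep at most `threshold` events in the window.
class CountEvictionSketch {
    enum Action { EXPIRE, PROCESS }

    private final int threshold; // window length in events
    private int currentCount;    // events tracked but not yet expired

    CountEvictionSketch(int threshold) {
        this.threshold = threshold;
    }

    // Bookkeeping for every event added to the window.
    void track() {
        currentCount++;
    }

    // Called once per queued event, oldest first: expire while over capacity.
    Action evict() {
        if (currentCount > threshold) {
            currentCount--;
            return Action.EXPIRE;
        }
        return Action.PROCESS;
    }
}
```

With a threshold of 3 and five tracked events, the two oldest events get EXPIRE and the remaining three get PROCESS.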

TriggerPolicy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/TriggerPolicy.java

/**
 * Triggers the window calculations based on the policy.
 *
 * @param <T> the type of the event that is tracked
 */
public interface TriggerPolicy<T, S> {
    /**
     * Tracks the event and could use this to invoke the trigger.
     *
     * @param event the input event
     */
    void track(Event<T> event);

    /**
     * resets the trigger policy.
     */
    void reset();

    /**
     * Starts the trigger policy. This can be used during recovery to start the triggers after recovery is complete.
     */
    void start();

    /**
     * Any clean up could be handled here.
     */
    void shutdown();

    /**
     * Return runtime state to be checkpointed by the framework for restoring the trigger policy in case of failures.
     *
     * @return the state
     */
    S getState();

    /**
     * Restore the trigger policy from the state that was earlier checkpointed by the framework.
     *
     * @param state the state
     */
    void restoreState(S state);
}
  • TriggerPolicy decides when a window computation is triggered.
  • TriggerPolicy has four implementations: CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy, and WatermarkTimeTriggerPolicy.
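
A count-based trigger is the simplest case to sketch. The class below is hypothetical and simplified: a Runnable stands in for the trigger handler, and the counter resets itself, whereas the interface above exposes reset() as a separate step.

```java
// Hypothetical count-based trigger: fire the handler once every `count` events.
class CountTriggerSketch {
    private final int count;        // sliding interval in events
    private final Runnable handler; // stands in for the trigger handler callback
    private int seen;

    CountTriggerSketch(int count, Runnable handler) {
        this.count = count;
        this.handler = handler;
    }

    // Called for every event added to the window.
    void track() {
        if (++seen >= count) {
            seen = 0;      // simplification: reset inline instead of via reset()
            handler.run(); // the window computation would start here
        }
    }
}
```

Tracking seven events with a count of 3 fires the handler twice, after the third and the sixth event.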

WindowedBoltExecutor.newWindowLifecycleListener

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() {
            @Override
            public void onExpiry(List<Tuple> tuples) {
                for (Tuple tuple : tuples) {
                    windowedOutputCollector.ack(tuple);
                }
            }

            @Override
            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
                windowedOutputCollector.setContext(tuples);
                boltExecute(tuples, newTuples, expiredTuples, timestamp);
            }

        };
    }

    protected void boltExecute(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
        bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, getWindowStartTs(timestamp), timestamp));
    }
  • An anonymous WindowLifecycleListener implementation is created here.
  • onExpiry acks each tuple in turn; onActivation calls boltExecute, which builds a TupleWindowImpl and passes it to the bolt for execution.
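
The division of labor between the two callbacks can be mimicked in plain Java. The interface and recorder lists below are hypothetical and much reduced (no reference time, strings instead of tuples); they only show which side effect belongs to which callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced version of the listener wiring; not Storm code.
class ListenerSketch {
    interface Listener<T> {
        void onExpiry(List<T> expired);
        void onActivation(List<T> events, List<T> newEvents, List<T> expired);
    }

    final List<String> acked = new ArrayList<>();     // what would be acked
    final List<String> boltCalls = new ArrayList<>(); // what the bolt would receive

    Listener<String> newListener() {
        return new Listener<String>() {
            @Override
            public void onExpiry(List<String> expired) {
                acked.addAll(expired); // every expired tuple is acked
            }

            @Override
            public void onActivation(List<String> events, List<String> newEvents, List<String> expired) {
                // stands in for handing a tuple window to the wrapped bolt
                boltCalls.add("window=" + events + " new=" + newEvents + " expired=" + expired);
            }
        };
    }
}
```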

WindowedBoltExecutor.initWindowManager

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

        WindowManager<Tuple> manager = stateful ?
            new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);

        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }
        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration =
                new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    private EvictionPolicy<Tuple, ?> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) {
        if (windowLengthCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
            } else {
                return new CountEvictionPolicy<>(windowLengthCount.value);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs);
            } else {
                return new TimeEvictionPolicy<>(windowLengthDuration.value);
            }
        }
    }

    private TriggerPolicy<Tuple, ?> getTriggerPolicy(Count slidingIntervalCount, Duration slidingIntervalDuration,
                                                     WindowManager<Tuple> manager, EvictionPolicy<Tuple, ?> evictionPolicy) {
        if (slidingIntervalCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy, manager);
            } else {
                return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
            } else {
                return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy);
            }
        }
    }
  • For WindowedBoltExecutor, stateful is false, so a plain WindowManager is created.
  • DEFAULT_MAX_LAG_MS is 0, meaning no lag; DEFAULT_WATERMARK_EVENT_INTERVAL_MS is 1000, i.e. 1 second.
  • The EvictionPolicy and TriggerPolicy are chosen according to the parameter types given for windowLength and slidingInterval. When a timestamp field is configured and both parameters are Durations, WatermarkTimeEvictionPolicy and WatermarkTimeTriggerPolicy are created.
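
The selection logic in getEvictionPolicy and getTriggerPolicy amounts to a two-by-two table: count or duration on one axis, tuple timestamps or not on the other. The helper below is a hypothetical restatement of that table that returns class names only; it does not construct any policy.

```java
// Hypothetical restatement of the policy selection table; returns names only.
class PolicySelectionSketch {
    // countBasedLength: window length given as a Count rather than a Duration.
    // tupleTs: a timestamp extractor is configured, so watermark variants apply.
    static String evictionPolicy(boolean countBasedLength, boolean tupleTs) {
        if (countBasedLength) {
            return tupleTs ? "WatermarkCountEvictionPolicy" : "CountEvictionPolicy";
        }
        return tupleTs ? "WatermarkTimeEvictionPolicy" : "TimeEvictionPolicy";
    }

    // countBasedInterval: sliding interval given as a Count rather than a Duration.
    static String triggerPolicy(boolean countBasedInterval, boolean tupleTs) {
        if (countBasedInterval) {
            return tupleTs ? "WatermarkCountTriggerPolicy" : "CountTriggerPolicy";
        }
        return tupleTs ? "WatermarkTimeTriggerPolicy" : "TimeTriggerPolicy";
    }
}
```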

WindowManager

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java

/**
 * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks on expiry of events or activation of the window due to
 * {@link TriggerPolicy}.
 *
 * @param <T> the type of event in the window.
 */
public class WindowManager<T> implements TriggerHandler {

    protected final Collection<Event<T>> queue;

    private final AtomicInteger eventsSinceLastExpiry;

    //......
    /**
     * Add an event into the window, with the given ts as the tracking ts.
     *
     * @param event the event to track
     * @param ts    the timestamp
     */
    public void add(T event, long ts) {
        add(new EventImpl<T>(event, ts));
    }

    /**
     * Tracks a window event
     *
     * @param windowEvent the window event to track
     */
    public void add(Event<T> windowEvent) {
        // watermark events are not added to the queue.
        if (!windowEvent.isWatermark()) {
            queue.add(windowEvent);
        } else {
            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
        }
        track(windowEvent);
        compactWindow();
    }

    /**
     * feed the event to the eviction and trigger policies for bookkeeping and optionally firing the trigger.
     */
    private void track(Event<T> windowEvent) {
        evictionPolicy.track(windowEvent);
        triggerPolicy.track(windowEvent);
    }

    /**
     * expires events that fall out of the window every EXPIRE_EVENTS_THRESHOLD so that the window does not grow too big.
     */
    protected void compactWindow() {
        if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
            scanEvents(false);
        }
    }

    /**
     * Scan events in the queue, using the expiration policy to check if the event should be evicted or not.
     *
     * @param fullScan if set, will scan the entire queue; if not set, will stop as soon as an event not satisfying the expiration policy is
     *                 found
     * @return the list of events to be processed as a part of the current window
     */
    private List<Event<T>> scanEvents(boolean fullScan) {
        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
        List<T> eventsToExpire = new ArrayList<>();
        List<Event<T>> eventsToProcess = new ArrayList<>();
        try {
            lock.lock();
            Iterator<Event<T>> it = queue.iterator();
            while (it.hasNext()) {
                Event<T> windowEvent = it.next();
                Action action = evictionPolicy.evict(windowEvent);
                if (action == EXPIRE) {
                    eventsToExpire.add(windowEvent.get());
                    it.remove();
                } else if (!fullScan || action == STOP) {
                    break;
                } else if (action == PROCESS) {
                    eventsToProcess.add(windowEvent);
                }
            }
            expiredEvents.addAll(eventsToExpire);
        } finally {
            lock.unlock();
        }
        eventsSinceLastExpiry.set(0);
        LOG.debug("[{}] events expired from window.", eventsToExpire.size());
        if (!eventsToExpire.isEmpty()) {
            LOG.debug("invoking windowLifecycleListener.onExpiry");
            windowLifecycleListener.onExpiry(eventsToExpire);
        }
        return eventsToProcess;
    }

    //......
}
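  • The execute method of WindowedBoltExecutor mainly adds the tuple to windowManager.
  • For an EventImpl, isWatermark returns false, so add puts the event in the queue and then runs track and compactWindow.
  • track delegates to evictionPolicy and triggerPolicy. compactWindow calls scanEvents once the number of events since the last expiry reaches EXPIRE_EVENTS_THRESHOLD. When the scan is not a full scan, it stops at the first non-expired event. If eventsToExpire is not empty afterwards, windowLifecycleListener.onExpiry(eventsToExpire) is fired.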
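
The difference between a partial and a full scan is easiest to see on a small queue. The sketch below is hypothetical: timestamps stand in for events, there is no locking, and the eviction rule is a simplified time-based one assumed for illustration rather than the behavior of any particular Storm policy class.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, lock-free imitation of scanEvents over plain timestamps.
class ScanEventsSketch {
    enum Action { EXPIRE, PROCESS, KEEP }

    final List<Long> queue = new ArrayList<>();   // event timestamps in arrival order
    final List<Long> expired = new ArrayList<>(); // what onExpiry would receive

    // Assumed rule: expire events older than the window, keep future ones for later.
    static Action evict(long eventTs, long referenceTs, long windowLengthMs) {
        long diff = referenceTs - eventTs;
        if (diff >= windowLengthMs) return Action.EXPIRE;
        if (diff < 0) return Action.KEEP;
        return Action.PROCESS;
    }

    List<Long> scan(boolean fullScan, long referenceTs, long windowLengthMs) {
        List<Long> toProcess = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            Action action = evict(ts, referenceTs, windowLengthMs);
            if (action == Action.EXPIRE) {
                expired.add(ts);
                it.remove();
            } else if (!fullScan) {
                break; // a partial scan stops at the first non-expired event
            } else if (action == Action.PROCESS) {
                toProcess.add(ts);
            }
        }
        return toProcess;
    }
}
```

On an out-of-order queue such as 1000, 6000, 2000, 9000, 12000 with reference time 10000 and a 5000 ms window, a partial scan expires only 1000 and stops at 6000, while a full scan also catches the stale 2000 behind it.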

WaterMarkEventGenerator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java

/**
 * Tracks tuples across input streams and periodically emits watermark events. Watermark event timestamp is the minimum of the latest tuple
 * timestamps across all the input streams (minus the lag). Once a watermark event is emitted any tuple coming with an earlier timestamp can
 * be considered as late events.
 */
public class WaterMarkEventGenerator<T> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
    private final WindowManager<T> windowManager;
    private final int eventTsLag;
    private final Set<GlobalStreamId> inputStreams;
    private final Map<GlobalStreamId, Long> streamToTs;
    private final ScheduledExecutorService executorService;
    private final int interval;
    private ScheduledFuture<?> executorFuture;
    private volatile long lastWaterMarkTs;

    //......

    public void start() {
        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            long waterMarkTs = computeWaterMarkTs();
            if (waterMarkTs > lastWaterMarkTs) {
                this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                lastWaterMarkTs = waterMarkTs;
            }
        } catch (Throwable th) {
            LOG.error("Failed while processing watermark event ", th);
            throw th;
        }
    }
}
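  • When WindowedBoltExecutor starts, it calls the start method of WaterMarkEventGenerator.
  • That method schedules the WaterMarkEventGenerator task to run every watermarkInterval.
  • Its run method computes the watermark (the minimum of the latest tuple timestamps across all input streams, minus the lag). If the result is greater than lastWaterMarkTs, it adds a WaterMarkEvent to windowManager (for this event isWatermark is true) and updates lastWaterMarkTs.
  • windowManager.add(new WaterMarkEvent<>(waterMarkTs)) runs track, which includes triggerPolicy.track(windowEvent), and then compactWindow.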
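
The watermark formula from the class javadoc can be worked through in a few lines. The class below is a hypothetical sketch: stream names are plain strings, and holding the watermark at 0 minus the lag until every stream has reported at least one tuple is an assumption of this example, since computeWaterMarkTs is elided in the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical watermark calculation: min over streams of the latest ts, minus lag.
class WatermarkSketch {
    private final Map<String, Long> latestTsPerStream = new HashMap<>();
    private final int streamCount; // number of input streams expected
    private final long lagMs;

    WatermarkSketch(int streamCount, long lagMs) {
        this.streamCount = streamCount;
        this.lagMs = lagMs;
    }

    // Remember the highest timestamp seen so far on each stream.
    void track(String stream, long ts) {
        latestTsPerStream.merge(stream, ts, Long::max);
    }

    long computeWatermarkTs() {
        long ts = 0; // assumption: no progress until every stream has data
        if (latestTsPerStream.size() >= streamCount) {
            ts = Long.MAX_VALUE;
            for (long latest : latestTsPerStream.values()) {
                ts = Math.min(ts, latest);
            }
        }
        return ts - lagMs;
    }
}
```

With two streams whose latest timestamps are 1000 and 700 and a lag of 100, the watermark is 600: the slower stream holds it back.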

WatermarkTimeTriggerPolicy.track

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java

    @Override
    public void track(Event<T> event) {
        if (started && event.isWatermark()) {
            handleWaterMarkEvent(event);
        }
    }

    /**
     * Invokes the trigger all pending windows up to the watermark timestamp. The end ts of the window is set in the eviction policy context
     * so that the events falling within that window can be processed.
     */
    private void handleWaterMarkEvent(Event<T> event) {
        long watermarkTs = event.getTimestamp();
        long windowEndTs = nextWindowEndTs;
        LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
        while (windowEndTs <= watermarkTs) {
            long currentCount = windowManager.getEventCount(windowEndTs);
            evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
            if (handler.onTrigger()) {
                windowEndTs += slidingIntervalMs;
            } else {
                /*
                 * No events were found in the previous window interval.
                 * Scan through the events in the queue to find the next
                 * window intervals based on event ts.
                 */
                long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
                LOG.debug("Next aligned window end ts {}", ts);
                if (ts == Long.MAX_VALUE) {
                    LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
                    break;
                }
                windowEndTs = ts;
            }
        }
        nextWindowEndTs = windowEndTs;
    }

    /**
     * Computes the next window by scanning the events in the window and finds the next aligned window between the startTs and endTs. Return
     * the end ts of the next aligned window, i.e. the ts when the window should fire.
     *
     * @param startTs the start timestamp (excluding)
     * @param endTs   the end timestamp (including)
     * @return the aligned window end ts for the next window or Long.MAX_VALUE if there are no more events to be processed.
     */
    private long getNextAlignedWindowTs(long startTs, long endTs) {
        long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
        if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
            return nextTs;
        }
        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
    }
  • handleWaterMarkEvent invokes handler.onTrigger() for each pending window up to the watermark timestamp.
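
The alignment arithmetic in getNextAlignedWindowTs is worth a worked example. The helper below copies that formula into a static method; taking the earliest event timestamp as a parameter instead of querying the window manager is a change made for this sketch.

```java
// The alignment formula from getNextAlignedWindowTs, isolated for a worked example.
class AlignedWindowSketch {
    // Rounds the earliest event ts up to the next multiple of the sliding interval.
    static long nextAlignedWindowTs(long earliestEventTs, long slidingIntervalMs) {
        if (earliestEventTs == Long.MAX_VALUE || earliestEventTs % slidingIntervalMs == 0) {
            return earliestEventTs; // no events, or already on a boundary
        }
        return earliestEventTs + (slidingIntervalMs - (earliestEventTs % slidingIntervalMs));
    }
}
```

For an earliest event at 12345 and a sliding interval of 5000, the remainder is 2345, so the next window ends at 12345 + (5000 - 2345) = 15000. An event at exactly 10000 maps to 10000.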

WindowManager.onTrigger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired, evictionPolicy.getContext().getReferenceTime());
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
  • onTrigger computes three sets of data: events, expired, and newEvents.
  • When events is not empty, it fires windowLifecycleListener.onActivation, which in turn calls the bolt's execute method.
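
How newEvents falls out of comparing consecutive windows can be shown in isolation. The class below is a hypothetical reduction of that part of onTrigger: strings stand in for events, and expiry, locking, and the listener call are left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reduction of onTrigger's bookkeeping for newly added events.
class NewEventsSketch {
    private final Set<String> prevWindowEvents = new HashSet<>();

    // Returns the events of the current window that were absent from the previous one.
    List<String> activate(List<String> windowEvents) {
        List<String> newEvents = new ArrayList<>();
        for (String event : windowEvents) {
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event);
            }
        }
        // remember this window for the next activation
        prevWindowEvents.clear();
        prevWindowEvents.addAll(windowEvents);
        return newEvents;
    }
}
```

If the first window holds a, b, c and the next one holds b, c, d, the second activation reports only d as new.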

Summary

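  • WindowedBoltExecutor implements IRichBolt and is itself a bolt. In setBolt, TopologyBuilder wraps the user's IWindowedBolt implementation in a WindowedBoltExecutor, which replaces it. Its execute method adds tuples that belong in a window through windowManager.add and acks those that are discarded; the real bolt's execute runs only once a window is triggered.
  • WindowLifecycleListener has two callbacks: onExpiry, driven by the EvictionPolicy, and onActivation, driven by the TriggerPolicy.
  • Because windowLength and slidingInterval can each be given as a Duration or a Count, EvictionPolicy and TriggerPolicy come in both flavors; adding the watermark variants gives each policy four implementations. EvictionPolicy: CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy, WatermarkTimeEvictionPolicy. TriggerPolicy: CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy, WatermarkTimeTriggerPolicy.
  • Besides storing the tuple, windowManager.add calls track on both policies (eviction and trigger) and then runs compactWindow. The track method of WatermarkTimeEvictionPolicy currently does nothing, whereas the track method of WatermarkTimeTriggerPolicy triggers the window when the event is a WaterMarkEvent: it calls WindowManager.onTrigger, which selects the window's events and fires windowLifecycleListener.onActivation, which finally invokes the windowed bolt's execute method.
  • Both onTrigger and add (through compactWindow) call scanEvents; the former does a full scan, the latter does not. scanEvents calls evictionPolicy.evict to decide whether a tuple should be dropped and then fires windowLifecycleListener.onExpiry, which acks the tuple, so tuples are acked automatically when they expire. In principle every tuple eventually expires and is therefore acked, which is why Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS must be greater than windowLength + slidingInterval; otherwise a tuple could be considered timed out before it is acked.
  • On start, WindowedBoltExecutor starts the WaterMarkEventGenerator, which schedules a periodic task. Every watermarkInterval it computes the watermark (the minimum of the latest tuple timestamps across all input streams, minus the lag); if that is greater than lastWaterMarkTs, it adds a WaterMarkEvent (isWatermark is true) to windowManager and updates lastWaterMarkTs. With the watermark-based trigger policies, this is what drives WindowManager.onTrigger, and hence windowLifecycleListener.onActivation.
  • On acking: in WindowedBoltExecutor.execute, a tuple that fails to enter the window queue is acked immediately; if Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured it is first emitted to that stream, otherwise it is only logged. Tuples in the window are acked automatically when they expire. WindowedBoltExecutor uses WindowedOutputCollector, which extends OutputCollector and anchors emitted tuples to the input tuples.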
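
The timeout constraint mentioned above reduces to one comparison. The helper below is a hypothetical check written for this article; the strict greater-than follows the wording of the summary and is not taken from Storm's own validation code.

```java
// Hypothetical check of the message timeout against the window parameters.
class MessageTimeoutCheckSketch {
    // True when tuples can stay in the window until expiry without timing out.
    static boolean isTimeoutSufficient(long windowLengthMs, long slidingIntervalMs, int messageTimeoutSecs) {
        return messageTimeoutSecs * 1000L > windowLengthMs + slidingIntervalMs;
    }
}
```

A 30 second message timeout covers a 10 second window sliding every 5 seconds, but not a 30 second window sliding every 5 seconds.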

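Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.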