A Look at Storm's WindowedBolt

code4it
Published 2018-10-24 10:27:39
Collected in the column: 码匠的流水账

This article takes a look at Storm's WindowedBolt.

Example

    @Test
    public void testSlidingTupleTsTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integer", new RandomIntegerSpout(), 1);
        BaseWindowedBolt baseWindowedBolt = new SlidingWindowSumBolt()
                //windowLength , slidingInterval
                .withWindow(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
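                // withTimestampField names the tuple field that carries the tuple's timestamp (event time)
                .withTimestampField("timestamp")
                // watermark = (minimum, across input streams, of each stream's latest tuple timestamp) - lag
                // withWatermarkInterval sets how often a watermark event is generated; the default is 1 second
                // when a watermark event fires, the windows up to the watermark timestamp are evaluated
                .withWatermarkInterval(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS))
                // withLag handles out-of-order data: a tuple whose timestamp is less than lastWaterMarkTs
                // (the largest watermark computed so far) is treated as late and is not added to any window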
                .withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS));
        builder.setBolt("slidingSum", baseWindowedBolt, 1).shuffleGrouping("integer");
        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingSum");
        SubmitHelper.submitRemote("slideWindowTopology",builder.createTopology());
    }
  • The test mainly configures withWindow, withTimestampField, withWatermarkInterval, and withLag
  • SlidingWindowSumBolt is defined as follows:
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlidingWindowSumBolt extends BaseWindowedBolt {
    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);

    // Running sum of the values currently in the window
    private int sum = 0;
    private OutputCollector collector;

    @Override
    public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        /*
         * The inputWindow gives a view of
         * (a) all the events in the window
         * (b) events that expired since last activation of the window
         * (c) events that newly arrived since last activation of the window
         */
        List<Tuple> tuplesInWindow = inputWindow.get();
        List<Tuple> newTuples = inputWindow.getNew();
        List<Tuple> expiredTuples = inputWindow.getExpired();
        LOG.debug("Events in current window: {}", tuplesInWindow.size());
        /*
         * Instead of iterating over all the tuples in the window to compute
         * the sum, the values for the new events are added and old events are
         * subtracted. Similar optimizations might be possible in other
         * windowing computations.
         */
        for (Tuple tuple : newTuples) {
            sum += (int) tuple.getValue(0);
        }
        for (Tuple tuple : expiredTuples) {
            sum -= (int) tuple.getValue(0);
        }
        collector.emit(new Values(sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}
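  • TupleWindow exposes three views: all tuples in the current window, the tuples that arrived since the last activation, and the tuples that expired since the last activation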
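The add-new, subtract-expired bookkeeping used by SlidingWindowSumBolt can be tried without a Storm cluster. The following is only a minimal sketch in plain Java: the class IncrementalWindowSum and its count-based eviction are hypothetical stand-ins for what TupleWindow.getNew() and getExpired() would deliver, not Storm code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a count-based window; not part of Storm.
class IncrementalWindowSum {
    private final int windowLength;                       // max number of values kept in the window
    private final Deque<Integer> window = new ArrayDeque<>();
    private int sum = 0;                                  // running sum, as in SlidingWindowSumBolt

    IncrementalWindowSum(int windowLength) {
        this.windowLength = windowLength;
    }

    /** One window activation: add the new values, evict the oldest ones, return the running sum. */
    int activate(List<Integer> newValues) {
        for (int v : newValues) {
            window.addLast(v);
            sum += v;                                     // mirrors the loop over getNew()
        }
        List<Integer> expired = new ArrayList<>();
        while (window.size() > windowLength) {
            expired.add(window.removeFirst());
        }
        for (int v : expired) {
            sum -= v;                                     // mirrors the loop over getExpired()
        }
        return sum;
    }

    public static void main(String[] args) {
        IncrementalWindowSum s = new IncrementalWindowSum(3);
        System.out.println(s.activate(Arrays.asList(1, 2)));   // window is [1, 2]
        System.out.println(s.activate(Arrays.asList(3, 4)));   // window is [2, 3, 4]
        System.out.println(s.activate(Arrays.asList(5)));      // window is [3, 4, 5]
    }
}
```

Each activation costs time proportional to the number of new and expired values rather than to the full window size, which is the optimization the comment in the bolt describes.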

WindowedBolt

IWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java

/**
 * A bolt abstraction for supporting time and count based sliding & tumbling windows.
 */
public interface IWindowedBolt extends IComponent {
    /**
     * This is similar to the {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except that while emitting,
     * the tuples are automatically anchored to the tuples in the inputWindow.
     */
    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);

    /**
     * Process the tuple window and optionally emit new tuples based on the tuples in the input window.
     */
    void execute(TupleWindow inputWindow);

    void cleanup();

    /**
     * Return a {@link TimestampExtractor} for extracting timestamps from a tuple for event time based processing, or null for processing
     * time.
     *
     * @return the timestamp extractor
     */
    TimestampExtractor getTimestampExtractor();
}
  • IWindowedBolt is stateless, meaning the window's data is held only in memory
  • IWindowedBolt has an abstract implementation class, BaseWindowedBolt, whose subclasses include BaseStatefulWindowedBolt and JoinBolt

IStatefulWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IStatefulWindowedBolt.java

/**
 * A windowed bolt abstraction for supporting windowing operation with state.
 */
public interface IStatefulWindowedBolt<T extends State> extends IStatefulComponent<T>, IWindowedBolt {
    /**
     * If the stateful windowed bolt should have its windows persisted in state and maintain a subset of events in memory.
     * <p>
     * The default is to keep all the window events in memory.
     * </p>
     *
     * @return true if the windows should be persisted
     */
    default boolean isPersistent() {
        return false;
    }

    /**
     * The maximum number of window events to keep in memory.
     */
    default long maxEventsInMemory() {
        return 1_000_000L; // default
    }
}
  • In version 1.2.2 IStatefulWindowedBolt declared no methods; version 2.0.0 adds two default methods, isPersistent and maxEventsInMemory
  • isPersistent decides whether a PersistentWindowedBoltExecutor or a StatefulWindowedBoltExecutor is created
  • maxEventsInMemory decides how many events WindowState keeps in memory; the rest are moved to a KeyValueState (HBaseKeyValueState, InMemoryKeyValueState, RedisKeyValueState)
  • IStatefulWindowedBolt has an abstract implementation class, BaseStatefulWindowedBolt

withWindow与withTumblingWindow

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java

    /**
     * Tuple count based sliding window configuration.
     *
     * @param windowLength    the number of tuples in the window
     * @param slidingInterval the number of tuples after which the window slides
     */
    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * Time duration based sliding window configuration.
     *
     * @param windowLength    the time duration of the window
     * @param slidingInterval the time duration after which the window slides
     */
    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * A time duration based tumbling window.
     *
     * @param duration the time duration after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Duration duration) {
        return withWindowLength(duration).withSlidingInterval(duration);
    }

    /**
     * A count based tumbling window.
     *
     * @param count the number of tuples after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Count count) {
        return withWindowLength(count).withSlidingInterval(count);
    }
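  • The abstract class BaseWindowedBolt defines several withWindow overloads, which set the windowLength and slidingInterval parameters; each parameter can be expressed in one of two dimensions, Duration or Count
  • withWindow configures a sliding window, while withTumblingWindow configures a tumbling window
  • As the method definitions show, withTumblingWindow passes the same value for windowLength and slidingInterval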
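To make the difference between the two window types concrete, the sketch below lists the time ranges covered by successive windows for a given window length and sliding interval. It is plain Java with hypothetical names (WindowBounds, windows, describe), and it assumes windows end at multiples of the sliding interval starting from time 0 with half-open ranges; it illustrates the concept and is not a reproduction of Storm's exact alignment rules.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Storm.
class WindowBounds {
    /** Returns {start, end} pairs for windows ending at slide, 2*slide, ... up to horizon. */
    static List<long[]> windows(long length, long slide, long horizon) {
        List<long[]> result = new ArrayList<>();
        for (long end = slide; end <= horizon; end += slide) {
            long start = Math.max(0, end - length);   // a window never reaches before time 0
            result.add(new long[] {start, end});
        }
        return result;
    }

    /** Formats the windows as half-open ranges, e.g. "[0,3) [1,6)". */
    static String describe(List<long[]> ws) {
        StringBuilder sb = new StringBuilder();
        for (long[] w : ws) {
            sb.append('[').append(w[0]).append(',').append(w[1]).append(") ");
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // Sliding window: length 5, slide 3 (the values used in the example topology, in seconds)
        System.out.println(describe(windows(5, 3, 12)));   // [0,3) [1,6) [4,9) [7,12)
        // Tumbling window: length and slide are both 5
        System.out.println(describe(windows(5, 5, 15)));   // [0,5) [5,10) [10,15)
    }
}
```

With length 5 and slide 3, consecutive windows share 2 seconds of data; when the two values are equal the ranges touch without overlapping, which is the tumbling case.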

WindowedBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

public class WindowedBoltExecutor implements IRichBolt {
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag

    //......

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

        WindowManager<Tuple> manager = stateful ?
            new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);

        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }
        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration =
                new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    @Override
    public void execute(Tuple input) {
        if (isTupleTs()) {
            long ts = timestampExtractor.extractTimestamp(input);
            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                windowManager.add(input, ts);
            } else {
                if (lateTupleStream != null) {
                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                } else {
                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                }
                windowedOutputCollector.ack(input);
            }
        } else {
            windowManager.add(input);
        }
    }
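  • When a timestamp extractor is configured, initWindowManager reads maxLagMs, which defaults to 0 (no lag), and passes it to the WaterMarkEventGenerator it creates
  • If waterMarkEventGenerator.track returns false and Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is not configured, a log line is printed in the format "Received a late tuple {} with ts {}. This will not be processed."; in both cases the late tuple is acked and is not added to the window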

WaterMarkEventGenerator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java

public class WaterMarkEventGenerator<T> implements Runnable {

    /**
     * Creates a new WatermarkEventGenerator.
     *
     * @param windowManager The window manager this generator will submit watermark events to
     * @param intervalMs    The generator will check if it should generate a watermark event with this interval
     * @param eventTsLagMs  The max allowed lag behind the last watermark event before an event is considered late
     * @param inputStreams  The input streams this generator is expected to handle
     */
    public WaterMarkEventGenerator(WindowManager<T> windowManager, int intervalMs,
                                   int eventTsLagMs, Set<GlobalStreamId> inputStreams) {
        this.windowManager = windowManager;
        streamToTs = new ConcurrentHashMap<>();

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("watermark-event-generator-%d")
            .setDaemon(true)
            .build();
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);

        this.interval = intervalMs;
        this.eventTsLag = eventTsLagMs;
        this.inputStreams = inputStreams;
    }

    public void start() {
        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
    }

    //......

    /**
     * Tracks the timestamp of the event in the stream, returns true if the event can be considered for processing or false if its a late
     * event.
     */
    public boolean track(GlobalStreamId stream, long ts) {
        Long currentVal = streamToTs.get(stream);
        if (currentVal == null || ts > currentVal) {
            streamToTs.put(stream, ts);
        }
        checkFailures();
        return ts >= lastWaterMarkTs;
    }

    @Override
    public void run() {
        try {
            long waterMarkTs = computeWaterMarkTs();
            if (waterMarkTs > lastWaterMarkTs) {
                this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                lastWaterMarkTs = waterMarkTs;
            }
        } catch (Throwable th) {
            LOG.error("Failed while processing watermark event ", th);
            throw th;
        }
    }

    /**
     * Computes the min ts across all streams.
     */
    private long computeWaterMarkTs() {
        long ts = 0;
        // only if some data has arrived on each input stream
        if (streamToTs.size() >= inputStreams.size()) {
            ts = Long.MAX_VALUE;
            for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
                ts = Math.min(ts, entry.getValue());
            }
        }
        return ts - eventTsLag;
    }
}
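  • The track method compares the tuple's timestamp with lastWaterMarkTs to decide whether the tuple should be processed
  • lastWaterMarkTs is updated in the run method of WaterMarkEventGenerator; computeWaterMarkTs takes the minimum over streamToTs (which holds the latest timestamp seen on each input stream) and subtracts eventTsLag to obtain waterMarkTs
  • lastWaterMarkTs is updated only when waterMarkTs is greater than it; in other words, run keeps recomputing waterMarkTs and ensures that lastWaterMarkTs holds the largest waterMarkTs seen so far
  • The start method of WaterMarkEventGenerator schedules a periodic task whose period is watermarkInterval, so run is executed once every watermarkInterval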
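The interplay between track and the periodic run can be stepped through by hand. The sketch below is a simplified, single-threaded model in plain Java: WatermarkSketch and tick are hypothetical names, streams are identified by plain strings instead of GlobalStreamId, and tick stands in for one scheduled execution of run without emitting any WaterMarkEvent.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, single-threaded model of the logic shown above; not Storm code.
class WatermarkSketch {
    private final Map<String, Long> streamToTs = new HashMap<>();  // latest ts seen per stream
    private final int streamCount;                                 // number of expected input streams
    private final long lagMs;                                      // corresponds to eventTsLag
    private long lastWaterMarkTs = 0;

    WatermarkSketch(int streamCount, long lagMs) {
        this.streamCount = streamCount;
        this.lagMs = lagMs;
    }

    /** Records the latest timestamp per stream; returns false when the tuple is late. */
    boolean track(String stream, long ts) {
        Long current = streamToTs.get(stream);
        if (current == null || ts > current) {
            streamToTs.put(stream, ts);
        }
        return ts >= lastWaterMarkTs;
    }

    /** Models one periodic run: the watermark only ever moves forward. */
    long tick() {
        long ts = 0;
        if (streamToTs.size() >= streamCount) {        // only once every stream has delivered data
            ts = Long.MAX_VALUE;
            for (long v : streamToTs.values()) {
                ts = Math.min(ts, v);
            }
        }
        long waterMarkTs = ts - lagMs;
        if (waterMarkTs > lastWaterMarkTs) {
            lastWaterMarkTs = waterMarkTs;
        }
        return lastWaterMarkTs;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch(2, 5000);
        w.track("a", 10_000);
        System.out.println(w.tick());                  // 0: stream "b" has not delivered anything yet
        w.track("b", 12_000);
        System.out.println(w.tick());                  // 5000: min(10000, 12000) - 5000
        System.out.println(w.track("a", 4_000));       // false: 4000 is behind the watermark
        System.out.println(w.track("a", 5_000));       // true: a timestamp equal to the watermark is accepted
    }
}
```

The last two calls show the boundary: a tuple is late only when its timestamp is strictly less than lastWaterMarkTs.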

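Summary

  • Storm's WindowedBolt comes in two flavors, IWindowedBolt and IStatefulWindowedBolt; the former is stateless and the latter is stateful
  • A window has two important parameters, windowLength and slidingInterval, and each can be expressed in one of two dimensions, Duration or Count
  • BaseWindowedBolt's withTumblingWindow sets windowLength and slidingInterval to the same value; a tumbling window is therefore a special case of a sliding window in which the two parameters are equal, so the windows do not overlap
  • WaterMarkEventGenerator schedules a periodic task that computes waterMarkTs every watermarkInterval (the minimum, across input streams, of each stream's latest tuple timestamp, minus the lag) and updates lastWaterMarkTs when the new value is larger
  • WaterMarkEventGenerator.track decides whether a tuple should be processed: if the tuple's timestamp is less than lastWaterMarkTs it returns false, and the tuple is then emitted to the stream named by Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM if one is configured, or otherwise only logged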
