前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >storm RollingTopWords 实时top-N计算任务窗口设计

storm RollingTopWords 实时top-N计算任务窗口设计

作者头像
sanmutongzi
发布2020-03-05 10:48:11
4880
发布2020-03-05 10:48:11
举报

转发请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/6381037.html

流式计算中我们经常会遇到需要将数据根据时间窗口进行批量统计的场景,窗口性质一般由两个参数规定:1 Window length: 可以用时间或者数量来定义窗口大小;2 Sliding interval: 窗口滑动的间隔 。通过这两个参数一般把window分成滚动窗口和滑动窗口。

Sliding Window(滑动窗口)

Tuples are grouped in windows and window slides every sliding interval. A tuple can belong to more than one window.

For example a time duration based sliding window with length 10 secs and sliding interval of 5 seconds.

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time

|<------- w1 -------->|
        |------------ w2 ------->|

Tumbling Window(滚动窗口)

Tuples are grouped in a single window based on time or count. Any tuple belongs to only one of the windows.

For example a time duration based tumbling window with length 5 secs.

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2            w3

storm直到1.0.0版本后才官方加入了IWindowedBolt接口用来实现窗口计算,在此之前storm-starter里有一个稍微复杂点的RollingTopWords滑动窗口计算top N实现的demo。topology主要组件的流程设置如下:

(1)TestWordSpout负责产生单词源数据并通过fieldsGrouping发送到下游bolt

(2)RollingCountBolt负责统计Window length范围内的所有单词计数并每Sliding interval时间发送一次汇总信息到下游。

(3) IntermediateRankingsBolt,这是个中间bolt,主要是为了预先计算部分word的top-N排行榜出来,减少最终节点的排序工作。

(4)TotalRankingsBolt 最终top-N排序并输出计算结果。

1     String spoutId = "wordGenerator";
2     String counterId = "counter";
3     String intermediateRankerId = "intermediateRanker";
4     String totalRankerId = "finalRanker";
5     builder.setSpout(spoutId, new TestWordSpout(), 5);
6     builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
7     builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
8         "obj"));
9     builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);

RollingCountBolt初始化参数正好就是上面提到的windowLengthInSeconds和emitFrequencyInSeconds,new RollingCountBolt(300, 60)表示每分钟输出一下最近五分钟内的数据统计。

counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,this.emitFrequencyInSeconds));

RollingCountBolt内部存放了一个SlidingWindowCounter的结构,SlidingWindowCounter内部存储了SlotBasedCounter,SlotBasedCounter才是具体实现了怎样进行

窗口计算,滑动的窗口不停对应到一个环形的slot列表中。SlidingWindowCounter在窗口滑动的时候采取了如下动作:

1   public Map<T, Long> getCountsThenAdvanceWindow() {
2     Map<T, Long> counts = objCounter.getCounts();
3     objCounter.wipeZeros();
4     objCounter.wipeSlot(tailSlot);
5     advanceHead();
6     return counts;
7   }

1. 取出Map<T, Long> counts, 对象和窗口内所有slots求和值的map

2. 调用wipeZeros, 删除已经不被使用的obj, 释放空间 3. 最重要的一步, 清除tailSlot, 并advanceHead, 以实现滑动窗口 advanceHead的实现, 如何在数组实现循环的滑动窗口

  private void advanceHead() {
    headSlot = tailSlot;
    tailSlot = slotAfter(tailSlot);
  }

SlotBasedCounter主要用于按照窗口对应的slot进行incrementCount,getCounts和computeTotalCount,用于数据新增统计,全窗口数据提取和对应元素窗口内全部slot数据求和。

  public void incrementCount(T obj, int slot) {
    long[] counts = objToCounts.get(obj);
    if (counts == null) {
      counts = new long[this.numSlots];
      objToCounts.put(obj, counts);
    }
    counts[slot]++;
  }


  public Map<T, Long> getCounts() {
    Map<T, Long> result = new HashMap<T, Long>();
    for (T obj : objToCounts.keySet()) {
      result.put(obj, computeTotalCount(obj));
    }
    return result;
  }

  private long computeTotalCount(T obj) {
    long[] curr = objToCounts.get(obj);
    long total = 0;
    for (long l : curr) {
      total += l;
    }
    return total;
  }

如上所述, RollingCountBolt在没有窗口接口的情况下通过代码结构巧妙的实现了一个滑动窗口(理论上滚动窗口也一样可以实现),感觉还是很巧妙地。

参考资料:

1 Storm starter - RollingTopWords

2 Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-02-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档