前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于Flink快速开发实时TopN程序最简单的思路

基于Flink快速开发实时TopN程序最简单的思路

作者头像
王知无-import_bigdata
发布2020-10-09 15:44:50
8160
发布2020-10-09 15:44:50
举报

TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜。流式的TopN可以使业务方在内存中按照某个统计指标(如出现次数)计算排名并快速出发出更新后的排行榜。

我们以统计词频为例展示一下如何快速开发一个计算TopN的flink程序。

Flink支持各种各样的流数据接口作为数据的数据源,本次demo我们采用内置的socketTextStream作为数据数据源。

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作为时间语义
DataStream<String> text = env.socketTextStream(hostName, port); //监听指定socket端口作为输入

与离线wordcount类似,程序首先需要把输入的整句文字按照分隔符split成一个一个单词,然后按照单词为key实现累加。

代码语言:javascript
复制
DataStream<Tuple2<String, Integer>> ds = text
                .flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2<String, Integer>类型
private static final class LineSplitter implements
            FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
代码语言:javascript
复制
DataStream<Tuple2<String, Integer>> wcount = ds
                .keyBy(0) //按照Tuple2<String, Integer>的第一个元素为key,也就是单词
                .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20)))
                //key之后的元素进入一个总时间长度为600s,每20s向后滑动一次的滑动窗口
                .sum(1);// 将相同的key的元素第二个count值相加

全局TopN

数据流经过前面的处理后会每20s计算一次各个单词的count值并发送到下游窗口。

代码语言:javascript
复制
  DataStream<Tuple2<String, Integer>> ret = wcount
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                //所有key元素进入一个20s长的窗口(选20秒是因为上游窗口每20s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
                .process(new TopNAllFunction(5));//计算该窗口TopN

windowAll是一个全局并发为1的特殊操作,也就是所有元素都会进入到一个窗口内进行计算。

代码语言:javascript
复制
private static class TopNAllFunction
            extends
            ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

        private int topSize = 10;

        public TopNAllFunction(int topSize) {
            // TODO Auto-generated constructor stub

            this.topSize = topSize;
        }

        @Override
        public void process(
                ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
                Iterable<Tuple2<String, Integer>> input,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // TODO Auto-generated method stub

            TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
                    new Comparator<Integer>() {

                        @Override
                        public int compare(Integer y, Integer x) {
                            // TODO Auto-generated method stub
                            return (x < y) ? -1 : 1;
                        }

                    }); //treemap按照key降序排列,相同count值不覆盖

            for (Tuple2<String, Integer> element : input) {
                treemap.put(element.f1, element);
                if (treemap.size() > topSize) { //只保留前面TopN个元素
                    treemap.pollLastEntry();
                }
            }

            for (Entry<Integer, Tuple2<String, Integer>> entry : treemap
                    .entrySet()) {
                out.collect(entry.getValue());
            }

        }

    }

分组TopN

在部分场景下,用户希望根据不同的分组进行排序,计算出每个分组的一个排行榜。

代码语言:javascript
复制
  wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分组
                .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口统计上游数据
                .process(new TopNFunction(5)) //分组TopN统计
代码语言:javascript
复制
private static class TupleKeySelectorByStart implements
            KeySelector<Tuple2<String, Integer>, String> {

        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            // TODO Auto-generated method stub
            return value.f0.substring(0, 1); //取首字母做key
        }
    }
代码语言:javascript
复制
/**
     *
     *针对keyby window的TopN函数,继承自ProcessWindowFunction
     *
     */
    private static class TopNFunction
            extends
            ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {

        private int topSize = 10;

        public TopNFunction(int topSize) {
            // TODO Auto-generated constructor stub
            this.topSize = topSize;
        }

        @Override
        public void process(
                String arg0,
                ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context arg1,
                Iterable<Tuple2<String, Integer>> input,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // TODO Auto-generated method stub

            TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
                    new Comparator<Integer>() {

                        @Override
                        public int compare(Integer y, Integer x) {
                            // TODO Auto-generated method stub
                            return (x < y) ? -1 : 1;
                        }

                    });

            for (Tuple2<String, Integer> element : input) {
                treemap.put(element.f1, element);
                if (treemap.size() > topSize) {
                    treemap.pollLastEntry();
                }
            }

            for (Entry<Integer, Tuple2<String, Integer>> entry : treemap
                    .entrySet()) {
                out.collect(entry.getValue());
            }
        }
    }

上面的代码实现了按照首字母分组,取每组元素count最高的TopN方法。

嵌套TopN

全局topN的缺陷是,由于windowall是一个全局并发为1的操作,所有的数据只能汇集到一个节点进行 TopN 的计算,那么计算能力就会受限于单台机器,容易产生数据热点问题。

解决思路就是使用嵌套 TopN,或者说两层 TopN。在原先的 TopN 前面,再加一层 TopN,用于分散热点。例如可以先加一层分组 TopN,第一层会计算出每一组的 TopN,而后在第二层中进行合并汇总,得到最终的全网TopN。第二层虽然仍是单点,但是大量的计算量由第一层分担了,而第一层是可以水平扩展的。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 全局TopN
  • 分组TopN
  • 嵌套TopN
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档