专栏首页小勇DW3Flink中API使用详细范例--window

Flink中API使用详细范例--window

--------------------------------------------------------------- 12月10号 --------------------------------------------------------

window机制:

什么是Window?有哪些用途? 下面我们结合一个现实的例子来说明。

window又可以分为基于时间(Time-based)的window以及基于数量(Count-based)的window。

Flink DataStream API提供了Time和Count的window,同时增加了基于Session的window。同时,由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用户自定义window。

下面,主要介绍Time-Based window以及Count-Based window,以及自定义的window操作,Session-Based Window操作将会在后续的文章中讲到。

1、Time-Based Window 

细分:基于时间的window又分为 增量聚合、全量聚合。

增量聚合:

1.1、Tumbling window(翻滚)  此处的window要在keyed Stream上应用window操作,当输入1个参数时,代表Tumbling window操作,每分钟统计一次,此处用scala语言实现:

增量聚合代码---- 求和操作:

        //todo 获得数据源后进行算子操作
        DataStream<StartAppCount> windowedData = startupInfoData.keyBy("appId")     //以设备id进行分组
                        .timeWindow(Time.minutes(60))                      //指定时间窗口大小为5分钟,指定时间间隔为5分钟
                        .aggregate(new CountAgg(), new WindowResultFunction());

        windowedData.print();

CountAgg自定义的函数,需要实现 AggregateFunction函数

public class CountAgg implements AggregateFunction<StartupInfoData, Long, Long> {

    @Override
    public Long createAccumulator() {                            //初始化算子
        return 0L;
    }

    @Override
    public Long add(StartupInfoData startupInfoData, Long acc) {    //传入一个入参后,做累加操作,将算子加1
        return acc + 1;
    }

    @Override
    public Long getResult(Long acc) {                              //最输出merge产生的结果
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {                     //对算子进行每一个的累和
        return acc1 + acc2;
    }
}

输出函数格式:

public class WindowResultFunction implements WindowFunction<Long, StartAppCount, Tuple, TimeWindow>
{
    @Override
    public void apply(
            Tuple key,                              // 窗口的主键,即 appId
            TimeWindow window,                      // 窗口
            Iterable<Long> aggregateResult,         // 聚合函数的结果,即 count 值
            Collector<StartAppCount> collector      // 输出类型为 StartAppCount
    ) throws Exception
    {
        String appId = ((Tuple1<String>) key).f0;
        Long count = aggregateResult.iterator().next();
        collector.collect(StartAppCount.of(appId, window.getEnd(), count));
    }

自定义输出类的类格式:

public class StartAppCount {

    public String appId;     // 商品ID
    public long windowEnd;  // 窗口结束时间戳
    public long count;  // 商品的点击量

    public static StartAppCount of (String appId, long windowEnd, long count) {
        StartAppCount result = new StartAppCount();
        result.appId = appId;
        result.windowEnd = windowEnd;
        result.count = count;
        return result;
    }

    @Override
    public String toString() {
        return "WordWithCount{" +
                "appId='" + appId + '\'' +
                ", count=" + count +
                '}';
    }

}

增量聚合代码---- 求平均值操作:

public class AverageAggregate implements AggregateFunction<Tuple2<String,Long>, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> acc) {   //可以理解为缓存的中间值
        return new Tuple2<>(acc.f0 + value.f1, acc.f1 + 1L);   //传入的值加到acc的第一个值得到传入值, 第二个值为个数
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return (double)acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {    //进行累和合并
        return new Tuple2<>(acc1.f0+acc2.f0, acc1.f1+acc2.f1);
    }
}

使用sum进行求和的代码:

        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒
                .sum("count");//在这里使用sum或者reduce都可以
                /*.reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                })*/
        //把数据打印到控制台并且设置并行度
        windowCounts.print().setParallelism(1);

使用reduce进行求和的方法:

        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒
                //.sum("count");//在这里使用sum或者reduce都可以
                .reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                });

全量的时间窗口操作:

 代码示例:

public class MyprocessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> out) throws Exception {
        long count = 0;
        for(Tuple2<String,Long> in : iterable)
        {
            count++;
        }

        out.collect("Window: " + context.window() + "count: " + count);
    }


}

1.2、Sliding window(滑动) 

        //todo 获得数据源后进行算子操作
        DataStream<StartAppCount> windowedData = startupInfoData.keyBy("appId")     //以设备id进行分组
                        .timeWindow(Time.minutes(60), Time.seconds(5))                      //指定时间窗口大小为5分钟,指定时间间隔为5分钟
                        .aggregate(new CountAgg(), new WindowResultFunction());

        windowedData.print();

2、Count-Based Window  2.1、Tumbling Window  和Time-Based一样,Count-based window同样支持翻滚与滑动窗口,即在Keyed Stream上,统计每100个元素的数量之和

2.2、Sliding Window 

每10个元素统计过去100个元素的数量之和:

3、Advanced Window(自定义window) 

自定义的Window需要指定3个function。  3.1、Window Assigner:负责将元素分配到不同的window。

WindowAPI提供了自定义的WindowAssigner接口,我们可以实现WindowAssigner的public abstract Collection<W> assignWindows(T element, long timestamp)方法。同时,对于基于Count的window而言,默认采用了GlobalWindow的window assigner,例如:

keyValue.window(GlobalWindows.create())

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Metrics在Flink系统中的使用分析

    Flink 提供的 Metrics 可以在 Flink 内部收集一些指标,通过这些指标让开发人员更好地理解作业或集群的状态。由于集群运行后很难发现内部的实际状况...

    小勇DW3
  • java使用validator进行校验

    不管是html页面表单提交的对象数据还是和第三方公司进行接口对接,都需要对接收到的数据进行校验(非空、长度、格式等等)。如果使用if一个个进行校验(字段非常多)...

    小勇DW3
  • 模糊查询中输入通配符的问题

    比如说在搜索框中输入'%'、'_'、'/'时会出错,因为这些特殊符号在sql语句查询的时候是有他特定的意义的,所有这里要对前台传过来的keyword搜索内容进行...

    小勇DW3
  • 【注解驱动时代】细说@Value注解

    另外application.yml 获取方式一样,只是两种配置文件的样式不同。注意:yml文件中的冒号后面要有空格,冒号+空格=properties文件中的点....

    用户4143945
  • [享学Eureka] 九、远程通信模块:使用TransportClientFactory构建底层请求客户端完成服务注册、服务下线

    代码下载地址:https://github.com/f641385712/netflix-learning

    YourBatman
  • java浅拷贝和深拷贝(基础也是很重要的)

    对于的github基础代码https://github.com/chywx/JavaSE

    陈灬大灬海
  • 自定义参数解析器

    开发中,app端给服务端会传基础参数、其他参数,一般基础参数app端都会传给服务端,其他参数则是根据不同接口传不同参数。若以表单的形式提交的数据:

    LiosWong
  • SPI概念及使用方法

    SPI全称Service Provider Interfaces,用于发现接口的实现。在jdbc、日志、dubbo的设计中都使用SPI用于服务的发现。简单的以j...

    LiosWong
  • Java 对象的初始化过程_上

    本文主要以白话的形式 ‘简单’ 的描述在 java 中 new 对象的过程,之所以说是 ‘简单’ 的描述是因为,在本文中不会讲述底层的加载过程。

    Melody132
  • Flash/Flex学习笔记(34):AS3中的自定义事件

    类似C#中自定义事件需要一个自定义的EventArgs子类一样,AS3也需要开发者自定义一个Event类的子类,这里我们假设一种场景:设计一个Person(人物...

    菩提树下的杨过

扫码关注云+社区

领取腾讯云代金券