前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink-1.9流计算开发:十三、min、minBy、max、maxBy函数

Flink-1.9流计算开发:十三、min、minBy、max、maxBy函数

作者头像
cosmozhu
修改2020-06-15 09:32:29
1.7K0
修改2020-06-15 09:32:29
举报
文章被收录于专栏:cosmozhu技术篇cosmozhu技术篇

Flink是下一代大数据计算平台,可处理流计算和批量计算。《Flink-1.9流计算开发:十三、min、minBy、max、maxBy函数》cosmozhu写的本系列文章的第十三篇。通过简单的DEMO来演示min、minBy、max、maxBy函数执行的效果 。

需求

本篇文章我们来区分min(max)与minBy(maxBy)之间的区别,下面案例是每10秒计算一次最近1分钟的最小值订单。

解决方案

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 添加自定义数据源,每秒发出一笔订单信息{商品名称,商品数量}
        DataStreamSource<Tuple2<String, Integer>> orderSource = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    private volatile boolean isRunning = true;
                    private final Random random = new Random();

                    @Override
                    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                        while (isRunning) {
                            TimeUnit.SECONDS.sleep(1);
                            Tuple2<String, Integer> t = Tuple2.of(TYPE[random.nextInt(TYPE.length)], random.nextInt(1000));
                            LOG.info("提交数据:"+t);
                            ctx.collect(t);
                        }
                    }
                    @Override
                    public void cancel() {
                        isRunning = false;
                    }

                }, "order-info");

        orderSource
        .timeWindowAll(Time.minutes(1),Time.seconds(10))
        .min(1)
//      .minBy(1)
        .print();

        env.execute("Flink Streaming Java API Skeleton");
    }

执行效果

执行min函数,我们可以看出min函数确实返回了最小值,但是最小值前面对应的商品名称却对应不上。

15:52:20,261 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,542)
15:52:21,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,331)
15:52:22,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,894)
15:52:23,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,791)
15:52:24,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,836)
15:52:25,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,198)
15:52:26,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,221)
15:52:27,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,309)
15:52:28,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,553)
15:52:29,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,969)
2> (梨,198)
15:52:30,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,981)
15:52:31,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,483)
15:52:32,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,381)
15:52:33,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,982)
15:52:34,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,738)
15:52:35,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,654)
15:52:36,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,124)
15:52:37,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,703)
15:52:38,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,996)
15:52:39,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,127)
3> (梨,124)
15:52:40,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,249)
15:52:41,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,705)
15:52:42,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,37)
15:52:43,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,647)
15:52:44,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,842)
15:52:45,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,475)
15:52:46,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,999)
15:52:47,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,994)
15:52:48,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,417)
15:52:49,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,830)

执行minBy函数,我们可以看出minBy返回的最小值,并且对应的商品名称也是正确的

15:59:06,657 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,935)
15:59:07,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,638)
15:59:08,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,485)
15:59:09,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,720)
4> (梨,485)
15:59:10,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,407)
15:59:11,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,353)
15:59:12,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,76)
15:59:13,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,629)
15:59:14,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,974)
15:59:15,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,263)
15:59:16,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,840)
15:59:17,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,220)
15:59:18,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,27)
15:59:19,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,94)
1> (苹果,27)
15:59:20,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,733)
15:59:21,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,390)
15:59:22,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,766)
15:59:23,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,321)
15:59:24,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,784)
15:59:25,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,781)
15:59:26,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,459)
15:59:27,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,481)
15:59:28,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,91)
15:59:29,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,663)
2> (苹果,27)
15:59:30,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,183)

小结

从上面的案例我们可以分析得出:

  1. min只返回计算的最小值,而最小值对应的其他数据不保证正确。
  2. minBy返回计算的最小值,并且最小值对应的其他数据是保证正确的。

max和maxBy与其相似

代码地址

https://github.com/chaoxxx/learn-flink-stream-api/blob/master/src/main/java/fun/cosmozhu/session13/StreamTest.java

作者:cosmozhu --90后的老父亲,专注于保护地球的程序员

个人网站:https://www.cosmozhu.fun

欢迎转载,转载时请注明出处。

相关文章

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求
  • 解决方案
  • 执行效果
  • 小结
  • 代码地址
    • 相关文章
    相关产品与服务
    对象存储
    对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档