前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Flink 的数据流算子

Flink 的数据流算子

作者头像
前Thoughtworks-杨焱
发布于 2021-12-07 08:05:15
发布于 2021-12-07 08:05:15
47600
代码可运行
举报
文章被收录于专栏:杨焱的专栏杨焱的专栏
运行总次数:0
代码可运行

Map

输入DataStream返回DataStream。

接收一个元素,产生一个元素。下面是转换为双倍值的MapFunction

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

FlatMap

输入DataStream返回DataStream。

接收一个元素,产出0个,1个,或者更多的元素。下面是一个字符串拆分为多个字符串的FlatMap

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

Filter

输入DataStream返回DataStream。

过滤数据,function返回为true的会被保留,为false的会被排除。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy

输入DataStream返回KeyedStream。

按照key将数据拆分为不同的集合,具有相同key的数据放到同一个集合,内部使用hashCode来判断是否属于同一个key。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

Reduce

输入 KeyedStream 返回 DataStream。

将按照key拆分的集合滚动处理。合并当前元素和最后一次合并的结果,然后返回一个新的值。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

Window

输入 KeyedStream 返回 WindowedStream。

窗口可以定义在KeyedStreams上,窗口可以将每个key的数据按照某种特征分组,点击链接 windows 查看完整的针对窗口的描述

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataStrea。m
  .keyBy(value -> value.f0)
  .window(TumblingEventTimeWindows.of(Time.seconds(5))); 

WindowAll

输入 DataStream 返回 AllWindowedStream。

窗口可以定义在常规的DataStream上。窗口可以将数据按照某种特征分组,点击链接 windows 查看完整的针对窗口的描述

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataStream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
Window Apply

输入 WindowedStream或者AllWindowedStream 输出 DataStream 。

应用一个函数reduce到窗口。点击链接 windows 查看完整的针对窗口的描述

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

WindowFunction 类型参数: – 输入值的类型。 – 输出值的类型。 – 密钥的类型。 – 可以应用此窗口函数的Window类型。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

AllWindowFunction类型参数: – 输入值的类型。 – 输出值的类型。 – 可以应用此窗口函数的Window类型。

WindowReduce

输入 WindowedStream 输出 DataStream 。

应用一个Reduct函数到窗口,并返回合并后的值。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
Union

合并两个或更多的流,返回新的流包含所有流中的元素。包含重复的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataStream.union(otherStream1, otherStream2, ...);
Window Join

基于指定的key和共同窗口join两个数据流,返回一个新的数据流。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
Interval Join

输入 KeyedStream,返回一个数据流。

基于在指定时间间隔内的共同key,Join 两个KeyedStream的流。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});
Window CoGroup

输入两个数据流,返回一个数据流。

将两个流按照指定key和公共窗口合并,某些键可能只包含在两个原始数据集之一中。 在这种情况下,对于不包含具有该特定键的元素的数据集一侧,将使用空输入调用 CoGroup 函数。 CoGroupFunction类型参数:

– 第一个输入数据集的数据类型。 – 第二个输入数据集的数据类型。 – 返回元素的数据类型。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
Connect

输入 DataStream,DataStream ,返回ConnectedStream。

连接两个数据流保持原有类型。连接允许两个流之间共享状态。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
CoMap,CoFlatMap

输入 ConnectedStream 输出 DataStream 。

类似于已关联数据流上的map和flatMap。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
Iterate

通过将一个操作符的输出重定向到之前的某个操作符,在流中创建一个“反馈”循环。这对于定义不断更新模型的算法尤其有用。下面的代码从一个流开始,并不断地应用迭代体。大于0的元素被发送回反馈通道,其余的元素被向下转发。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-09-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Flink1.4 Operator概述
算子(Operator)将一个或多个 DataStream 转换为新的 DataStream。程序可以将多个转换组合成复杂的数据流拓扑。
smartsi
2019/08/07
3.4K0
5分钟Flink - 流处理API转换算子集合
本文总结了Flink Streaming的算子操作,统统简单实现一次算子操作类型,更加熟悉了Flink带来的便利,有时间可以浏览一次,理解一次,后面具体使用的时候,可以进行查看
Python编程爱好者
2020/09/08
1K0
5分钟Flink - 流处理API转换算子集合
Flink入门宝典(详细截图版)
本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本。需要安装Netcat进行简单调试。
用户6070864
2019/09/18
8980
Flink入门宝典(详细截图版)
硬核!一文学完Flink流计算常用算子(Flink算子大全)
Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。
五分钟学大数据
2021/04/02
2.2K0
Flink1.4 数据流类型与转换关系
Flink 为流处理和批处理分别提供了 DataStream API 和 DataSet API。正是这种高层的抽象和 flunent API 极大地便利了用户编写大数据应用。不过很多初学者在看到官方文档中那一大坨的转换时,常常会蒙了圈,文档中那些只言片语也很难讲清它们之间的关系。所以本文将介绍几种关键的数据流类型,它们之间是如何通过转换关联起来的。下图展示了 Flink 中目前支持的主要几种流的类型,以及它们之间的转换关系。
smartsi
2019/08/07
1.7K0
Flink入门宝典(详细截图版)
本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本。需要安装Netcat进行简单调试。
大数据流动
2019/09/29
8130
Flink入门宝典(详细截图版)
Flink DataStream编程指南及使用注意事项。
Flink中的DataStream程序是对数据流进行转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。数据流的最初的源可以从各种来源(例如,消息队列,套接字流,文件)创建,并通过sink返回结果,例如可以将数据写入文件或标准输出。Flink程序以各种上下文运行,独立或嵌入其他程序中。执行可能发生在本地JVM或许多机器的集群上。 一,套接字流 下面举一个例子,该例子,数据来源是网络套接字,带窗口的流处理,窗口大小是5s,这些概念玩过spark Streaming应该都很清楚,我们后面也会给大家详细讲解。
Spark学习技巧
2018/01/31
5.8K0
Flink DataStream编程指南及使用注意事项。
快速上手Flink Windows窗口编程!
Flink中的窗口机制,如同一道桥梁,将原本连续不断的“流式数据”转化为有限的“批处理”数据块。这种转化为后续的分析计算提供了坚实的基础。
JavaEdge
2024/08/04
1930
快速上手Flink Windows窗口编程!
全网最详细4W字Flink入门笔记(中)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
BookSea
2023/07/21
5130
Flink学习笔记
流式计算是大数据计算的痛点,第1代实时计算引擎Storm对Exactly Once 语义和窗口支持较弱,使用的场景有限且无法支持高吞吐计算;Spark Streaming 采用“微批处理”模拟流计算,在窗口设置很小的场景中有性能瓶颈,Spark 本身也在尝试连续执行模式(Continuous Processing),但进展缓慢。
数据社
2021/01/08
9630
Flink学习笔记
一网打尽Flink中的时间、窗口和流Join
首先,我们会学习如何定义时间属性,时间戳和水位线。然后我们将会学习底层操作process function,它可以让我们访问时间戳和水位线,以及注册定时器事件。接下来,我们将会使用Flink的window API,它提供了通常使用的各种窗口类型的内置实现。我们将会学到如何进行用户自定义窗口操作符,以及窗口的核心功能:assigners(分配器)、triggers(触发器)和evictors(清理器)。最后,我们将讨论如何基于时间来做流的联结查询,以及处理迟到事件的策略。
王知无-import_bigdata
2021/09/22
1.8K0
Flink实战(七) - Time & Windows编程
掌握Flink中三种常用的Time处理方式,掌握Flink中滚动窗口以及滑动窗口的使用,了解Flink中的watermark。
JavaEdge
2019/07/23
9150
Flink实战(七) - Time & Windows编程
全网最详细4W字Flink入门笔记(下)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
BookSea
2023/10/16
9450
全网最详细4W字Flink入门笔记(下)
2021年大数据Flink(十二):流批一体API Transformation
Apache Flink 1.12 Documentation: Operators
Lansonli
2021/10/11
6010
Flink 的窗口指定者和函数
窗口是处理无限流的核心。窗口拆分将流拆为有限数量数据的bucket,这样就可以应用计算。
前Thoughtworks-杨焱
2021/12/07
8080
全网最详细4W字Flink全面解析与实践(下)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中。
BookSea
2023/11/02
1K0
全网最详细4W字Flink全面解析与实践(下)
Flink 内核原理与实现-应用
Flink作为流批一体的计算引擎,其面对的是业务场景,面向的使用者是开发人员和运维管理人员。
857技术社区
2022/05/17
6930
Flink 内核原理与实现-应用
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(二)
之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的;所以可以统称为DataStream API,这也是Flink编程的核心。而我们知道,为了让代码有更强大的表现力和易用性,Flink本身提供了多层API,DataStream API只是中间的一环,如图所示:
857技术社区
2022/12/18
1.6K0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(二)
Flink Transformation
Flink 的 Transformations 操作主要用于将一个和多个 DataStream 按需转换成新的 DataStream。它主要分为以下三类:
每天进步一点点
2022/07/27
2670
Flink Transformation
Flink教程(1) Flink DataStream 创建数据源 转换算子「建议收藏」
从前年开始,就被公众号上Flink文章频繁的刷屏,看来是时候了解下Flink了。 Flink官网第一句话介绍是数据流上的有状态计算。 我第一眼看这句话感觉很拗口,什么是流上的计算?什么是有状态? 作为菜鸟,我觉的学习Flink最好方法是看官网并敲代码实践,不会的百度些博客学学。
Java架构师必看
2022/05/26
1.5K0
Flink教程(1) Flink DataStream 创建数据源 转换算子「建议收藏」
相关推荐
Flink1.4 Operator概述
更多 >
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文