前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink之DataStream算子1

flink之DataStream算子1

原创
作者头像
用户11134802
修改2024-06-16 07:45:43
960
修改2024-06-16 07:45:43
举报
文章被收录于专栏:flink基础知识点

一、基本转换算子

基本转换算子有 map 、filter 、 flatmap

Flink对POJO(Plain Ordinary Java Object简单的Java对象,实际就是普通JavaBeans)类型的要求如下:

类是公有(public)的

有一个无参的构造方法

所有属性都是公有(public)的

所有属性的类型都是可以序列化的(基础类型就用包装类型)

1、创建bean类

为了更好的演示案例,基于Flink的pojo类型的定义,创建一个pojo类

代码语言:javascript
复制
public class WaterSensor {
    public String id;
    public Long ts;
    public Integer vc;

    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public Integer getVc() {
        return vc;
    }

    public void setVc(Integer vc) {
        this.vc = vc;
    }

    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WaterSensor that = (WaterSensor) o;
        return Objects.equals(id, that.id) && Objects.equals(ts, that.ts) && Objects.equals(vc, that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
}

2、map 映射

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。

进行数据格式的转换 , 如wordcount里,将 “hello world” 转成了 (hello , 1),(world ,1)

数据操作流向
数据操作流向

只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。

案例需求:提取WaterSensor中的id字段

代码语言:javascript
复制
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<WaterSensor> stream = env.fromElements(
            new WaterSensor("sensor_1", 1, 1),
            new WaterSensor("sensor_2", 2, 2)
    );

    // 方式一:传入匿名类,实现MapFunction
    stream.map(new MapFunction<WaterSensor, String>() {
        @Override
        public String map(WaterSensor e) throws Exception {
            return e.id;
        }
    }).print();

    // 方式二:传入MapFunction的实现类
    // stream.map(new UserMap()).print();

    env.execute();
}

public static class UserMap implements MapFunction<WaterSensor, String> {
    @Override
    public String map(WaterSensor e) throws Exception {
        return e.id;
    }
}

3、filter 过滤

filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉

数据操作流向
数据操作流向

进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。

案例需求:下面的代码会将数据流中传感器id为sensor_1的数据过滤出来。

代码语言:javascript
复制
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<WaterSensor> stream = env.fromElements(

            new WaterSensor("sensor_1", 1, 1),
            new WaterSensor("sensor_1", 2, 2),
            new WaterSensor("sensor_2", 2, 2),
            new WaterSensor("sensor_3", 3, 3)
    );

    // 方式一:传入匿名类实现FilterFunction
    stream.filter(new FilterFunction<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor e) throws Exception {
            return e.id.equals("sensor_1");
        }
    }).print();

    // 方式二:传入FilterFunction实现类
    // stream.filter(new UserFilter()).print();

    env.execute();
}
public static class UserFilter implements FilterFunction<WaterSensor> {
    @Override
    public boolean filter(WaterSensor e) throws Exception {
        return e.id.equals("sensor_1");
    }
}

4、flatmap 扁平映射

flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

数据操作流向
数据操作流向

同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

案例需求:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。

代码语言:javascript
复制
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<WaterSensor> stream = env.fromElements(

            new WaterSensor("sensor_1", 1, 1),
            new WaterSensor("sensor_1", 2, 2),
            new WaterSensor("sensor_2", 2, 2),
            new WaterSensor("sensor_3", 3, 3)

    );

    stream.flatMap(new MyFlatMap()).print();

    env.execute();
}

public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {

    @Override
    public void flatMap(WaterSensor value, Collector<String> out) throws Exception {

        if (value.id.equals("sensor_1")) {
            out.collect(String.valueOf(value.vc));
        } else if (value.id.equals("sensor_2")) {
            out.collect(String.valueOf(value.ts));
            out.collect(String.valueOf(value.vc));
        }
    }
}

二、聚合算子

聚合算子前必须要KeyBy算子进行按Key分区,才能聚合

聚合算子有sum( ) 、min( ) 、max( ) 、maxBy( ) 、minBy( )

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。

1、按键分区(keyBy)

对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。例如 Top N 问题

keyBy是聚合前必须要用到的一个算子(如果没有keyBy,那数据仍以一条流的方式聚集在一起)。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。

数据操作方向
数据操作方向

在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key的对象如果是POJO的话,必须要重写hashCode()方法

keyBy()方法需要传入一个参数这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。

案例需求:按照WaterSensor的id进行keyBy分组

代码语言:javascript
复制
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<WaterSensor> stream = env.fromElements(
            new WaterSensor("sensor_1", 1, 1),
            new WaterSensor("sensor_1", 2, 2),
            new WaterSensor("sensor_2", 2, 2),
            new WaterSensor("sensor_3", 3, 3)
    );

    // 方式一:使用Lambda表达式
    KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);

    // 方式二:使用匿名类实现KeySelector
    KeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {
        @Override
        public String getKey(WaterSensor e) throws Exception {
            return e.id;
        }
    });

    env.execute();
}

需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”,它是对DataStream按照key的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key的类型。

KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)。

2、简单聚合算子

有了按键分区的数据流KeyedStream(前提),我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:

· sum():在输入流上,对指定的字段做叠加求和的操作。

· min():在输入流上,对指定的字段求最小值。

· max():在输入流上,对指定的字段求最大值。

·minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最

初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据

·maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。

简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称

对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以f0、f1、f2、…来命名的。

如果数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定了。

案例需求:输出POJO类vc字段的最小值

代码语言:javascript
复制
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<WaterSensor> stream = env.fromElements(
            new WaterSensor("sensor_1", 1, 1),
            new WaterSensor("sensor_1", 2, 2),
            new WaterSensor("sensor_2", 2, 2),
            new WaterSensor("sensor_3", 3, 3)
    );

    stream.keyBy(e -> e.id).max("vc");    // 指定字段名称

    env.execute();
}

简单聚合算子返回的,同样是一个SingleOutputStreamOperator,也就是从KeyedStream又转换成了常规的DataStream。所以可以这样理解:keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变

一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个key的数据流上(后面项目实战的时候这个点很关键)

3、reduce归约聚合

reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。

reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的

调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:

代码语言:javascript
复制
public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

我们可以单独定义一个函数类实现ReduceFunction接口,也可以直接传入一个匿名类。当然,同样也可以通过传入Lambda表达式实现类似的功能。

为了方便后续使用,定义一个WaterSensorMapFunction

代码语言:javascript
复制
//这个实现类的作用是实现数据的格式转换,把string变成WatetSensor的POJO类
public class WaterSensorMapFunction implements MapFunction<String,WaterSensor> {
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] datas = value.split(",");
        return new WaterSensor(datas[0],Long.valueOf(datas[1]) ,Integer.valueOf(datas[2]) );
    }
}

案例需求:使用reduce实现max和maxBy的功能。

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .keyBy(WaterSensor::getId)
                .reduce(new ReduceFunction<WaterSensor>()
                {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        System.out.println("Demo7_Reduce.reduce");
                        
                        //这里就已经实现了max的功能
                        int maxVc = Math.max(value1.getVc(), value2.getVc());
                        
                        //实现max(vc)的效果  取最大值,其他字段以当前组的第一个为主
                        //value1.setVc(maxVc);
                        //实现maxBy(vc)的效果  取当前最大值的所有字段
                        if (value1.getVc() > value2.getVc()){
                            value1.setVc(maxVc);//意义不大 , 但是方便理解
                            return value1;
                        }else {
                            value2.setVc(maxVc);
                            return value2;
                        }
                    }
                })
                .print();
        env.execute();
    }

但是注意的是,reduce还有一个问题要,比如有三个元素,reduce算子的操作聚合前两个是第一次一般是能理解的,但是第二次的聚合就存疑了?究竟是单独的第二和第三个元素,还是前两个元素的reduce结果和第三个元素进行reduce,即对reduce的结果传递性是怎么样的?在上一个代码片段中,如果看不懂的小伙伴可以参考一下这个代码的实现的目标是什么?没错是实现maxBy的功能,因此reduce算子reduce结果是可以传递的,具有传递性。

因此reduce的工作流程:

代码语言:javascript
复制
1、创建 Keyed Stream:
    在调用 reduce 之前,通常会先调用 keyBy方法来指定一个或多个字段作为键。这些字段的值相同的所有元素都会
    被分配到相同的逻辑分区,形成一个键控的流。
2、设置 ReduceFunction:
    reduce 方法需要一个实现了 ReduceFunction 接口的实例。这个接口定义了一个 reduce 方法,该方法接受两个
    相同类型的元素作为参数,并返回一个相同类型的新元素。这个方法定义了如何合并两个元素。
3、归约操作:
    对于键控流中的每个键,Flink 会在该键对应的所有元素上调用 ReduceFunction 的 reduce 方法。这个过程是
    迭代进行的,直到每个键对应的元素被归约成一个元素。
        ·首先,对于每个键的第一个和第二个元素,reduce 方法会被调用。
        ·然后,返回的结果(即归约后的元素)会与下一个元素一起再次调用 reduce 方法。
        ·这个过程会持续进行,直到该键的所有元素都被处理完毕,最终得到一个归约后的元素。
4、并行处理:
    Flink 是一个分布式流处理框架,因此 reduce 操作可以在多个并行任务(task)中同时进行。每个键的归约操作
    都会在其对应的任务中执行,这样可以实现并行处理,提高处理效率。
5、结果输出:
    归约操作完成后,每个键的归约结果会被发送到下游操作。在上面的例子中,使用 .print() 方法将结果输出到控
    制台。
6、故障恢复:
    Flink 提供了强大的故障恢复机制。如果在归约过程中发生故障(如节点宕机),Flink 会自动重新分配任务,并
    从最近的检查点(checkpoint)恢复状态,以确保归约操作的正确性和一致性。
7、性能优化:
    Flink 还提供了一些优化手段来提高归约操作的性能,如状态后端(state backend)的选择、检查点的配置等。
    这些优化手段可以帮助减少状态存储的开销、提高故障恢复的速度等。

reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上。

reduce总结

1、keyBy算子之后的reduce,其实计算的是历史以来所有数据的和,每过来一条数据,就输出一次结果。

2、window算子之后的reduce,其实计算的是window窗口内的数据和,每次窗口触发的时候,才会输出一次结果。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、基本转换算子
    • 1、创建bean类
      • 2、map 映射
        • 案例需求:提取WaterSensor中的id字段
      • 3、filter 过滤
        • 案例需求:下面的代码会将数据流中传感器id为sensor_1的数据过滤出来。
      • 4、flatmap 扁平映射
        • 案例需求:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。
    • 二、聚合算子
      • 1、按键分区(keyBy)
        • 案例需求:按照WaterSensor的id进行keyBy分组
      • 2、简单聚合算子
        • 案例需求:输出POJO类vc字段的最小值
      • 3、reduce归约聚合
        • 案例需求:使用reduce实现max和maxBy的功能。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档