前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink之DataStream2

flink之DataStream2

原创
作者头像
用户11134802
修改2024-06-16 20:41:56
660
修改2024-06-16 20:41:56
举报
文章被收录于专栏:flink基础知识点flink基础知识点

这是接上文的flink之Datastream1,文章链接 https://cloud.tencent.com/developer/article/2428018?shareByChannel=link

------------------------------------------------------分割线---------------------------------------------------------

三、自定义UDF(user-defined function)函数

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类、匿名函数、富函数类(富函数这个后续会用到很多,涉及状态的保存以及更改需要操控open和close函数)

1、函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。

因此之前写过实现MapFunction、FilterFunction、ReduceFunction的自定义函数,且此类函数用处不大,这里不过多赘述。

2、匿名函数

flink的这个函数只能在某个算子里面实现, 比如之前keyBy算子,如下

代码语言:javascript
复制
KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);

就使用了匿名函数类,此函数类优点是能简化函数的编写方式,但是缺点也很明显,只能使用一次。

3、富函数类

此函数的作用效果含括了函数类,如果是实现同一个接口,富函数接口在普通函数接口上多增加了一些抽象函数的定义,比如最常用的open、close函数,因此重点介绍。

富函数类”也是DataStream API提供的一个函数类的接口所有的Flink函数类都有其Rich版本富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能

Rich Function有生命周期的概念。典型的生命周期方法有:

· open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。

· close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用(重点理解这句话!!!!)

比如之前写过的mapFunction

代码语言:javascript
复制
public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    @Override
    public WaterSensor map(String s) throws Exception {
        String[] datas = s.split(",");

        return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));
    }
}

变成RIchMapFunction只需要

1、在实现的mapFunction接口名字改成RichMapFunction接口

2、按Alt + Enter键自动导包

3、按Ctrl + O 选择 生成open方法和close方法

如下

代码语言:javascript
复制
public class WaterSensorMapFunction extends RichMapFunction<String, WaterSensor> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
    @Override
    public WaterSensor map(String s) throws Exception {
        String[] datas = s.split(",");

        return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));
    }
}

四、物理分区算子

分区算子只介绍广播

1、广播 broadcast

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份(从当前分区,往每个分区发一份重复的数据)可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

代码:

代码语言:javascript
复制
stream.broadcast()

五、分流

1、分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

代码语言:javascript
复制
public static void main(String[] args) throws Exception {


    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SingleOutputStreamOperator<Integer> ds = env.socketTextStream("hadoop102", 7777)
            .map(Integer::valueOf);
    //将ds 分为两个流 ,一个是奇数流,一个是偶数流
    //使用filter 过滤两次
    SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);
    SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);

    ds1.print("偶数");
    ds2.print("奇数");

    env.execute();
}

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

2、测流

只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("hadoop102", 7777)
            .map(new WaterSensorMapFunction());


    OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class)){};
    OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class)){};
    //返回的都是主流
    SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>()
    {
        @Override
        public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
            //将同一份数据进行需求进行处理,即可完成分流的操作,无需复制多份
            if ("s1".equals(value.getId())) {
                ctx.output(s1, value);
            } else if ("s2".equals(value.getId())) {
                ctx.output(s2, value);
            } else {
                //主流
                out.collect(value);
            }

        }
    });

    ds1.print("主流,非s1,s2的传感器");
    //获取侧输出流
    SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);
    SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);
    
    //打上标识,方便辨识
    s1DS.printToErr("s1");
    s2DS.printToErr("s2");

    env.execute();

}

六、合流

这里先不写,因为我用到的地方不多,待我用到再来总结

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ------------------------------------------------------分割线---------------------------------------------------------
  • 三、自定义UDF(user-defined function)函数
    • 1、函数类(Function Classes)
      • 2、匿名函数
        • 3、富函数类
        • 四、物理分区算子
          • 1、广播 broadcast
            • 代码:
        • 五、分流
          • 1、分流
            • 案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
          • 2、测流
          • 六、合流
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档