前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink学习随笔-2021-02

Flink学习随笔-2021-02

作者头像
用户8483969
发布2021-04-09 11:32:46
4400
发布2021-04-09 11:32:46
举报
文章被收录于专栏:bgmonkeybgmonkey

Flink学习笔记

一、Flink运行架构

1、 Flink 运行时的组件

代码语言:javascript
复制
`作业管理器(JobManager)`
`资源管理器(ResourceManager)`
`任务管理器(TaskManager)`
`以及分发器(Dispatcher)`

作业管理器(JobManager)

代码语言:javascript
复制
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的 JAR 包。JobManager 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager 上。而在运行过程中,JobManager 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。

资源管理器(ResourceManager)

代码语言:javascript
复制
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是 Flink 中定义的处理资源单元。Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及 standalone 部署。当 JobManager 申请插槽资源时,ResourceManager会将有空闲插槽的 TaskManager 分配给 JobManager。如果 ResourceManager 没有足够的插槽来满足 JobManager 的请求,它还可以向资源提供平台发起会话,以提供启动 TaskManager进程的容器。另外,ResourceManager 还负责终止空闲的 TaskManager,释放计算资源。

任务管理器(TaskManager)

代码语言:javascript
复制
Flink 中的工作进程。通常在 Flink 中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager 能够执行的任务数量。启动之后,TaskManager 会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager 就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManagerx交换数据。

分发器(Dispatcher)

代码语言:javascript
复制
可以跨作业运行,它为应用提交提供了 REST 接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个 JobManager。由于是 REST 接口,所以 Dispatcher 可以作为集群的一个 HTTP 接入点,这样就能够不受防火墙阻挡。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中可能并不是必需的,这取决于应用提交运行的方式。

二、开发

1、Source

1.1从集合读取数据
代码语言:javascript
复制
public class SourceFromList {
    public static void main(String[] args) throws Exception {
        
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
//        ①、从集合中读取文件
        DataStreamSource<SensorReading> data1 = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_6", 1547718201L, 15.4),
                new SensorReading("sensor_10", 1547718205L, 38.1)
                ));
        
//        ②、直接读取传入参数  setParallelism为设置并行度
        DataStreamSource<? extends Serializable> data2 = env.fromElements("sensor_16, 1547218201, 27.3","sensor_18, 1547358286, 36.5").setParallelism(1);

//        打印输出
        data1.print("data1");
        data2.print("data2");

//        execute中传参为JobName
        env.execute("Demo1");
        
    }
}
1.2从文件中读取数据
代码语言:javascript
复制
public class SourceFromFile {
    public static void main(String[] args) throws Exception {
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//       设置并行度
        env.setParallelism(1);

//        从文件中读取内容
        DataStreamSource<String> dataSource = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
        dataSource.print("dataSource");

//        执行
        env.execute();
    }
}
1.3从kafka中读取数据

需要引入连接器jar包

代码语言:javascript
复制
<!-- 0.11为kafka版本,2.12为scala版本,Flink是依赖于scala的。1.10.1是连接器的版本,和Flink版本一致 -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
	<version>1.10.1</version>
</dependency>
代码语言:javascript
复制
public class SourceFromKafka {
    public static void main(String[] args) throws Exception {
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);
//        配置参数
        Properties prop = new Properties();
//        集群信息
        prop.setProperty("bootstrap.servers", "master:9092");
//        消费者组
        prop.setProperty("group.id", "consumer-group");
//        序列化
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//        反序列化
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//        消费偏移量
        prop.setProperty("auto.offset.reset", "latest");

//        从kafka中读取数据  addSource()
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), prop));

//        打印输出
        dataStreamSource.print("kafka");

//        执行
        env.execute();
    }
}
1.4自定义数据源
代码语言:javascript
复制
public class SourceFromCustom {
    public static void main(String[] args) throws Exception {
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);
        DataStreamSource<SensorReading> dataStreamSource = env.addSource(new MySource());
        dataStreamSource.print();
//        执行
        env.execute();
}

    /**
     *
     *自定义SourceFunction
     */
    public static class MySource implements SourceFunction<SensorReading> {
//        定义一个标识位,用来控制循环
        private boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
//          为了模拟真实数据变化,定义一个随机数发生器
            Random random = new Random();
//          设置是个传感器的初始温度值
            HashMap<String, Double> sensorTempMap = new HashMap<String,Double>();
            for (int i = 1; i < 11; i++) {
                sensorTempMap.put("sensor_" + i, 60+random.nextGaussian()*20);
            }

            while (running){
               for (String sensorId:sensorTempMap.keySet()){
                   Double newTemp = sensorTempMap.get(sensorId)+random.nextGaussian();
                   sensorTempMap.put(sensorId, newTemp);
                   ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
                }
             }
            Thread.sleep(1000L);
          }
        
        @Override
        public void cancel() {
            running = false;
        }
    }
}

2、Transform转换算子

读数据

代码语言:javascript
复制
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);
//        读数据
DataStreamSource<String> dataStreamSource = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
2.1Map
代码语言:javascript
复制
//        1、map,把String转换成长度输出
        SingleOutputStreamOperator<Integer> map = dataStreamSource.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
2.2flatMap
代码语言:javascript
复制
SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {

                String[] fields = value.split(",");
                for (String field : fields){
                    out.collect(field);
                }
            }
        });
        flatMap.print("flatMap");
2.3、filter
代码语言:javascript
复制
//        3、filter,筛选温度为37.1的数据
        SingleOutputStreamOperator<String> filter = dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.contains("37.1");
            }
        });
        filter.print("filter");
//       filter,筛选以senser_1开头的
value.startsWith("sensor_1");
2.4、KeyBy
代码语言:javascript
复制
//        转换成SensorReading
//        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
//            @Override
//            public SensorReading map(String value) throws Exception {
//                String[] fields = value.split(",");
//                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
//            }
//        });

//        Lambda表达式格式
        SingleOutputStreamOperator<SensorReading> dataStream = dataStreamSource.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

//        分组
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
//        滚动聚合
        SingleOutputStreamOperator<SensorReading> temperature = keyedStream.maxBy("temperature");
        temperature.print("temp");
2.4、Reduce
代码语言:javascript
复制
//        分组
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
//        Reduce聚合,取最大的温度值以及当前最新的时间戳
        SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });
reduce.print("reducce");
2.5、分流split、select
代码语言:javascript
复制
//        读数据
        DataStreamSource<String> dataStreamSource = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
//       转换为POJO类
        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });

//      分流操作,按照30度为临界值分成两个流
        SplitStream<SensorReading> multiplieStream = map.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> high = multiplieStream.select("high");
        DataStream<SensorReading> low = multiplieStream.select("low");
        DataStream<SensorReading> all = multiplieStream.select("high", "low");

        high.print("high");
        low.print("low");
        all.print("all");

//        执行
        env.execute();
    }
2.6、合并流

Connect CoMap

代码语言:javascript
复制
        /**
         *    合流操作,将高温流转换成二元组类型,与低温流连接合并之后,输出状态信息,高温报警低温正常
         *
         */

        SingleOutputStreamOperator<Tuple2<String, Double>> highStream = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(),value.getTemperature());
            }
        });

        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = highStream.connect(low);
        SingleOutputStreamOperator<Object> result = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "高温报警");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "正常");
            }
        });

        result.print();

Union

代码语言:javascript
复制
DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream);

Connect 与 Union区别

代码语言:javascript
复制
1.Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2. Connect只能操作两个流,Union 可以操作多个

2.7、自定义UDF函数

自定义函数并可以传参

代码语言:javascript
复制
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);

//        读数据
        DataStreamSource<String> dataStreamSource = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
//        自定义keyFilter函数
SingleOutputStreamOperator<String> sensorFilter = dataStreamSource.filter(new keyFilter("sensor_1"));
sensorFilter.print("result");

//        实现keyFilter函数并传参
    public static class keyFilter implements FilterFunction<String>{
        private String key;
        public keyFilter(String key) {
            this.key = key;
        }

        @Override
        public boolean filter(String value) throws Exception {
            return value.contains(this.key);
        }
    }

3、Sink

3.1Sink到Kafka
代码语言:javascript
复制
public class SinkToKafka {
    public static <IN> void main(String[] args) {
//        环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//       读取文件
        DataStreamSource<String> inputStream = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
//       转换成SensorReading类型
        SingleOutputStreamOperator<String> datastream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[1])).toString();
        });
        DataStreamSink<String> test = datastream.addSink(new FlinkKafkaProducer011<String>("master:9092", "test", new SimpleStringSchema()));
    }
}
3.2Sink到MySql
代码语言:javascript
复制
public class SinkToMysql {
    public static void main(String[] args) throws Exception {
//        环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//       读取文件
        DataStreamSource<String> inputStream = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
//       转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[1]));
        });

        dataStream.addSink(new MyJDBCSink());
        env.execute();
    }
    
    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        Connection conn = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
//            连接
            conn=DriverManager.getConnection("jdbc:mysql://master:3306/student","root", "root");

//            创建预编译器,有占位符,可传入参数
            insertStmt=conn.prepareStatement("INSERT INTO sensor (id, dept) VALUES(?, ?)");
            updateStmt = conn.prepareStatement("UPDATE sensor SET dept = ? WHERE id = ?");

        }
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
//            直接执行更新语句,如果没有执行成功则执行插入操作
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if( updateStmt.getUpdateCount() == 0 ){
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }
        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            conn.close();
        }

    }
}

4、Window

4.1Window概述
4.1.1概述
代码语言:javascript
复制
streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window是一种切割无限数据为有限块进行处理的手段。 
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
4.1.2Window类型
代码语言:javascript
复制
Window 可以分成两类:
➢ CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
➢ TimeWindow:按照时间生成 Window。

对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

1、滚动窗口(Tumbling Windows) 将数据依据固定的窗口长度对数据进行切片。 ==特点:==时间对齐,窗口长度固定,没有重叠。 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。 ==适用场景:==适合做 BI 统计等(做每个时间段的聚合计算)。 **2、滑动窗口(Sliding Windows) ** 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动 间隔组成。 ==特点:==时间对齐,窗口长度固定,可以有重叠。 滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。 例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包 含着上个 10 分钟产生的数据 ==适用场景:==对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。 3. 会话窗口(Session Windows) 由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。 ==特点:==时间无对齐。 session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。

4.2API
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-04-06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink学习笔记
    • 一、Flink运行架构
      • 1、 Flink 运行时的组件
    • 二、开发
      • 1、Source
      • 2、Transform转换算子
      • 3、Sink
      • 4、Window
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档