Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink学习随笔-2021-02

Flink学习随笔-2021-02

作者头像
用户8483969
发布于 2021-04-09 03:32:46
发布于 2021-04-09 03:32:46
47300
代码可运行
举报
文章被收录于专栏:bgmonkeybgmonkey
运行总次数:0
代码可运行

Flink学习笔记

一、Flink运行架构

1、 Flink 运行时的组件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
`作业管理器(JobManager)`
`资源管理器(ResourceManager)`
`任务管理器(TaskManager)`
`以及分发器(Dispatcher)`

作业管理器(JobManager)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的 JAR 包。JobManager 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager 上。而在运行过程中,JobManager 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。

资源管理器(ResourceManager)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是 Flink 中定义的处理资源单元。Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及 standalone 部署。当 JobManager 申请插槽资源时,ResourceManager会将有空闲插槽的 TaskManager 分配给 JobManager。如果 ResourceManager 没有足够的插槽来满足 JobManager 的请求,它还可以向资源提供平台发起会话,以提供启动 TaskManager进程的容器。另外,ResourceManager 还负责终止空闲的 TaskManager,释放计算资源。

任务管理器(TaskManager)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Flink 中的工作进程。通常在 Flink 中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager 能够执行的任务数量。启动之后,TaskManager 会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager 就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManagerx交换数据。

分发器(Dispatcher)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
可以跨作业运行,它为应用提交提供了 REST 接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个 JobManager。由于是 REST 接口,所以 Dispatcher 可以作为集群的一个 HTTP 接入点,这样就能够不受防火墙阻挡。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中可能并不是必需的,这取决于应用提交运行的方式。

二、开发

1、Source

1.1从集合读取数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class SourceFromList {
    public static void main(String[] args) throws Exception {
        
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
//        ①、从集合中读取文件
        DataStreamSource<SensorReading> data1 = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_6", 1547718201L, 15.4),
                new SensorReading("sensor_10", 1547718205L, 38.1)
                ));
        
//        ②、直接读取传入参数  setParallelism为设置并行度
        DataStreamSource<? extends Serializable> data2 = env.fromElements("sensor_16, 1547218201, 27.3","sensor_18, 1547358286, 36.5").setParallelism(1);

//        打印输出
        data1.print("data1");
        data2.print("data2");

//        execute中传参为JobName
        env.execute("Demo1");
        
    }
}
1.2从文件中读取数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class SourceFromFile {
    public static void main(String[] args) throws Exception {
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//       设置并行度
        env.setParallelism(1);

//        从文件中读取内容
        DataStreamSource<String> dataSource = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
        dataSource.print("dataSource");

//        执行
        env.execute();
    }
}
1.3从kafka中读取数据

需要引入连接器jar包

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<!-- 0.11为kafka版本,2.12为scala版本,Flink是依赖于scala的。1.10.1是连接器的版本,和Flink版本一致 -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
	<version>1.10.1</version>
</dependency>
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class SourceFromKafka {
    public static void main(String[] args) throws Exception {
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);
//        配置参数
        Properties prop = new Properties();
//        集群信息
        prop.setProperty("bootstrap.servers", "master:9092");
//        消费者组
        prop.setProperty("group.id", "consumer-group");
//        序列化
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//        反序列化
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//        消费偏移量
        prop.setProperty("auto.offset.reset", "latest");

//        从kafka中读取数据  addSource()
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), prop));

//        打印输出
        dataStreamSource.print("kafka");

//        执行
        env.execute();
    }
}
1.4自定义数据源
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class SourceFromCustom {
    public static void main(String[] args) throws Exception {
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);
        DataStreamSource<SensorReading> dataStreamSource = env.addSource(new MySource());
        dataStreamSource.print();
//        执行
        env.execute();
}

    /**
     *
     *自定义SourceFunction
     */
    public static class MySource implements SourceFunction<SensorReading> {
//        定义一个标识位,用来控制循环
        private boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
//          为了模拟真实数据变化,定义一个随机数发生器
            Random random = new Random();
//          设置是个传感器的初始温度值
            HashMap<String, Double> sensorTempMap = new HashMap<String,Double>();
            for (int i = 1; i < 11; i++) {
                sensorTempMap.put("sensor_" + i, 60+random.nextGaussian()*20);
            }

            while (running){
               for (String sensorId:sensorTempMap.keySet()){
                   Double newTemp = sensorTempMap.get(sensorId)+random.nextGaussian();
                   sensorTempMap.put(sensorId, newTemp);
                   ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
                }
             }
            Thread.sleep(1000L);
          }
        
        @Override
        public void cancel() {
            running = false;
        }
    }
}

2、Transform转换算子

读数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);
//        读数据
DataStreamSource<String> dataStreamSource = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
2.1Map
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//        1、map,把String转换成长度输出
        SingleOutputStreamOperator<Integer> map = dataStreamSource.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
2.2flatMap
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {

                String[] fields = value.split(",");
                for (String field : fields){
                    out.collect(field);
                }
            }
        });
        flatMap.print("flatMap");
2.3、filter
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//        3、filter,筛选温度为37.1的数据
        SingleOutputStreamOperator<String> filter = dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.contains("37.1");
            }
        });
        filter.print("filter");
//       filter,筛选以senser_1开头的
value.startsWith("sensor_1");
2.4、KeyBy
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//        转换成SensorReading
//        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
//            @Override
//            public SensorReading map(String value) throws Exception {
//                String[] fields = value.split(",");
//                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
//            }
//        });

//        Lambda表达式格式
        SingleOutputStreamOperator<SensorReading> dataStream = dataStreamSource.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

//        分组
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
//        滚动聚合
        SingleOutputStreamOperator<SensorReading> temperature = keyedStream.maxBy("temperature");
        temperature.print("temp");
2.4、Reduce
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//        分组
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
//        Reduce聚合,取最大的温度值以及当前最新的时间戳
        SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });
reduce.print("reducce");
2.5、分流split、select
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//        读数据
        DataStreamSource<String> dataStreamSource = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
//       转换为POJO类
        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });

//      分流操作,按照30度为临界值分成两个流
        SplitStream<SensorReading> multiplieStream = map.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> high = multiplieStream.select("high");
        DataStream<SensorReading> low = multiplieStream.select("low");
        DataStream<SensorReading> all = multiplieStream.select("high", "low");

        high.print("high");
        low.print("low");
        all.print("all");

//        执行
        env.execute();
    }
2.6、合并流

Connect CoMap

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        /**
         *    合流操作,将高温流转换成二元组类型,与低温流连接合并之后,输出状态信息,高温报警低温正常
         *
         */

        SingleOutputStreamOperator<Tuple2<String, Double>> highStream = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(),value.getTemperature());
            }
        });

        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = highStream.connect(low);
        SingleOutputStreamOperator<Object> result = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "高温报警");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "正常");
            }
        });

        result.print();

Union

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream);

Connect 与 Union区别

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1.Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2. Connect只能操作两个流,Union 可以操作多个

2.7、自定义UDF函数

自定义函数并可以传参

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//        创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(1);

//        读数据
        DataStreamSource<String> dataStreamSource = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
//        自定义keyFilter函数
SingleOutputStreamOperator<String> sensorFilter = dataStreamSource.filter(new keyFilter("sensor_1"));
sensorFilter.print("result");

//        实现keyFilter函数并传参
    public static class keyFilter implements FilterFunction<String>{
        private String key;
        public keyFilter(String key) {
            this.key = key;
        }

        @Override
        public boolean filter(String value) throws Exception {
            return value.contains(this.key);
        }
    }

3、Sink

3.1Sink到Kafka
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class SinkToKafka {
    public static <IN> void main(String[] args) {
//        环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//       读取文件
        DataStreamSource<String> inputStream = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
//       转换成SensorReading类型
        SingleOutputStreamOperator<String> datastream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[1])).toString();
        });
        DataStreamSink<String> test = datastream.addSink(new FlinkKafkaProducer011<String>("master:9092", "test", new SimpleStringSchema()));
    }
}
3.2Sink到MySql
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class SinkToMysql {
    public static void main(String[] args) throws Exception {
//        环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//       读取文件
        DataStreamSource<String> inputStream = env.readTextFile("E:\\IDEA2019_3Projects\\FlinkDemo\\src\\main\\resources\\sensor.txt");
//       转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[1]));
        });

        dataStream.addSink(new MyJDBCSink());
        env.execute();
    }
    
    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        Connection conn = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
//            连接
            conn=DriverManager.getConnection("jdbc:mysql://master:3306/student","root", "root");

//            创建预编译器,有占位符,可传入参数
            insertStmt=conn.prepareStatement("INSERT INTO sensor (id, dept) VALUES(?, ?)");
            updateStmt = conn.prepareStatement("UPDATE sensor SET dept = ? WHERE id = ?");

        }
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
//            直接执行更新语句,如果没有执行成功则执行插入操作
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if( updateStmt.getUpdateCount() == 0 ){
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }
        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            conn.close();
        }

    }
}

4、Window

4.1Window概述
4.1.1概述
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window是一种切割无限数据为有限块进行处理的手段。 
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
4.1.2Window类型
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Window 可以分成两类:
➢ CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
➢ TimeWindow:按照时间生成 Window。

对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

1、滚动窗口(Tumbling Windows 将数据依据固定的窗口长度对数据进行切片。 ==特点:==时间对齐,窗口长度固定,没有重叠。 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。 ==适用场景:==适合做 BI 统计等(做每个时间段的聚合计算)。 **2、滑动窗口(Sliding Windows) ** 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动 间隔组成。 ==特点:==时间对齐,窗口长度固定,可以有重叠。 滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。 例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包 含着上个 10 分钟产生的数据 ==适用场景:==对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。 3. 会话窗口(Session Windows) 由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。 ==特点:==时间无对齐。 session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。

4.2API
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/04/06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊Flink框架中的状态管理机制
在目前所有流式计算的场景中,将数据流的状态分为有状态和无状态两种类型。无状态指的就是无状态的计算观察每个独立的事件,并且只根据最后一个事件输出结果。举个栗子:一个流处理程序,从传感器接收温度数据然后在温度为90摄氏度发出报警信息。有状态的计算则会根据多个事件输出结果。举个栗子:计算过去一小时的平均温度,就是有状态的计算、若在一分钟内收到两个相差 20 度以上的温度读数,则发出警告等等。
百思不得小赵
2022/12/01
5590
聊聊Flink框架中的状态管理机制
Flink流处理API大合集:掌握所有flink流处理技术,看这一篇就够了
在之前的文章中有提到过,一个flink应用程序开发的步骤大致为五个步骤:构建执行环境、获取数据源、操作数据源、输出到外部系统、触发程序执行。由这五个模块组成了一个flink任务,接下来围绕着每个模块对应的API进行梳理。 以下所有的代码案例都已收录在本人的Gitee仓库,有需要的同学点击链接直接获取: Gitee地址:https://gitee.com/xiaoZcode/flink_test
百思不得小赵
2022/12/01
8230
Flink流处理API大合集:掌握所有flink流处理技术,看这一篇就够了
关于Flink框架窗口(window)函数最全解析
在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。
百思不得小赵
2022/12/01
1.5K0
关于Flink框架窗口(window)函数最全解析
结合案例总结Flink框架中的最底层API(ProcessFunction)用法
在之前总结的文章中有提到过,Flink框架提供了三层API完成流处理任务。至此已经学习了DataStream API ,ProcessFunction API 是Flink中最底层的API,可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件。、
百思不得小赵
2022/12/01
4880
结合案例总结Flink框架中的最底层API(ProcessFunction)用法
2024年最新Flink教程,从基础到就业,大家一起学习--Flink运行架构底层源码详解+实战
1、客户端:提交的任务的节点,提交任务的地方,JobManager:管理者,TaskManager:实际工作者
小白的大数据之旅
2024/11/20
2020
2024年最新Flink教程,从基础到就业,大家一起学习--Flink运行架构底层源码详解+实战
2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(一)(建议收藏!!)
下面为大家带来阿里巴巴极度热推的Flink,实时数仓是未来的方向,学好Flink,月薪过万不是梦!!
Maynor
2021/06/24
2.7K0
Flink框架中的时间语义和Watermark(数据标记)
“时间”在我们日常的开发学习过程中是特别常见的一个名词,例如:Java中的日期处理类、获取系统的当前时间、毫秒级的时间戳等等。接下来让我们来看看在Flink框架中,对时间不同的概念。Flink框架中有三个时间的语义:事件时间(Event Time )、摄入时间(Ingestion Time)、系统处理时间(Processing Time)。
百思不得小赵
2022/12/01
8480
Flink框架中的时间语义和Watermark(数据标记)
2024年最新Flink教程,从基础到就业,大家一起学习--Flink DataStream API-第一篇+源码讲解
DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:
小白的大数据之旅
2024/11/20
1300
2024年最新Flink教程,从基础到就业,大家一起学习--Flink DataStream API-第一篇+源码讲解
Flink优化器与源码解析系列--算子Chain策略优化
Flink 任务是一个DAG图,由多个节点(Operator)组成,部分上下游的节点在运行时可以合成为一个节点,称为算子链Chain。Chain后的节点,总CPU为所有节点CPU的最大值,总内存为所有节点内存的总和。多节点合成一个节点可以有效的减少网络传输,降低成本。但如一个任务DAG过大,需根据实时情况对算子链Chain进行拆解操作。接下来对算子链三种策略进行说明、策略对应的使用方法、哪些算子可进行操作和在何处应用并举例讲解。
用户7600169
2022/04/25
1.5K0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面
导读:Flink是由德国几所大学发起的的学术项目,后来不断发展壮大,并于2014年末成为Apache顶级项目。Flink如何在流处理中多得王者地位?带着问题在文章寻找答案吧。
857技术社区
2022/12/18
2.2K0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面
2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二)
oolConfig config = new FlinkJedisPoolConfig.Builder() .setHost(“node1”).build(); result.addSink(new RedisSink>(config, new RedisMapperEx())); env.execute(); // * 最后将结果保存到Redis 实现 FlinkJedisPoolConfig // * 注意:存储到Redis的数据结构:使用hash也就是map // * key value // * WordCount (单词,数量)
Maynor
2021/12/07
5180
2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二)
2021年最新最全Flink系列教程__Flink高级API(三)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-znYxlAeB-1624261970363)(assets/image-20210507151242102.png)]
Maynor
2021/12/07
5280
2021年最新最全Flink系列教程__Flink高级API(三)
Flink——运行在数据流上的有状态计算框架和处理引擎
Apache Flink® - Stateful Computations over Data Streams
时间静止不是简史
2020/07/27
1.1K0
Flink——运行在数据流上的有状态计算框架和处理引擎
全网最详细4W字Flink全面解析与实践(上)
在大数据技术栈的探索中,我们曾讨论了离线计算的Spark,而当谈到实时计算,就不得不提Flink。本文将集中讨论Flink,旨在详尽展示其核心概念,从而助力你在大数据旅程中向前迈进。
BookSea
2023/10/28
1.2K2
全网最详细4W字Flink全面解析与实践(上)
一网打尽Flink中的时间、窗口和流Join
首先,我们会学习如何定义时间属性,时间戳和水位线。然后我们将会学习底层操作process function,它可以让我们访问时间戳和水位线,以及注册定时器事件。接下来,我们将会使用Flink的window API,它提供了通常使用的各种窗口类型的内置实现。我们将会学到如何进行用户自定义窗口操作符,以及窗口的核心功能:assigners(分配器)、triggers(触发器)和evictors(清理器)。最后,我们将讨论如何基于时间来做流的联结查询,以及处理迟到事件的策略。
王知无-import_bigdata
2021/09/22
1.9K0
全网最详细4W字Flink全面解析与实践(下)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中。
BookSea
2023/11/02
1K0
全网最详细4W字Flink全面解析与实践(下)
Flink的DataSource三部曲之三:自定义
本文是《Flink的DataSource三部曲》的终篇,前面都是在学习Flink已有的数据源功能,但如果这些不能满足需要,就要自定义数据源(例如从数据库获取数据),也就是今天实战的内容,如下图红框所示:
程序员欣宸
2020/05/26
6880
2021年最新最全Flink系列教程__Flink高级API(四)
并添加Watermark来解决一定程度上的数据延迟和数据乱序(最多延时 3 秒)问题。
Maynor
2021/12/07
3410
2021年最新最全Flink系列教程__Flink高级API(四)
Flink-看完就会flink基础API
最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
ha_lydms
2023/08/10
5890
Flink-看完就会flink基础API
Flink处理函数实战之一:ProcessFunction类
如下图,在常规的业务开发中,SQL、Table API、DataStream API比较常用,处于Low-level的Porcession相对用得较少,从本章开始,我们一起通过实战来熟悉处理函数(Process Function),看看这一系列的低级算子可以带给我们哪些能力?
程序员欣宸
2020/05/26
1.1K0
推荐阅读
相关推荐
聊聊Flink框架中的状态管理机制
更多 >
领券
💥开发者 MCP广场重磅上线!
精选全网热门MCP server,让你的AI更好用 🚀
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验