前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink-1.10中的StreamingFileSink相关特性

Flink-1.10中的StreamingFileSink相关特性

作者头像
王知无-import_bigdata
发布2020-06-04 14:35:06
1.5K0
发布2020-06-04 14:35:06
举报

Flink流式计算的核心概念,就是将数据从Source输入流一个个传递给Operator进行链式处理,最后交给Sink输出流的过程。本篇文章主要讲解Sink端比较强大一个功能类StreamingFileSink,我们基于最新的Flink1.10.0版本进行讲解,之前版本可能使用BucketingSink,但是BucketingSink从Flink 1.9开始已经被废弃,并会在后续的版本中删除,这里只讲解StreamingFileSink相关特性。

1. 写出文件的状态

看这个图片应该能明白,文件会分在不同的桶中,bucket中存在不同状态的文件:

  1. In-progress :当前文件正在写入中
  2. Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
  3. Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态

2. 简单的字符串写出示例

代码语言:javascript
复制
DataStreamSource<String> lines = FlinkUtil.createSocketStream("localhost", 8888);

        StreamExecutionEnvironment env = FlinkUtil.getEnv();
        // 设置checkpoint
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));

        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();


        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                /**
                 * 设置桶分配政策
                 * DateTimeBucketAssigner--默认的桶分配政策,默认基于时间的分配器,每小时产生一个桶,格式如下yyyy-MM-dd--HH
                 * BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)
                 */
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                /**
                 * 有三种滚动政策
                 *  CheckpointRollingPolicy
                 *  DefaultRollingPolicy
                 *  OnCheckpointRollingPolicy
                 */
                .withRollingPolicy(
                        /**
                         * 滚动策略决定了写出文件的状态变化过程
                         * 1. In-progress :当前文件正在写入中
                         * 2. Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
                         * 3. Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
                         *
                         * 观察到的现象
                         * 1.会根据本地时间和时区,先创建桶目录
                         * 2.文件名称规则:part-<subtaskIndex>-<partFileIndex>
                         * 3.在macos中默认不显示隐藏文件,需要显示隐藏文件才能看到处于In-progress和Pending状态的文件,因为文件是按照.开头命名的
                         *
                         */
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(2)) //设置滚动间隔
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(1)) //设置不活动时间间隔
                                .withMaxPartSize(1024 * 1024 * 1024) // 最大零件尺寸
                                .build())
                .withOutputFileConfig(config)
                .build();

        lines.addSink(sink).setParallelism(1);

3. 写出文件的滚动策略

数据写入文件时,查看源码可以知道 滚动策略是这么判断的: 没有处于inProgressPart状态的文件 或者 DefaultRollingPolicy.shouldRollOnEvent成立,即打开的文件大小超过了滚动器中设置的大小 滚动文件时,首先关闭当前处于progress的part文件,然后创建一个新的 assembleNewPartPath,并且partCounter++(计数器)

StreamingFileSink继承自RichSinkFunction,显然之后执行一次, 该方法中注册了一个定时器,定时器的执行时间为currentProcessingTime + bucketCheckInterval 其中bucketCheckInterval为调用StreamingFileSink.forRowFormat()时,默认创建的,其默认值为60000,也就是一分钟

onProcessingTime方法继承自ProcessingTimeCallback,此方法使用调度触发器的时间戳调用。 该方法中设定了60秒的定时器,定时每60秒执行一次该方法 该方法中会调用buckets.onProcessingTime(currentTime) 里面判断是否需要关闭part文件,注意是关闭而不是滚动 判断条件为:part文件不为空 并且 DefaultRollingPolicy.shouldRollOnProcessingTime条件成立。 即part文件存在,并且 (当前时间-part的创建时间 >= 滚动时间 或者 当前时间-part的最后修改时间 >= 不活跃时间)

snapshotState和initializeState方法继承自CheckpointedFunction,用来构建快照或者恢复历史状态。 其中snapshotState方法会调用buckets.snapshotState()方法,对桶的状态进行快照处理。 将所有处理活跃状态的桶全部进行快照处理,做快照时会检查是否需要滚动,滚动条件为: part文件不为空 并且 DefaultRollingPolicy.shouldRollOnCheckpoint成立,即文件大小超过设定。 满足该条件时,就会关闭partFile

notifyCheckpointComplete方法继承自CheckpointListener,用来通知检查点完成 该方法中会调用onSuccessfulCompletionOfCheckpoint方法 会将已经关闭的(其实是处于Pending状态的文件)part文件重命名。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2. 简单的字符串写出示例
  • 3. 写出文件的滚动策略
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档