首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Flink ` `textInputFormat`‘不处理aws S3’`Source`‘文件系统中的GZ压缩文件

Flink ` `textInputFormat`‘不处理aws S3’`Source`‘文件系统中的GZ压缩文件
EN

Stack Overflow用户
提问于 2021-12-21 07:38:13
回答 1查看 283关注 0票数 1

我遵循(ZIP compressed input for Apache Flink)并编写了下面的代码片段,用一个简单的TextInputFormat在dir中处理.gz日志文件。它在我的本地测试目录上工作,扫描并自动打开.gz文件内容。但是,当我使用s3桶源运行它时,它不会处理.gz压缩文件。不过,这个Flink作业仍然会在.log桶上打开s3文件。似乎它只是不解压缩.gz文件。如何在s3文件系统上解决这个问题?

代码语言:javascript
复制
public static void main(String[] args) throws Exception {

    final ParameterTool params = ParameterTool.fromArgs(args);
    final String sourceLogDirPath = params.get("source_log_dir_path", "s3://my-test-bucket-logs/"); // "/Users/my.user/logtest/logs"
    final Long checkpointInterval = Long.parseLong(params.get("checkpoint_interval", "60000"));
    
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().setGlobalJobParameters(params);

    TextInputFormat textInputFormat = new TextInputFormat(new Path(sourceLogDirPath));
    textInputFormat.setNestedFileEnumeration(true);

    DataStream<String> stream = env.readFile(
            textInputFormat, sourceLogDirPath,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

    stream.print();
    env.execute();
}

这是我的类路径jar flink库:

/opt/flink/lib/flink-csv-1.13.2.jar:/opt/flink/lib/flink-json-1.13.2.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.13.2.jar:/opt/flink/lib/flink-table_2.12-1.13.2.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/sentry_log4j2_deploy.jar:/opt/flink/lib/flink-dist_2.12-1.13.2.jar:::

P.S.我也试过s3a://<bucket>/,但没有运气。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-12-29 16:56:03

可能是,您可以将日志更改为调试模式,并观察文件拆分时是否筛选出该文件。

默认情况下,以“.”开头的文件。或者“_”将被过滤掉

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70431993

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档