I followed "ZIP compressed input for Apache Flink" and wrote the snippet below to process .gz log files in a directory with a plain TextInputFormat. On my local test directory it works: the scan picks up the .gz files and transparently decompresses their contents. When I run it against an S3 bucket source, however, the .gz files are not processed. The job still reads the plain .log files from the same S3 bucket, so it seems it simply does not decompress the .gz files. How can I fix this on the S3 filesystem?
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);
    final String sourceLogDirPath = params.get("source_log_dir_path", "s3://my-test-bucket-logs/"); // locally: "/Users/my.user/logtest/logs"
    final long checkpointInterval = Long.parseLong(params.get("checkpoint_interval", "60000"));

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().setGlobalJobParameters(params);

    // Recursively enumerate the directory; TextInputFormat picks a decompressor
    // from the file extension, so .gz files should be inflated transparently.
    TextInputFormat textInputFormat = new TextInputFormat(new Path(sourceLogDirPath));
    textInputFormat.setNestedFileEnumeration(true);

    DataStream<String> stream = env.readFile(
            textInputFormat, sourceLogDirPath,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
    stream.print();
    env.execute();
}

These are the Flink library jars on my classpath:
/opt/flink/lib/flink-csv-1.13.2.jar:/opt/flink/lib/flink-json-1.13.2.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.13.2.jar:/opt/flink/lib/flink-table_2.12-1.13.2.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/sentry_log4j2_deploy.jar:/opt/flink/lib/flink-dist_2.12-1.13.2.jar:::
P.S. I also tried s3a://<bucket>/, with no luck.
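For context: in Flink's FileInputFormat the decompressor is selected purely by file extension, not by the filesystem scheme, so s3:// paths should go through the same .gz handling as local ones. A minimal sketch of the relevant Flink 1.13 API, shown only to illustrate where the extension-to-decompressor mapping lives ("gz" is already registered out of the box, so this call is redundant in practice):

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;

// Registers (here: re-registers) the gzip inflater for the "gz" extension.
// This is the registry TextInputFormat consults when opening each file.
FileInputFormat.registerInflaterInputStreamFactory(
        "gz", GzipInflaterInputStreamFactory.getInstance());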
Posted on 2021-12-29 16:56:03
Possibly the file is being filtered out. You could switch the logging to debug mode and watch whether the file is dropped when the input splits are enumerated.
By default, files whose names start with "." or "_" are filtered out.
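For example, with the log4j2 jars shown on the classpath, debug logging for the input-format package can be enabled in conf/log4j.properties (the "fileio" logger key below is an arbitrary name; the package is Flink's real one, and the exact config file location depends on your deployment):

logger.fileio.name = org.apache.flink.api.common.io
logger.fileio.level = DEBUG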

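Alternatively, to see exactly which paths are dropped without relying on log levels, the default filter can be wrapped so that every rejected path is printed. A minimal sketch: the LoggingPathFilter class name is made up, while FilePathFilter, createDefaultFilter(), and setFilesFilter(...) are real Flink 1.13 APIs.

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.core.fs.Path;

// Hypothetical helper: delegates to Flink's default filter (which rejects
// names starting with "." or "_") and prints every path it filters out.
public class LoggingPathFilter extends FilePathFilter {
    private static final FilePathFilter DEFAULT = FilePathFilter.createDefaultFilter();

    @Override
    public boolean filterPath(Path filePath) {
        boolean ignored = DEFAULT.filterPath(filePath);
        if (ignored) {
            System.err.println("Filtered out during scan: " + filePath);
        }
        return ignored;
    }
}

Install it on the format before calling readFile: textInputFormat.setFilesFilter(new LoggingPathFilter());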
https://stackoverflow.com/questions/70431993