我有一个在AWS EMR中运行的高度并行(400)的Flink应用程序。它使用BucketingSink源码Kafka并汇入S3 (使用RocksDb后端设置检查点)。Flink作业是一个持续运行的流媒体应用程序。在任何给定的时间,所有工作进程加在一起都有可能生成/写入400个文件(由于400个并行度)。(S3AFileSystem.java:662)
at org.apache.flink.streami
我们希望在读取来自kafka的消息时实现并行性。因此,我们希望在flinkkafkaconsumer中指定分区编号。它将读取kafka中所有分区的消息,而不是特定的分区号。kafkaConsumer = new FlinkKafkaConsumer<String>("EventLog", new SimpleStringSchema(), properties); 请建议任何更好的选项来获得并行性