
Summary of exceptions when writing real-time Kafka data from Flink to OSS

Original article by 平常心 (last modified 2023-09-13), from the column 个人总结系列.

The goal is to write JSON-format tracking events from Kafka into OSS storage. Following the official documentation, I ran into quite a few exceptions; they are summarized below.

1. Reference documentation

Flink official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/oss/

2. Exceptions

2.1 Access key id should not be null or empty

After configuring the OSS-related settings in flink-conf.yaml as described in the official documentation, EnvironmentVariableCredentialsProvider could not read the corresponding values. The full exception is:


Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.InvalidCredentialsException: Access key id should not be null or empty.
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:44) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.createDefaultContext(OSSOperation.java:166) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:114) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.getObjectMetadata(OSSObjectOperation.java:458) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:579) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:569) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.getObjectMetadata(AliyunOSSFileSystemStore.java:277) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:256) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:160) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:148) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.FileSystem.initOutPathDistFS(FileSystem.java:977) ~[flink-app-jar.jar:?]
	at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:286) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:99) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:221) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:291) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:256) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450) ~[flink-app-jar.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]	

Looking at the source code, EnvironmentVariableCredentialsProvider reads the access key from the OSS_ACCESS_KEY_ID environment variable via System.getenv, not from the Flink configuration.
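A minimal sketch of what that provider does (simplified; only OSS_ACCESS_KEY_ID is confirmed by the source above, the name of the secret-key variable is my assumption based on the SDK):

// Simplified sketch of EnvironmentVariableCredentialsProvider: it only looks at
// process environment variables, never at flink-conf.yaml.
public class EnvCredentialsSketch {
    public static void main(String[] args) {
        String accessKeyId = System.getenv("OSS_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("OSS_ACCESS_KEY_SECRET"); // assumed variable name
        if (accessKeyId == null || accessKeyId.trim().isEmpty()) {
            // this is exactly the failure seen in the stack trace above
            throw new IllegalStateException("Access key id should not be null or empty.");
        }
        System.out.println("Found credentials for key id " + accessKeyId
                + (accessKeySecret != null ? " (secret present)" : " (secret missing)"));
    }
}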

I then changed the credentials provider class in flink-conf.yaml to SystemPropertiesCredentialsProvider:

fs.oss.credentials.provider: com.aliyun.oss.common.auth.SystemPropertiesCredentialsProvider

The job still failed with "Access key id should not be null or empty". Reading the SystemPropertiesCredentialsProvider source shows that it reads the keys via System.getProperty, i.e. from JVM -D parameters, whereas entries in flink-conf.yaml are handled roughly like this:

        // Entries from flink-conf.yaml end up in a Flink Configuration object
        // and are exposed to the job much like global job parameters; they never
        // become JVM system properties.
        Configuration conf = new Configuration();
        conf.setString("fs.oss.accessKeyId", "...");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.getConfig().setGlobalJobParameters(conf);

That is, they are treated much like GlobalJobParameters. When the job starts, the loaded properties only appear as Flink configuration entries in the log:

2021-06-08 22:39:58,528 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.accessKeyId, Lxxxxxxxxxxxxxxxxxxx
2021-06-08 22:39:58,528 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.accessKeySecret, ******
2021-06-08 22:39:58,528 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: yarn.application.name, event_topic
2021-06-08 22:39:58,529 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.endpoint, https://oss-xxxx.aliyuncs.com
2021-06-08 22:39:58,529 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.credentials.provider, com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
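To make the mismatch explicit, here is a minimal sketch (assuming the keys shown in the log above) of where each lookup mechanism actually reads from:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class CredentialLookupCheck {
    public static void main(String[] args) {
        // flink-conf.yaml entries are loaded into Flink's own Configuration object ...
        Configuration flinkConf = GlobalConfiguration.loadConfiguration();
        System.out.println(flinkConf.getString("fs.oss.accessKeyId", null)); // set from flink-conf.yaml
        // ... but they never become JVM system properties or environment variables,
        // which is all the two credential providers ever look at:
        System.out.println(System.getProperty("oss.accessKeyId")); // null unless passed with -D
        System.out.println(System.getenv("OSS_ACCESS_KEY_ID"));    // null unless exported
    }
}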

So the first attempt was to pass them as -yD parameters when submitting the job:

/opt/flink-1.12.0/bin/flink run -m  yarn-cluster -ynm event_topic -p 1 -yqu nifi -yjm 1024m -ytm 1024m -yD oss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -yD oss.accessKeySecret= ****** -c com.am.oss.SdkKafkaToOss /home/ws_cdp_dev_admin/flink-app-jar.jar

These -yD values are still only applied to GlobalConfiguration rather than becoming JVM system properties, so the configuration was changed to pass them as JVM options in flink-conf.yaml instead:

env.java.opts.jobmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******
env.java.opts.taskmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******

This resolved the exception. So the official documentation is not entirely accurate on this point.
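If in doubt, a quick probe inside the job can confirm that the -D options actually reached the TaskManager JVM where the credentials provider runs (a hypothetical helper, not part of the original job):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Logs whether the OSS system properties are visible inside the task JVM.
public class OssPropertyProbe extends RichMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) {
        System.out.println("oss.accessKeyId visible as system property: "
                + (System.getProperty("oss.accessKeyId") != null));
    }

    @Override
    public String map(String value) {
        return value;
    }
}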

2.2 The OVERWRITE problem

streamSource.writeAsText("oss://xxxx/user_event/dt=${dt}/demo.json", FileSystem.WriteMode.NO_OVERWRITE);

This API has two problems: it cannot handle dynamic paths, so data can only be written to the one fixed location, which inevitably means the file keeps growing as the stream is written into it; and, in addition, NO_OVERWRITE is not supported.
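For comparison, the bucketing sinks used in section 2.3 below can choose the target path per record at runtime; for example, a DateTimeBucketAssigner with a custom format produces dt=yyyyMMdd style buckets (a small illustrative snippet; the format string is my assumption):

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class DynamicBucketExample {
    public static void main(String[] args) {
        // Each record is routed to a date-based bucket such as .../user_event/dt=20210608/
        DateTimeBucketAssigner<String> assigner = new DateTimeBucketAssigner<>("'dt='yyyyMMdd");
        System.out.println("bucket assigner created: " + assigner);
    }
}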

2.3 The "Recoverable writers on Hadoop are only supported for HDFS" exception

I then changed the OSS-writing logic to something like the following:

String path = "oss://xxx/user_event/day=20210608/sdk=sa_sdk/*";
OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("user_event")
        .withPartSuffix(".json")
        .build();

// Option 1: StreamingFileSink with a row format
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
        new Path(path),
        new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(new DateTimeBucketAssigner<>())
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(2))
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(1))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
        .withOutputFileConfig(config)
        .build();

// Option 2: the (deprecated) BucketingSink
BucketingSink<String> bucketingSink = new BucketingSink<>(path);
bucketingSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd_HH-mm"));
bucketingSink.setWriter(new StringWriter<>());
bucketingSink.setBatchSize(1024 * 1024 * 256L);
bucketingSink.setBatchRolloverInterval(30 * 60 * 1000L);
bucketingSink.setInactiveBucketThreshold(3 * 60 * 1000L);
bucketingSink.setInactiveBucketCheckInterval(30 * 1000L);
bucketingSink.setInProgressSuffix(".in-progress");
bucketingSink.setPendingSuffix(".pending");

// Either sink fails in the same way at runtime:
streamSource.addSink(sink); // or streamSource.addSink(bucketingSink);

Both approaches fail with the "Recoverable writers on Hadoop are only supported for HDFS" exception:

2021-06-09 14:57:44,292 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Filter -> Sink: Unnamed (1/1) (2939f69ee024a3dd3faed2c658165ac6) switched from RUNNING to FAILED on container_e131_1618429488036_60239_01_000002 @ ws-hdp06 (dataPort=41092).
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
	at org.apache.flink.fs.osshadoop.common.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-app-jar.jar:?]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]

Looking at the StreamingFileSink source code, the sink creates a recoverable writer for the target file system:

this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();

The oss:// file system used here does not provide a recoverable writer, so it cannot resume or roll back in-progress files. The only remaining option is a custom sink (a sketch follows below). In short, the official documentation can be misleading on this point, and the feature is still somewhat rough to use.
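As a rough illustration of the custom-sink route (a minimal sketch only: the bucket name, key layout, buffering strategy and credentials handling are assumptions, and it ignores the exactly-once guarantees a recoverable writer would normally provide):

import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Minimal sketch of a custom OSS sink: buffers records and uploads each batch
// as its own object through the Aliyun OSS SDK. No checkpoint integration,
// so this is at-least-once at best.
public class SimpleOssSink extends RichSinkFunction<String> {

    private static final int FLUSH_EVERY = 1000;   // records per object (assumed)

    private final String endpoint;                 // e.g. https://oss-xxxx.aliyuncs.com
    private final String accessKeyId;
    private final String accessKeySecret;
    private final String bucket;                   // e.g. "xxx" (assumed)
    private final String keyPrefix;                // e.g. "user_event/day=20210608/sdk=sa_sdk/"

    private transient OSS ossClient;
    private transient StringBuilder buffer;
    private transient int pending;

    public SimpleOssSink(String endpoint, String accessKeyId, String accessKeySecret,
                         String bucket, String keyPrefix) {
        this.endpoint = endpoint;
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.bucket = bucket;
        this.keyPrefix = keyPrefix;
    }

    @Override
    public void open(Configuration parameters) {
        ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
        buffer = new StringBuilder();
        pending = 0;
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.append(value).append('\n');
        if (++pending >= FLUSH_EVERY) {
            flush();
        }
    }

    private void flush() {
        if (pending == 0) {
            return;
        }
        // Each flush becomes a separate object; the key layout is an assumption.
        String key = keyPrefix + "user_event-" + UUID.randomUUID() + ".json";
        byte[] bytes = buffer.toString().getBytes(StandardCharsets.UTF_8);
        ossClient.putObject(bucket, key, new ByteArrayInputStream(bytes));
        buffer.setLength(0);
        pending = 0;
    }

    @Override
    public void close() {
        flush();
        if (ossClient != null) {
            ossClient.shutdown();
        }
    }
}

Usage would then be something like streamSource.addSink(new SimpleOssSink(endpoint, accessKeyId, accessKeySecret, bucket, keyPrefix)), with the credentials ideally still supplied through the JVM options configured in section 2.1.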

Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

If there is any infringement, please contact cloudcommunity@tencent.com for removal.
