首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >用SpringBatch的S3问题阅读ItemReaders资源

用SpringBatch的S3问题阅读ItemReaders资源
EN

Stack Overflow用户
提问于 2017-07-27 20:14:45
回答 1查看 2.1K关注 0票数 1

我有一个Spring作业,它从一个S3桶中读取一堆文件,对它们进行处理,然后将它们发送到数据库,在多线程配置中这样做。application.properties文件包含以下内容:

代码语言:javascript
复制
cloud.aws.credentials.accessKey=accessKey 
cloud.aws.credentials.secretKey=secret
cloud.aws.region.static=us-east-1
cloud.aws.credentials.instanceProfile=true 
cloud.aws.stack.auto=false

我的ItemReader:

代码语言:javascript
复制
@Bean
ItemReader<DataRecord> itemReader() {
    FlatFileItemReader<DataRecord> flatFileItemReader = new FlatFileItemReader<>();
    flatFileItemReader.setLinesToSkip(0);
    flatFileItemReader.setLineMapper(new DataRecord.DataRecordLineMapper());
    flatFileItemReader.setSaveState(false);

    MultiResourceItemReader<DataRecord> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setDelegate(flatFileItemReader);
    multiResourceItemReader.setResources(loadS3Resources(null, null));
    multiResourceItemReader.setSaveState(false);

    SynchronizedItemStreamReader<DataRecord> itemStreamReader = new SynchronizedItemStreamReader<>();
    itemStreamReader.setDelegate(multiResourceItemReader);
    return itemStreamReader;
}

我的TaskExecutor:

代码语言:javascript
复制
@Bean
TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
    return threadPoolTaskExecutor;
}

作业只包含一个步骤,其中它从文件中读取、处理它们,然后写到DB。在此配置下,将加载资源,开始作业,并对第一个资源的~240 k第一行进行处理(有7条参考资料,每一行有120万行)。然后我得到以下例外:

代码语言:javascript
复制
org.springframework.batch.item.file.NonTransientFlatFileException: Unable to read from resource: [Amazon s3 resource [bucket='my-bucket' and object='output/part-r-00000']]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:220) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.read(MultiResourceItemReader.java:108) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.SynchronizedItemStreamReader.read(SynchronizedItemStreamReader.java:55) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:157) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:116) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:110) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.3.9.RELEASE.jar!/:4.3.9.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_65]
Caused by: javax.net.ssl.SSLException: SSL peer shut down incorrectly
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:596) ~[na:1.8.0_65]
at sun.security.ssl.InputRecord.read(InputRecord.java:532) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930) ~[na:1.8.0_65]
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) ~[na:1.8.0_65]
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135) ~[httpclient-4.5.3.jar!/:4.5.3]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:117) ~[aws-java-sdk-s3-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[na:1.8.0_65]
at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[na:1.8.0_65]
at java.io.BufferedReader.fill(BufferedReader.java:161) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:324) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[na:1.8.0_65]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:201) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
... 23 common frames omitted

我想知道是否有一个简单的方法来解决这个问题。目前,我正在考虑做一个文件的本地副本,然后从这些文件中读取,但我想知道这种异常是否可以通过某些配置来避免。

谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-08-02 09:09:36

我猜是一个线程关闭SFTP会话,而另一个线程仍在处理。

更好的做法是使用MultiResourcePartitioner为每个资源(文件)创建一个分区,然后让读取器将每个文件分别作为它自己的分区来选择。有了这种配置,您就不再需要MultiResourceItemReader了(您可以直接进入委托)。

参考这里的示例 https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/resources/jobs/partitionFileJob.xml

还指 How to apply partitioned count for MultiResourceItemReader?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45360253

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档