Unable to poll from S3 with a custom endpoint configuration

Stack Overflow user
Asked 2020-03-30 12:24:58
Answers: 1, Views: 1.2K, Followers: 0, Votes: 1

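I am trying to use spring-integration-aws to poll an S3 bucket and trigger a Spring Batch job. My S3 bucket is not hosted on Amazon but on a local MinIO server, so I have a custom configuration: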

    @Bean
    public AmazonS3 amazonS3(ConfigProperties configProperties) {

        return AmazonS3ClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:9001","eu-west-0")) // Region matches with minio region
                .withPathStyleAccessEnabled(configProperties.getS3().isPathStyle())
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        configProperties.getS3().getAccessKey(), configProperties.getS3().getSecretKey()
                ))).build();
    }

I defined my IntegrationFlow like this:

    @Bean
    public IntegrationFlow s3InboundFlow() {

        S3RemoteFileTemplate template = new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
        S3StreamingMessageSource s3StreamingMessageSource = new S3StreamingMessageSource(template);
        s3StreamingMessageSource.setRemoteDirectory(String.format("%s/OUT/", configProperties.getS3().getBucketDataPath()));

        return IntegrationFlows.from(s3StreamingMessageSource, configurer -> configurer
                .id("s3InboundAdapter")
                .autoStartup(true)
                .poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
                .handle(jobLaunchingGateway(jobRepository)) // Launch a spring-batch job
                .get();
    }

The problem is that when the poll runs, I get the following error:

2020-03-30 19:05:21,008 ERROR [scheduling-1] org.springframework.integration.handler.LoggingHandler:  org.springframework.messaging.MessagingException: nested exception is java.lang.IllegalStateException: S3 client with invalid S3 endpoint configured: localhost:9001
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:342)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: S3 client with invalid S3 endpoint configured: localhost:9001
    at com.amazonaws.services.s3.AmazonS3Client.getRegion(AmazonS3Client.java:4270)
    at org.springframework.integration.aws.support.S3Session.getHostPort(S3Session.java:228)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:214)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)

This happens because spring-integration-aws sets some headers when a file is received:

AbstractRemoteFileStreamingMessageSource.java

return getMessageBuilderFactory()
            .withPayload(session.readRaw(remotePath))
            .setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
            .setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
            .setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
            .setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
            .setHeader(FileHeaders.REMOTE_FILE_INFO,
                                    this.fileInfoJson ? file.toJson() : file);

The REMOTE_HOST_PORT header is set from the result of getHostPort(). In S3Session.java, getHostPort() in turn calls getRegion().

getRegion() in AmazonS3Client does not check whether the user set a value in the signing region field. It only checks whether the host matches the "amazonaws.com" pattern.

S3Session.java

    @Override
    public String getHostPort() {
        Region region = this.amazonS3.getRegion().toAWSRegion();
        return String.format("%s.%s.%s:%d", AmazonS3.ENDPOINT_PREFIX, region.getName(), region.getDomain(), 443);
    }

AmazonS3Client.java

    @Override
    public synchronized Region getRegion() {
        String authority = super.endpoint.getAuthority();
        if (Constants.S3_HOSTNAME.equals(authority)) {
            return Region.US_Standard;
        } else {
            Matcher m = Region.S3_REGIONAL_ENDPOINT_PATTERN.matcher(authority);
            if (m.matches()) {
                return Region.fromValue(m.group(1));
            } else {
                throw new IllegalStateException(
                    "S3 client with invalid S3 endpoint configured: " + authority);
            }
        }
    }
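
To illustrate why the check fails for a MinIO endpoint, here is a minimal, self-contained sketch of a host-pattern lookup. The class name `EndpointRegionCheck`, the method `regionOf`, and the regular expression are assumptions made for illustration; the SDK's actual `Region.S3_REGIONAL_ENDPOINT_PATTERN` may be defined differently. The point is only that a lookup driven purely by the host name has no way to see the signing region passed to `EndpointConfiguration`.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EndpointRegionCheck {

    // Hypothetical, simplified pattern for illustration only.
    // It is NOT copied from the AWS SDK.
    private static final Pattern REGIONAL_ENDPOINT =
            Pattern.compile("s3[-.]([^.]+)\\.amazonaws\\.com");

    // Returns the region encoded in the host name, or empty when the
    // authority does not look like an AWS regional S3 endpoint.
    static Optional<String> regionOf(String authority) {
        Matcher m = REGIONAL_ENDPOINT.matcher(authority);
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // An AWS-style host yields a region.
        System.out.println(regionOf("s3.eu-west-1.amazonaws.com"));
        // A custom endpoint such as a local MinIO server yields nothing,
        // which is the case where the SDK throws IllegalStateException.
        System.out.println(regionOf("localhost:9001"));
    }
}
```

In this sketch the custom endpoint produces an empty result instead of an exception, but the cause is the same as in the stack trace above: the region is derived from the host name alone.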

How can I poll from S3 with a custom endpoint configuration? Why doesn't getHostPort() check the signing region value? Is there a way to work around this?

1 Answer

Stack Overflow user

Accepted answer

Answered 2020-03-30 13:48:46

Yes, it can be worked around. You just need to extend S3SessionFactory so that it returns an S3Session subclass with an overridden getHostPort() method for your custom endpoint.

public class MyS3SessionFactory extends S3SessionFactory {

    private MyS3Session s3Session;

    @Override
    public S3Session getSession() {
        return s3Session;
    }

    public MyS3SessionFactory(AmazonS3 amazonS3) {
        super(amazonS3);
        Assert.notNull(amazonS3, "'amazonS3' must not be null.");
        this.s3Session = new MyS3Session(amazonS3);
    }
}

// S3Session subclass that skips the AWS region lookup for custom endpoints.
public class MyS3Session extends S3Session {

    public MyS3Session(AmazonS3 amazonS3) {
        super(amazonS3);
    }

    @Override
    public String getHostPort() {
        return "";
    }
}
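
The override above returns an empty string, so the REMOTE_HOST_PORT header carries no information. If a meaningful value is wanted, one option is to derive it from the configured endpoint URL. The sketch below uses only `java.net.URI`; the class `EndpointHostPort` and the method `hostPortOf` are hypothetical names introduced here, and the code assumes the endpoint string includes a scheme, as in `http://localhost:9001`.

```java
import java.net.URI;

final class EndpointHostPort {

    // Hypothetical helper: turns an endpoint URL into "host:port".
    // Assumes the endpoint has a scheme (http or https); without one,
    // URI.getHost() returns null.
    static String hostPortOf(String endpoint) {
        URI uri = URI.create(endpoint);
        int port = uri.getPort();
        if (port == -1) {
            // No explicit port: fall back to the scheme's default.
            port = "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
        }
        return uri.getHost() + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(hostPortOf("http://localhost:9001"));
    }
}
```

MyS3Session.getHostPort() could then return the result of such a helper for the endpoint it was configured with. Either way, the flow has to be built with the custom factory, that is `new S3RemoteFileTemplate(new MyS3SessionFactory(amazonS3))` instead of `new S3SessionFactory(amazonS3)`.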

Let's discuss a possible fix in the issue you raised: https://github.com/spring-projects/spring-integration-aws/issues/160

Votes: 1
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/60930518
