首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >@SQSListener不使用并发设置

@SQSListener不使用并发设置
EN

Stack Overflow用户
提问于 2018-10-02 03:51:22
回答 1查看 2.8K关注 0票数 7

我正在构建一个使用AWS SQS队列的Spring应用程序。我能够使用队列,但是我似乎不能在多个并发的消费者中这样做。

当从命令行mvn spring-boot:run运行时,它会连接到SQS,每次只接收一个消息。侦听器方法有额外的代码,这会导致几个第二个延迟,在此期间,我希望接收和处理第二条消息。

就好像配置没有被使用一样。

pom.xml:

代码语言:javascript
运行
复制
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Finchley.SR1</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-aws</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-aws-messaging</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.11.419</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>2.0.0.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

主修班:

代码语言:javascript
运行
复制
@SpringBootApplication
public class IngestConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(IngestConsumerApplication.class, args);
    }
}

配置类:

代码语言:javascript
运行
复制
@Configuration
public class AppConfig {

@Value("${aws.region}")
private String region;

@Bean
@Primary
public AWSCredentialsProvider buildAWSCredentialsProvider() {
    return new ProfileCredentialsProvider();
}

@Bean
@Primary
public AmazonS3 buildS3Client(@Autowired AWSCredentialsProvider credentials) {
    return AmazonS3ClientBuilder
            .standard()
            .withCredentials(credentials)
            .withRegion(region)
            .build();
}

@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync(@Autowired AWSCredentialsProvider credentials) {
    return AmazonSQSAsyncClientBuilder.standard()
            .withCredentials(credentials)
            .withRegion(region)
            .build();
}

@Bean
public QueueMessageHandler queueMessageHandler(@Autowired AmazonSQSAsync sqsClient, @Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
    queueMessageHandlerFactory.setAmazonSqs(sqsClient);
    QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
    return queueMessageHandler;
}

@Bean
public ObjectMapper buildJacksonObjectMapper() {
    return new ObjectMapper();
}

@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);
    factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
    return factory;
}

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, 
        @Autowired QueueMessageHandler queueMessageHandler,
        @Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    simpleMessageListenerContainer.setMaxNumberOfMessages(2);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolExecutor);

    return simpleMessageListenerContainer;
}

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);
    executor.setThreadNamePrefix("queueExec");
    executor.initialize();
    return executor;
}

}

最后,消费者:

代码语言:javascript
运行
复制
@Async
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS, value = "${queue}")
public void listener(S3EventNotification event) {
    ...
}
EN

回答 1

Stack Overflow用户

发布于 2022-02-13 19:18:55

从starters的@SqsListner标记方法中删除@异步标记。我不认为这会给您提供任何东西,因为第二个标记在运行时由不同的进程进行评估,并且可能不会得到任何方便异步的代理对象。

另外,"simpleMessageListenerContainer.setMaxNumberOfMessages(2);“是在一次SQS民意测验中可以撤回多少条消息。最大值为10,但这不是并发性,但它确实为进程提供了更多的消息来调用更多侦听器实例。因此,该值将更好地促进并发性,但不能保证并发性。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52601777

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档