首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >SpringBoot @SqsListener -不工作-有异常- TaskRejectedException

SpringBoot @SqsListener -不工作-有异常- TaskRejectedException
EN

Stack Overflow用户
提问于 2018-07-17 12:04:55
回答 6查看 15.1K关注 0票数 12

我有一个在队列中已经有5000条消息的AWS SQS (示例消息看起来像这样的'Hello @ 1')我创建了一个SpringBoot应用程序,并且在其中一个组件类中创建了一个方法来从SQS读取消息。

代码语言:javascript
运行
复制
package com.example.aws.sqs.service;

import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class MessageReceiverService {   

@SqsListener(value = { "${cloud.aws.sqs.url}" }, deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
public void readMessage(String message){
    log.info("Reading Message... {}", message);
}

}

我的主要SpringBoot类

代码语言:javascript
运行
复制
@SpringBootApplication 
public class AwsSqsApplicationConsumer {
public static void main(String[] args) {
    SpringApplication.run(AwsSqsApplicationConsumer.class, args);
}
}

当应用程序运行时,我得到了异常:

代码语言:javascript
运行
复制
s.c.a.m.l.SimpleMessageListenerContainer : An Exception occurred while polling queue '<my sqs name>'. The failing operation will be retried in 10000 milliseconds
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:309) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286) ~[spring-cloud-aws-messaging-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2 rejected from java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 2, queued tasks = 0, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_65]
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:306) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
... 6 common frames omitted

我没有配置任何自定义的Executor服务。使用预配置的Spring Beans。springBootVersion = '2.0.3.RELEASE‘springCloudVersion = 'Finchley.RELEASE’

EN

回答 6

Stack Overflow用户

发布于 2018-10-12 17:42:41

设置消息的最大数量似乎可以解决这个问题:

代码语言:javascript
运行
复制
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS){
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQS);
    factory.setMaxNumberOfMessages(10);
    return factory;
}
票数 14
EN

Stack Overflow用户

发布于 2020-03-06 15:57:39

我相信这是Spring中的一个bug或疏忽。此问题源于以下默认值:

代码语言:javascript
运行
复制
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {

    private static final int DEFAULT_WORKER_THREADS = 2;

代码语言:javascript
运行
复制
abstract class AbstractMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {
    private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;

如果没有设置maxNumberOfMessages,那么它将使用10作为从SQS中提取的消息数,并使用2作为任务执行器中的工作进程数。这意味着如果它一次拉取3条或更多消息,您将得到该异常。如果您手动将maxNumberOfMessages设置为一个值(任何值),它将在两个地方使用它来同步这些值,正如我所预期的那样:

代码语言:javascript
运行
复制
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            SimpleMessageListenerContainerFactory factory, QueueMessageHandler messageHandler)
    {
        SimpleMessageListenerContainer container = factory.createSimpleMessageListenerContainer();
        container.setMaxNumberOfMessages(5);
        container.setMessageHandler(messageHandler);
        return container;
    }
票数 4
EN

Stack Overflow用户

发布于 2018-07-30 15:09:15

侦听器线程配置存在问题。请参阅以下内容

代码语言:javascript
运行
复制
...
ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]]
...

默认线程池大小小于您所需的大小。

将以下配置添加到Spring应用程序中

代码语言:javascript
运行
复制
@Configuration
public class TasksConfiguration implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(5); // TODO: Load this from configuration
        taskScheduler.initialize();
        taskRegistrar.setTaskScheduler(taskScheduler);
    }
}

现在,您应该能够处理这些任务了。

备注:之前被拒绝的任务将在一定时间后重新获得。

编辑:我认为人们被.setPoolSize(5000)行中的数字吓坏了。这是一个可配置的数字,您可以选择任何适合您的要求的数字。为了得到答案,我将它减少到一个更小的数字。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51373082

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档