首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spring Boot、JmsListener和SQS在不同账户上的队列

Spring Boot、JmsListener和SQS在不同账户上的队列
EN

Stack Overflow用户
提问于 2018-03-05 14:03:48
回答 2查看 1.8K关注 0票数 2

我正在尝试开发一个Spring Boot(1.5)应用程序,它需要监听来自两个不同AWS帐户的SQS队列。可以使用JmsListener注释创建监听程序吗?我已经检查了权限是否正确,我可以使用getQueueUrl()获取队列url,并使用setQueueOwnerAWSAccountId()设置正确的帐户id。

下面是我在main account下使用的监听器代码。尝试将其用于另一个帐户上的队列时,出现错误

代码语言:javascript
运行
复制
HTTPStatusCode: 400 AmazonErrorCode: AWS.SimpleQueueService.NonExistentQueue 
com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version.

队列读取器类

代码语言:javascript
运行
复制
@Service
public class QueueReader {

    @JmsListener(destination = "queue-name")
    public void messageReceived(@Payload String message) {
        // message received
    }
}

队列配置类

代码语言:javascript
运行
复制
@Configuration
@EnableJms
public class QueueReaderConfig {
    SQSConnectionFactory connectionFactory = SQSConnectionFactory.builder().withRegion(Region.getRegion(Regions.EU_WEST_1))
                    .withAWSCredentialsProvider(new DefaultAWSCredentialsProviderChain())
                    .build();

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(this.connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setConcurrency("3-10");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }

    @Bean
    public JmsTemplate defaultJmsTemplate() {
        return new JmsTemplate(this.connectionFactory);
    }
}
EN

回答 2

Stack Overflow用户

发布于 2019-03-30 03:59:01

我也遇到了同样的问题。我找到了一个解决方法,创建自定义DestinationResolver并将其设置在"DefaultJmsListenerContainerFactory“和"JmsTemplate”中。

另外,在"CustomDynamicDestinationResolver“中通过ownerAccountId查找队列。

代码语言:javascript
运行
复制
queue = ((SQSSession) session).createQueue(queueName, ownerAccountId);

使用连接工厂监听队列。

代码语言:javascript
运行
复制
@JmsListener(destination = "MyQueue", containerFactory = "customJmsListenerContainerFactory")
public void process(String message) throws IOException {

有点晚了,但我希望这能帮助像我这样的人寻找解决方案。

谢谢,

阿克沙伊

代码语言:javascript
运行
复制
import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.util.Assert;

import javax.jms.*;

@Configuration
public class CustomJmsConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomJmsConfig.class);

    @Value("${copies.processor.concurrency:5}")
    private String concurrency;

    @Value("${owner.account.id:1234}")
    private String ownerAccountId;

    SQSConnectionFactory customConnectionFactory =
        new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard().withRegion(Regions.EU_CENTRAL_1).withCredentials(new DefaultAWSCredentialsProviderChain())
        );

    @Bean
    public DefaultJmsListenerContainerFactory customJmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.customConnectionFactory);
    factory.setDestinationResolver(new CustomDynamicDestinationResolver(ownerAccountId));
    factory.setConcurrency(concurrency);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
    }

    @Bean
    public JmsTemplate customJmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate(this.customConnectionFactory);
    jmsTemplate.setDestinationResolver(new CustomDynamicDestinationResolver(ownerAccountId));
    return jmsTemplate;
    }

    public static class CustomDynamicDestinationResolver implements DestinationResolver {

    private String ownerAccountId;

    public CustomDynamicDestinationResolver(String ownerAccountId) {
        this.ownerAccountId = ownerAccountId;
    }

    @Override
    public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
        Assert.notNull(session, "Session must not be null");
        Assert.notNull(destinationName, "Destination name must not be null");
        if (pubSubDomain) {
        return resolveTopic(session, destinationName);
        } else {
        return resolveQueue(session, destinationName);
        }
    }

    protected Topic resolveTopic(Session session, String topicName) throws JMSException {
        return session.createTopic(topicName);
    }

    protected Queue resolveQueue(Session session, String queueName) throws JMSException {
        Queue queue;
        LOGGER.info("Getting destination for libraryOwnerAccountId: {}, queueName: {}", libraryOwnerAccountId, queueName);
        if (libraryOwnerAccountId != null && session instanceof SQSSession) {
        queue = ((SQSSession) session).createQueue(queueName, ownerAccountId);
        } else {
        queue = session.createQueue(queueName);
        }
        return queue;
    }
    }
}
票数 2
EN

Stack Overflow用户

发布于 2021-09-17 07:18:01

我的解决方案是基于Akshay的回应。我也在使用定制的DestinationResolver。但是,我的实现不需要固定的ownerAccountId

代码语言:javascript
运行
复制
public class SqsDynamicDestinationResolver extends DynamicDestinationResolver {
  @Override
  protected Queue resolveQueue(Session session, String queueName) throws JMSException {
    if (session instanceof SQSSession) {
      SQSSession sqsSession = (SQSSession) session;
      String[] parts = queueName.split(":");
      if (parts.length == 2) {
        return sqsSession.createQueue(parts[1], parts[0]);
      }
    }

    return super.resolveQueue(session, queueName);
  }

然后在JmsTemplateDefaultJmsListenerContainerFactory中设置解析器。

您可以简单地将队列名称指定为"whatever_queue_name",或者使用冒号分隔所有者帐户id,例如"0000216587:whatever_queue_name"

请注意,如果 ownerAccountId 来自不同的区域,则此解决方案将不起作用!在这种情况下,您将需要单独的配置。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49104384

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档