首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >春季AMQP老化消费者对RabbitMQ的PCF

春季AMQP老化消费者对RabbitMQ的PCF
EN

Stack Overflow用户
提问于 2019-03-06 19:00:25
回答 1查看 564关注 0票数 0

我们的团队在Spring使用云铸造连接器时遇到了一些问题。每当网络出现不稳定时,连接就会失败,那么AMQP似乎会尝试自动恢复,无法自动恢复,然后创建一个新的消费者。

问题是RabbitMQ似乎并没有取消老消费者的注册.因此,我们最终有40个消费者,而不是通常的one消费者。而且,我认为,由于无效消费者的数量,老消费者会收到一些消息,但不会运行我们的代码。因此,我们有一堆未加加的消息,直到应用程序重新启动并清除使用者列表。

--我不确定这是我们端的错误配置,还是错误。有什么主意吗?

以下是补充资料。

我们日志中的一小部分:

代码语言:javascript
运行
复制
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-Dqik8vz9dSPiUzMbM4uGbw: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.823 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler     : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-t7_vY47weZNYtWagl__pYA: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.823 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler     : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-AK1YOpN0_E2KVwSi4fzOaw: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:693) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:687) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.822 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler     : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-vrWhRdhk3ejrXo2-ImjXzA: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:244) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     ... 7 common frames omitted
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:53) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:687) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:693) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:673) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]

我们使用@RabbitListener注释注册我们的消费者

代码语言:javascript
运行
复制
    @RabbitListener(id = CONSUMER_ID, queues = "${CommonInternalEndPoint}", containerFactory = "rabbitLocalContainerFactory")
    protected void process(Object messageObject) {

当我们连接到多个兔子mq实例时,我们需要手动声明连接工厂。我们的连接工厂来自云铸造连接器:

代码语言:javascript
运行
复制
@Configuration
@Profile("cloud")
public class RabbitCloudConfiguration {

    @Bean
    public Cloud cloud(){
        return new CloudFactory().getCloud();
    }

    @Bean
    @Primary
    public ConnectionFactory connectionFactory(){
        return cloud().getSingletonServiceConnector(ConnectionFactory.class, null);
    }

    @Bean("rabbitLocalContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitLocalContainerFactory(){
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory());
        containerFactory.setAutoStartup(true);
        return containerFactory;
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-07 17:24:05

我们使用SpringBoot1.5.9、Framework4.3.6、AMQP1.7.4、RabbitMQ AMQP客户机4.0.3

从2.0版本开始,Spring禁用目标automaticRecoveryEnabled RabbitMQ ConnectionFactoryhttps://docs.spring.io/spring-amqp/docs/2.1.4.RELEASE/reference/#auto-recovery

由于您使用的版本是基于云连接器提供的RabbitMQ ConnectionFactory,所以需要显式禁用它:

代码语言:javascript
运行
复制
@Bean
@Primary
public ConnectionFactory connectionFactory(){
    ConnectionFactory connectionFactory = cloud().getSingletonServiceConnector(ConnectionFactory.class, null);
    ((CachingConnectionFactory) connectionFactory).getRabbitConnectionFactory().setAutomaticRecoveryEnabled(false);
    return connectionFactory;
}

您的解决方案将基于Spring提供的恢复机制。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55030386

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档