前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMq可靠性分析

RabbitMq可靠性分析

作者头像
sucl
发布2019-08-07 14:26:37
3670
发布2019-08-07 14:26:37
举报
文章被收录于专栏:企业平台构建企业平台构建

最近了解并简单实用了下Rabbitmq,整个使用也大致了解了,但是要作做到真正的可靠,仅仅依赖于应用提供的方式是否在业务环境中真的能够达到可靠的目的。当然我们所谓的可靠性主要指的以下几方面(个人认为):

  1. 生产消息时,如果broker处理成功/失败,是否一定会告知生产者
  2. 消息生产者告知消息发送成功/失败,是否broker也是一致
  3. 消息被消费,broker是否被删除
  4. 消息消费后的ack是否能够一定被broker知晓

我们知道,在消息从生产者发送到消息服务器是存在一个过程的,如何确保消息到达服务器,目前是通过事务与confirm进行保障,但是不是就一定能够保证消息真实的到达并被服务器处理?由于网络环境波动较大,不能绝对保证在与服务器通信过程中不出现问题,那么如果出现网络或系统故障时,如何保证数据的一致性与完整性?比如在生产者发送消息到Broker,并处理完成,但是在返回处理结果时出现问题,那么Broker存在该条消息,但是生产者却可能认为消息没有正常发送。如果Broker处理过程出现问题,生产者没有收到ack消息会如何?是否会调用confirm回调?

首先看一种现象:

代码语言:javascript
复制
public class Producer {
    private AtomicInteger msgCount = new AtomicInteger(0);

    public void produce(String message) throws IOException, TimeoutException, InterruptedException {
        Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig());
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(RbCommon.EXCHANGE_X2,ExType.Direct.value(),false,false,false,null);//申明交换机
        
        Map<String, Object> args = new HashMap<>();
        channel.queueDeclare(RbCommon.QUEUE_Q2,false,false,false ,args);
        channel.queueBind(RbCommon.QUEUE_Q2,RbCommon.EXCHANGE_X2,RbCommon.BINDING_KEY2);//将消息交换机与队列绑定

        channel.confirmSelect();//确认机制

        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//                System.out.println("消息发送成功! "+deliveryTag);
                msgCount.incrementAndGet();
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送失败! "+deliveryTag);
            }
        });

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("重新处理消息!");
            }
        });

        int count = 0;
        while (count++ <100000){
            channel.basicPublish(RbCommon.EXCHANGE_X2,RbCommon.ROUTING_KEY2,false,false,null,(new Date()+message+count).getBytes());
        }

        if(channel.waitForConfirms()){
            channel.close();
            AmqpConnectionFactory.close(connection);
        }
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Producer producer = new Producer();
        producer.produce("消息 ");
        System.out.println(producer.msgCount.intValue());
    }
}

程序运行完成后,发现mq服务器消息确实有100000,但是最终打印的结果却没那么多,ack本身是一种异步确认机制,那这种机制会不会并非线程安全的?可以看到handleAck有一个参数multiple,表示是否批量处理,如果是这样,那就可以理解为什么不是每条消息都有相应的回应了。所有我们不应该关注回调次数,而是deliveryTag的最后值。

当然可以通过channel.waitForConfirms()来决定使用单挑或批量进行确认。

  • 假设消息发送不管成功/失败都会执行回调
代码语言:javascript
复制
可以在ConfirmListener中对ack=false的数据进行重发。
  • 可能消息服务器处理成功,但没有获取返回进行回调处理
代码语言:javascript
复制
发送消息前进行落库,对于Ack成功的数据进行删除,其余的数据重发。这个会出现消息重复。
  • 如果消息被消费了,但是服务器没有收到ack,消息会被重复发送
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档