RabbitMQ - TcpConnection析构引发的一次handshake_timeout

使用RabbitMQ时,连接rabbit-server一直连接失败,代码没有任何错误提示。但是通过rabbitmqctl始终查询不到连接以及创建的queue等信息。 官方的文件demo里面也没有TcpConnection相关例子,只在github上有些简单说明。 然而网上几乎所有人都依然还是在使用Connection,几乎没有使用TcpConnection的例子。最后还是放弃了网络求助,老老实实看源码定位了。 使用tcpdump确认,代码这边的TcpConnection确实是已经向rabbit-server发出了连接请求。

开始观察发现三次握手是已经建立了连接的,但是几秒后,rabbit-server主动发送返回了一个RST包。这非常诧异,查看rabbit-server日志看到,产生了一次handshake_timeout错误。

 现在可以确认,不是鉴权产生的问题,而是在连接时就已经失败了,在完成连接到RST包收到刚好过了10s时间。在官方文档查阅到,rabbit-server的心跳也刚好是10s。
 后来还是确定问题点是在代码上,但是代码只有短短几行从github上copy下来的,怎么会出错呢。
 最后在日志打印上发现monitor函数执行了两次,这个小小的信息感觉看到了问题的原因,查看TcpConnection源码monitor被调用的地方。
  1 public:  
 2     /**  
  3      *  Constructor  
 4      *  @param  connection  Parent TCP connection object  
  5      *  @param  socket      The socket filedescriptor  
 6      *  @param  buffer      The buffer that was already built  
  7      *  @param  handler     User-supplied handler object  
 8      */  
  9     TcpConnected(TcpConnection *connection, int socket, TcpOutBuffer &&buffer, TcpHandler *handler) :  
 10         TcpState(connection, handler), 
  11         _socket(socket), 
 12         _out(std::move(buffer)), 
  13         _in(4096) 
 14     { 
  15         // if there is already an output buffer, we have to send out that first 
 16         if (_out) _out.sendto(_socket); 
  17          
 18         // tell the handler to monitor the socket, if there is an out 
  19         _handler->monitor(_connection, _socket, _out ? readable | writable : readable); 
 20     } 
  21      
 22     /** 
  23      *  Destructor 
 24      */ 
  25     virtual ~TcpConnected() noexcept 
 26     { 
  27         // we no longer have to monitor the socket 
 28         _handler->monitor(_connection, _socket, 0); 
  29          
 30         // close the socket 31         close(_socket); 
  32     }
 在构造和析构中各调用了一次,而且内部使用connection可能是为了提高效率进行了线程操作,也就是说实际的connection是在多线程中完成的。
 最后尝试修改代码,使用指针进行操作,因为代码并不是github上的单个函数文件,而是多处引用,最后问题解决。成功使用TcpConnection连接上了rabbit-server。
  
 附上简单代码:
  1 int Broker::init(std::string host,int port, std::string username, std::string userpasswd, int svrid)  
2 {  
3     // create an instance of your own tcp handler  
4     _handle = new DSBrokerMessageHandle();  
5   
6     // address of the server  
7     AMQP::Address address(host, port,AMQP::Login(username,userpasswd),"/");  
8   
9     // create a AMQP connection object 
10     _connection = new AMQP::TcpConnection(_handle, address); 
11  
12     // and create a channel 
13     _channel = new AMQP::TcpChannel(&connection); 
14  
15     auto receiveMessageCallback = [=](const AMQP::Message &message, 
16         uint64_t deliveryTag, 
17         bool redelivered) 
18     { 
19         //_channel->ack(deliveryTag); 
20     }; 
21 
 22     AMQP::QueueCallback callback = 
23         [=](const std::string &name, int msgcount, int consumercount) 
24     { 
25         _channel->bindQueue("service", name, name); 
26         _channel->bindQueue("service", name, "monitor"); 
27         _channel->bindQueue("service", name, "heartbeat"); 
28  
29         _channel->consume(name, AMQP::noack).onReceived(receiveMessageCallback); 
30     }; 
31  
32     AMQP::SuccessCallback success = [svrid, this, callback]() 
33     { 
34         char que[4] = { '\0' }; 
35         ACE_OS::itoa(svrid, que, 10); 
36         std::string quename(que); 
37         _channel->declareQueue(quename, AMQP::durable).onSuccess(callback); 
38     }; 
39 
 40     // use the channel object to call the AMQP method you like 
41     _channel->declareExchange("service", AMQP::fanout).onSuccess(success); 
42  
43     return 0; 
44 }
  
 

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏

后端开源软件集合

缓存系统:memcached(group cache)、redis、mongodb、Couchbase(CouchDB、Membase、CouchOne) ht...

2239
来自专栏后端技术探索

RabbitMQ 系列AMQP协议

ZeroMQ和RabbitMQ是目前两种业界最为流行的消息队列,ZeroMQ的优势在于性能和轻量级,使用上类似于Socket通信,帮助应用封装了底层通信的细节,...

852
来自专栏编程一生

一个高性能、轻量级的分布式内存队列系统--beanstalk

1472
来自专栏菩提树下的杨过

ActiveMQ笔记(6):消息延时投递

在开发业务系统时,某些业务场景需要消息定时发送或延时发送(类似:飞信的短信定时发送需求),这时候就需要用到activemq的消息延时投递,详细的文档可参考官网说...

2805
来自专栏乐沙弥的世界

Oracle 基于备份控制文件的恢复(unsing backup controlfile)

    通常在当前控制文件丢失,或者当前的控制文件与需要恢复的控制文件不一致的情况下,我们需要重新创建一个控制文件或者使用 unsing backup cont...

752
来自专栏白驹过隙

RabbitMQ - TcpConnection析构引发的一次handshake_timeout

40013
来自专栏大数据架构

Kafka设计解析(八)- Exactly Once语义与事务机制原理

3193
来自专栏微信公众号:Java团长

消息中间件企业级应用

众所周知,消息中间件是大型分布式系统中不可或缺的重要组件。它使用简单,却解决了不少难题,比如异步处理,系统藕合,流量削锋,分布式事务管理等。实现了一个高性能,高...

1671
来自专栏Crossin的编程教室

Hexo(3)-安装自己喜欢的主题

本系列其它文章: 用 GitHub + Hexo 建立你的第一个博客 [Hexo]部署博客及更新博文 欢迎在今天下面一条推送中留下你的博客地址 本篇来讲解如何安...

4855
来自专栏along的开发之旅

Windows下Git和GitExtension配置

先提下写这篇博客遇到的坑,截图千万不要存为bmp格式,要存为png或其他.大小相差特别大,bmp格式1.3mb,png才80kb.加载速度快了不是一点点.

731

扫码关注云+社区