首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >RabbitMQ / Pika保证消息按创建的顺序接收?

RabbitMQ / Pika保证消息按创建的顺序接收?
EN

Stack Overflow用户
提问于 2012-05-14 23:48:20
回答 2查看 3.1K关注 0票数 0

作为一个简单的例子,我正在向一个新的RabbitMQ(v2.6.1)队列中添加5个项目:

代码语言:javascript
运行
复制
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
# add 5 messages to the queue, the numbers 1-5
for x in range(5):
    message = x+1
    channel.basic_publish(exchange='',routing_key='dw.neil', body=str(message))
    print " [x] Sent '%s'" % message
connection.close()

我清除我的队列,然后运行上面的代码来添加5个项目:

代码语言:javascript
运行
复制
nkodner@hadoop4 sports_load_v2$ python send_5.py 
 [x] Sent '1'
 [x] Sent '2'
 [x] Sent '3'
 [x] Sent '4'
 [x] Sent '5'

现在,我尝试模拟失败的处理。给定要从队列中使用的以下代码。请注意,我注释掉了对basic_ack的调用:

代码语言:javascript
运行
复制
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
method_frame, header_frame, body=channel.basic_get(queue='dw.neil')
print method_frame, header_frame
print "body: %s" % body
#channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection.close()

我运行接收代码来从队列中抓取一件物品。正如我所料,我得到了第一项:

代码语言:javascript
运行
复制
nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

因为对channel.basic_ack()的调用被注释掉了,所以我希望未确认的消息会被放到队列中,以便下一个使用者得到它。我希望消息#1 (再次)是队列中的第一条消息,并将Redelivered属性设置为True。相反,收到消息#2:

代码语言:javascript
运行
复制
nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 2

队列中的所有其他消息都是在#1返回并将重新传递标志设置为True之前收到的:

..。

代码语言:javascript
运行
复制
nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 5

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=True', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

有没有什么属性或选项我可以设置,让我一直得到#1的交付,直到它得到确认?

我的用例是使用顺序生成的文件加载数据仓库。我们使用基于消息的处理来让我的程序知道一些新文件已经准备好并要加载到DW中。我们必须按照文件生成的顺序来处理文件。

EN

Stack Overflow用户

发布于 2012-05-15 00:01:53

尝试使用channel.basic_reject -这应该会将未确认的消息推送回RabbitMQ,它会将该消息视为新消息。此外--如果有失败的消息被阻塞,您可以使用channel.basic_recover告诉RabbitMQ重新传递所有未确认的消息。

  • http://pika.github.com/communicating.html#channel.Channel.basic_reject
  • http://pika.github.com/communicating.html#channel.Channel.basic_recover

http://www.rabbitmq.com/extensions.html#negative-acknowledgements提供了关于Basic.Reject和Basic.Nack的区别信息。

http://www.rabbitmq.com/semantics.html中解释了消息排序语义

票数 1
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/10586694

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档