首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在pika / RabbitMQ中处理长时间运行的任务

在pika / RabbitMQ中处理长时间运行的任务
EN

Stack Overflow用户
提问于 2013-01-29 05:58:56
回答 6查看 28.5K关注 0票数 65

我们正在尝试建立一个基本的定向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取一个任务,对其进行处理,并确认消息。

问题是,处理过程可能需要10-20分钟,而我们当时没有响应消息,导致服务器断开与我们的连接。

下面是我们的消费者的一些伪代码:

代码语言:javascript
运行
复制
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

在第一个任务完成之后,在BlockingConnection内部的某个地方会抛出一个异常,抱怨套接字被重置。此外,RabbitMQ日志显示消费者因为没有及时响应而被断开连接(为什么它重置连接而不是发送FIN很奇怪,但我们不会担心这一点)。

我们搜索了很多,因为我们相信这是RabbitMQ的正常用例(有许多长时间运行的任务,应该在许多消费者之间分配),但似乎没有其他人真的有这个问题。最后,我们偶然发现了一个线程,其中建议使用心跳并在单独的线程中生成long_running_task()

所以代码变成了:

代码语言:javascript
运行
复制
#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

这似乎是可行的,但它非常混乱。我们确定ch对象是线程安全的吗?此外,假设long_running_task()正在使用该连接参数将一个任务添加到一个新队列中(即,这个长流程的第一部分已经完成,让我们将任务发送到第二部分)。因此,线程使用的是connection对象。这个线程安全吗?

更重要的是,做这件事的首选方法是什么?我觉得这非常混乱,而且可能不是线程安全的,所以我们可能做得不对。谢谢!

EN

Stack Overflow用户

发布于 2017-08-16 15:09:32

不要禁用心跳。

最好的解决方案是在一个单独的线程中运行该任务,并将prefetch_count设置为1,这样使用者就可以使用下面的channel.basic_qos(prefetch_count=1)只得到一条未确认的消息

票数 7
EN
查看全部 6 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/14572020

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档