首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >信号处理pika / python

信号处理pika / python
EN

Stack Overflow用户
提问于 2014-04-30 20:13:44
回答 1查看 2K关注 0票数 7

我在一个消费者中使用pika.BlockingConnection,它为每条消息执行一些任务。我还添加了信号处理,以便使用者在完全执行所有任务后正常死亡。

在处理消息和接收信号时,我只从函数中获得"signal received",但代码并不退出。因此,我决定也检查在回调函数结束时接收到的信号。问题是,我要检查信号多少次,因为这段代码中会有更多的函数。有没有一种更好的方法来处理信号而不做过多的事情?

代码语言:javascript
运行
复制
import signal
import sys
import pika
from time import sleep

received_signal = False
all_over = False

def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

mq_connection = pika.BlockingConnection(pika.ConnectionParameters(my_mq_server, virtual_host='test'))
mq_channel = mq_connection.channel()

def callback(ch, method, properties, body):
    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)
    print body
    sleep(50)
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)
    print "Message consumption complete"

    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)

try:
    print ' [*] Waiting for messages. To exit press CTRL+C'
    mq_channel.basic_consume(callback, queue='test')
    mq_channel.start_consuming()
except Exception:
    mq_channel.close()
    exit()

这是我在这里的第一个问题,如果需要更多细节,请让我知道。

EN

回答 1

Stack Overflow用户

发布于 2014-05-09 23:47:19

我认为这就是你想要的:

代码语言:javascript
运行
复制
#!/usr/bin/python

import signal
import sys 
import pika
from contextlib import contextmanager

received_signal = False
processing_callback = False

def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True
    if not processing_callback:
         sys.exit()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

@contextmanager
def block_signals():
    global processing_callback
    processing_callback = True
    try:
        yield
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()

def callback(ch, method, properties, body):
    with block_signals:
        print body
        sum(xrange(0, 200050000)) # sleep gets interrupted by signals, this doesn't.
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"

if __name__ == "__main__":    
    try:
        mq_connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        mq_channel = mq_connection.channel()
        print ' [*] Waiting for messages. To exit press CTRL+C'
        mq_channel.basic_consume(callback, queue='test')
        mq_channel.start_consuming()
    except Exception as e:
        mq_channel.close()
        sys.exit()

我使用了一个contextmanager来处理阻塞信号,因此所有的逻辑都隐藏在回调本身之外。这也应该使代码的重用变得更容易。为了弄清楚它是如何工作的,它等同于:

代码语言:javascript
运行
复制
def callback(ch, method, properties, body):
    global processing_callback
    processing_callback = True
    try:
        print body
        sum(xrange(0, 200050000))
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/23387803

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档