首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >跨多个线程使用Pika通道

跨多个线程使用Pika通道
EN

Stack Overflow用户
提问于 2019-02-11 08:32:01
回答 1查看 2.1K关注 0票数 1

虽然不能跨越多个线程使用单个的PIKA连接,但我们可以使用多个线程之间连接的通道got吗?我得到了这样的错误,答案似乎是否定的。只是张贴以供参考

代码语言:javascript
运行
复制
INFO  2019-02-07 13:14:12,927 pika.connection _on_terminate  2095: Disconnected from RabbitMQ at 127.0.0.1:5672 (505): UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
EN

回答 1

Stack Overflow用户

发布于 2019-02-15 04:08:49

我是这样做的

代码语言:javascript
运行
复制
Example using PIKA consumer without blocking thread  - PIKA and GRPC Streaming

###########
    def grpc_test(self, request, context): 
    # A response streaming GRPC implementation - Client gets stream of messages

        message_queue = Queue.Queue()
        app = request
        def rmq_callback(data):
            print("Got a call back from RMQ Client")
            message_queue.put(data)

        # Register with RabbitMQ for Data
        # thread safe - create a connection here and a channel
        pikaconsumer = TestConsumer()
        # Client want to listen on this queue
        pikaconsumer.listen_on_queue("xxxx", rmq_callback) 
        # use the connection and channel in a new thread (and no other thread)
        t= threading.Thread(target=pikaconsumer.start_consuming)
        t.start()

        while True:
              date = message_queue.get(True)
              protobuf_obj = proto.Data()
              message.ParseFromString(obj)
              yield message

###########

class TestConsumer(object):

    def __init__(self):
        amqp_url ='amqp://guest:guest@127.0.0.1:5672/'
        parameters = pika.URLParameters(amqp_url)
        connection = pika.BlockingConnection(parameters)
        self._channel = connection.channel()


    def listen_on_queue(self,queue_name,exchange,routing_keys,_callback):
        # In case queue is  not there - create a queue
        self._channel.queue_declare(queue=queue_name,auto_delete=True,)
        for routing_key in routing_keys:
            self._channel.queue_bind(queue_name,
                                 exchange, str(routing_key))
            LOGGER.info('Binding Exchange[%s] to Queue[%s] with RoutingKey[%s]',
                    exchange, queue_name, str(routing_key))

        def __on_message(channel, method_frame, header_frame, body, callback=()):
            print(method_frame.delivery_tag)
            callback(body)
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)
        self._consumer_tag = self._channel.basic_consume(partial(__on_message,
                    callback=_callback), queue_name)

    def start_consuming(self):
        self._channel.start_consuming()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54626536

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档