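A single PIKA connection cannot be used across multiple threads, but can a channel obtained from that connection be shared between threads? I got the error below, so the answer appears to be no. Just posting for reference.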
INFO 2019-02-07 13:14:12,927 pika.connection _on_terminate 2095: Disconnected from RabbitMQ at 127.0.0.1:5672 (505): UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
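One way to stay within that constraint is to give every thread its own connection and channel instead of sharing one. Below is a minimal sketch of that idea using `threading.local`. The `PerThread` helper and its `factory` argument are hypothetical names made up for illustration, and the pika wiring is only shown in a comment because it needs a running broker.

```python
import threading


class PerThread(object):
    """Lazily builds one resource per thread (hypothetical helper)."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def get(self):
        # threading.local gives every thread its own attribute namespace,
        # so the factory runs at most once per thread.
        if not hasattr(self._local, "value"):
            self._local.value = self._factory()
        return self._local.value


# Assumed wiring with pika (not executed here, URL taken from the answer):
#   channels = PerThread(lambda: pika.BlockingConnection(
#       pika.URLParameters("amqp://guest:guest@127.0.0.1:5672/")).channel())
#   channels.get()  # each thread receives a channel on its own connection
```

A thread that calls `get()` repeatedly always receives the same object, while a different thread receives a separate one, so no frames from two threads end up interleaved on one channel.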
Posted on 2019-02-15 04:08:49
This is how I did it:
Example using PIKA consumer without blocking thread - PIKA and GRPC Streaming
###########
import threading
import Queue  # Python 2; on Python 3 use "import queue as Queue"

def grpc_test(self, request, context):
    # A response-streaming gRPC implementation: the client gets a stream of messages
    message_queue = Queue.Queue()
    app = request

    def rmq_callback(data):
        print("Got a callback from RMQ client")
        message_queue.put(data)

    # Register with RabbitMQ for data.
    # Thread safety: create the connection and channel here, then hand them
    # to exactly one thread
    pikaconsumer = TestConsumer()
    # The client wants to listen on this queue; the exchange and routing-key
    # values are placeholders, just like the queue name
    pikaconsumer.listen_on_queue("xxxx", "xxxx", ["xxxx"], rmq_callback)
    # Use the connection and channel in a new thread (and no other thread)
    t = threading.Thread(target=pikaconsumer.start_consuming)
    t.start()

    while True:
        # Block until the consumer thread hands over the next message body
        data = message_queue.get(True)
        message = proto.Data()
        message.ParseFromString(data)
        yield message
###########
import logging
from functools import partial

import pika

LOGGER = logging.getLogger(__name__)


class TestConsumer(object):
    def __init__(self):
        amqp_url = 'amqp://guest:guest@127.0.0.1:5672/'
        parameters = pika.URLParameters(amqp_url)
        connection = pika.BlockingConnection(parameters)
        self._channel = connection.channel()

    def listen_on_queue(self, queue_name, exchange, routing_keys, _callback):
        # Create the queue in case it does not exist yet
        self._channel.queue_declare(queue=queue_name, auto_delete=True)
        for routing_key in routing_keys:
            self._channel.queue_bind(queue_name, exchange, str(routing_key))
            LOGGER.info('Binding Exchange[%s] to Queue[%s] with RoutingKey[%s]',
                        exchange, queue_name, str(routing_key))

        def _on_message(channel, method_frame, header_frame, body, callback=None):
            print(method_frame.delivery_tag)
            callback(body)
            # Acknowledge on the same thread that owns the channel
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)

        # Argument order is for pika 0.x; pika 1.x expects
        # basic_consume(queue, on_message_callback)
        self._consumer_tag = self._channel.basic_consume(
            partial(_on_message, callback=_callback), queue_name)

    def start_consuming(self):
        # Blocks the calling thread until consumption is stopped
        self._channel.start_consuming()
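The core of this approach is the hand-off: only the consumer thread touches the channel, and it passes message bodies to the streaming generator through a thread-safe queue. Here is a small sketch of just that pattern in Python 3, with pika and gRPC left out so it runs on its own. The names `stream_from_worker`, `fake_consumer`, and the `_STOP` sentinel are made up for illustration; the sentinel in particular is an addition, since the handler above streams forever.

```python
import queue
import threading

_STOP = object()  # sentinel marking the end of the stream (not in the code above)


def stream_from_worker(produce):
    """Yield items that produce(emit) emits from a background thread."""
    handoff = queue.Queue()

    def run():
        try:
            produce(handoff.put)  # stands in for start_consuming()
        finally:
            handoff.put(_STOP)    # always unblock the reader

    worker = threading.Thread(target=run)
    worker.daemon = True
    worker.start()

    while True:
        item = handoff.get()      # blocks until the worker emits something
        if item is _STOP:
            break
        yield item


def fake_consumer(emit):
    # Hypothetical stand-in for the RabbitMQ callback firing three times.
    for body in (b"a", b"b", b"c"):
        emit(body)


print(list(stream_from_worker(fake_consumer)))
```

Because `queue.Queue` does its own locking, the generator never needs to call into pika, which is what keeps the channel confined to a single thread.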
https://stackoverflow.com/questions/54626536