前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python3 通过 pika 连接 R

Python3 通过 pika 连接 R

作者头像
py3study
发布2020-01-03 16:28:07
8280
发布2020-01-03 16:28:07
举报
文章被收录于专栏:python3

【RabbitMQ 服务器】

代码语言:javascript
复制
# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定
Broker: 192.168.0.xx
virtual host: vhosttest
Exchange: exchangetest 
Queue: queuetest 
Routing key: rkeytest

【Python 环境】

代码语言:javascript
复制
OS: Windows 10
Python: 3.6.3 x64
pika: 0.11.2

【查看队列状态】

代码语言:javascript
复制
# 通过浏览器查看队列状态
http://192.168.0.xx:15672/api/queues/vhosttest/queuetest 

# 通过命令行查看队列状态
curl -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |  jq

# 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged)
curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  | \
    jq '.messages'

【send.py】

代码语言:javascript
复制
#encoding: utf-8
#author: walker
#date: 2018-01-31
#summary: 发送方/生产者

import os, sys, time
import pika

def Main():
	credentials = pika.PlainCredentials("test", "test")
	parameters = pika.ConnectionParameters(host="192.168.0.xx", 
											virtual_host='vhosttest', 
											credentials=credentials)
	connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ

	channel = connection.channel()          # 创建频道
	
	queue = channel.queue_declare(queue='queuetest')     # 声明或创建队列
	
	while True:  # 循环向队列中发送信息
		message = time.strftime('%H:%M:%S', time.localtime())
		channel.basic_publish(exchange='exchangetest',  
								routing_key='rkeytest',
								body=message)
		
		print('send message: %s' %  message)     

		while True:   
			# 检查队列,以重新得到消息计数
			queue = channel.queue_declare(queue='queuetest', passive=True)  
			'''
			 queue.method.message_count 获取的为 ready 的消息数
			 截至 2018-03-06(pika 0.11.2)
			 walker 没找到利用 pika 获取 unack 或者 total 消息数的方法  
			''' 
			messageCount = queue.method.message_count
			print('messageCount: %d' % messageCount)
			if messageCount < 100:
				break
			connection.sleep(1)
	
	# 关闭连接
	connection.close()

if __name__ == '__main__':
	Main()

【recv.py - 版本1】

一个消费者

代码语言:javascript
复制
#encoding: utf-8
#author: walker
#date: 2018-01-31
#summary: 接收方/消费者

import os, sys, time
import pika

# 接收处理消息的回调函数
def ConsumerCallback (channel, method, properties, body):
	print("Received %s" % body)


def Main():
	credentials = pika.PlainCredentials("test", "test")
	parameters = pika.ConnectionParameters(host="192.168.0.xx", 
											virtual_host='vhosttest', 
											credentials=credentials)
	connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ
	
	channel = connection.channel()          # 创建频道
	
	queue = channel.queue_declare(queue='queuetest')     # 声明或创建队列
	
	# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面
	# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
	channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True)
	print('Wait Message ...')
	
	channel.start_consuming()

if __name__ == '__main__':
	Main()

【recv.py - 版本2】

利用多线程实现多个消费者同时消费

代码语言:javascript
复制
#encoding: utf-8
#author: walker
#date: 2018-03-9
#summary: 接收方/消费者

import os, sys, time
import pika
import threading
from queue import Queue

GlobalQueue = Queue(10)

class Consumer(threading.Thread):
	def run(self):
		while True:
			task = GlobalQueue.get()
			print('thread-%d,\ttask: %s' % (threading.get_ident(), task))

# 接收处理消息的回调函数
def ConsumerCallback (channel, method, properties, body):
	# 将消息推入队列
	GlobalQueue.put(body)


def Main():
	credentials = pika.PlainCredentials("test", "test")
	parameters = pika.ConnectionParameters(host="192.168.0.86", 
											virtual_host='vhosttest', 
											credentials=credentials)
	connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ
	
	channel = connection.channel()          # 创建频道
	channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True) # 公平消费
	
	# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面
	# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
	channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True)
	print('Wait Message ...')

	for i in range(3):
		c = Consumer()
		c.start()

	channel.start_consuming()   # 开始接收任务
	
if __name__ == '__main__':
	Main()

【后记】

  • 如果希望不通过 exchange 路由转发,直接给队列发送消息,可以将 exchange 设为空字符串,routing_key 设为队列名。
代码语言:javascript
复制
channel.basic_publish(exchange='',  
						routing_key='queuetest',
						body=task)

【0.x 到 1.x 的迁移】

  • pika.ConnectionParameters
代码语言:javascript
复制
# 0.x 版本
pika.ConnectionParameters(host=Host, 
							virtual_host=VirtualHost, 
							credentials=pika.PlainCredentials(User, Pwd),
							heartbeat_interval=0)
# 1.x 版本					
pika.ConnectionParameters(host=MQHost, 
								virtual_host=MQVirtualHost, 
								credentials=credentials,
								heartbeat=0)
  • channel.basic_qos
代码语言:javascript
复制
# 0.x 版本
channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True)
# 1.x 版本
channel.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=True)
  • channel.basic_consume,终于换掉了坑爹的 no_ack 命名
代码语言:javascript
复制
# 0.x 版本
# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
channel.basic_consume(consumer_callback=ConsumerCallback, queue=MQQueueNode2Center, no_ack=False)
# 1.x 版本
channel.basic_consume(queue=MQQueueNode2Center, on_message_callback=ConsumerCallback, auto_ack=False)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/09/27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档