前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python3 通过 kombu 连接

Python3 通过 kombu 连接

作者头像
py3study
发布2020-01-03 16:30:44
7290
发布2020-01-03 16:30:44
举报
文章被收录于专栏:python3python3

【RabbitMQ 服务器】

代码语言:javascript
复制
# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定
Broker: 192.168.0.xx
virtual host: vhosttest
Exchange: exchangetest 
Queue: queuetest 
Routing key: rkeytest

【Python 环境】

代码语言:javascript
复制
OS: Windows 10
Python: 3.6.3 x64
kombu: 4.1.0

【查看队列状态】

代码语言:javascript
复制
# 通过浏览器查看队列状态
http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  

# 通过命令行查看队列状态curl -u user:password 
http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |  jq 

# 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged)
curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  | \
    jq '.messages'

【send.py】

代码语言:javascript
复制
#encoding: utf-8
#author: walker
#date: 2018-03-09
#summary: 发送方/生产者

import os, sys, time
from kombu import Connection

def Main():
	with Connection('amqp://test:test@192.168.0.xx:5672/vhosttest') as conn:
		with conn.channel() as channel:
			#producer = Producer(channel)
			producer = channel.Producer()
			
			while True:
				message = time.strftime('%H:%M:%S', time.localtime())
				producer.publish(
						body=message,
						retry=True,
						exchange='exchangetest',
						routing_key='rkeytest'
					)
				print('send message: %s' %  message)     
					
				while True:
				        # 检查队列,以重新得到消息计数
					queue = channel.queue_declare(queue='queuetest', passive=True)
					messageCount = queue.message_count
					print('messageCount: %d' % messageCount)
					if messageCount < 100:
						break
					time.sleep(1)
					
					
if __name__ == '__main__':
	Main()

【recv.py】

代码语言:javascript
复制
#encoding: utf-8
#author: walker
#date:  2018-03-09
#summary: 接收方/消费者

import os, sys, time
from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):

	def __init__(self, connection, queueNmae):
		self.connection = connection
		self.queues = [Queue(queueNmae, durable=False)]

	def get_consumers(self, Consumer, channel):
		return [
			Consumer(self.queues, callbacks=[self.on_message]),
		]

	# 接收处理消息的回调函数
	def on_message(self, body, message):
		print("Received %s" % body)
		message.ack()
		
def Main():
	with Connection('amqp://test:test@192.168.0.xx:5672/vhosttest') as conn:
		C(conn, 'queuetest').run()					
					
if __name__ == '__main__':
	Main()	
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/09/27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档