前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python操作Rabbit MQ的5种

Python操作Rabbit MQ的5种

作者头像
py3study
发布2020-01-19 15:15:46
7930
发布2020-01-19 15:15:46
举报
文章被收录于专栏:python3

python版本:   2.7.14

一 消息生产者代码:

代码语言:javascript
复制
 1 # -*- coding: utf-8 -*-
 2 
 3 import json
 4 import pika
 5 import urllib
 6 import urllib2
 7 import chardet
 8 import sys
 9 import json
10 from common import CommonMethod
11 import pika
12 import time
13 
14 HOST_NAME = "172.21.204.14"
15 USER_NAME = "xxx"
16 PASSWORD = "xxx"
17 
18 # 1."Hello World!"
19 def hello_world():
20     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
21     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
22     channel = connection.channel()
23     
24     channel.queue_declare(queue='hello')
25     channel.basic_publish(exchange='',
26                             routing_key='hello',      # specify queue  name
27                             body='Hello World!')
28     print(" [x] Sent 'Hello World!'")
29     connection.close()
30 
31 # 2."Work queues"
32 def new_task():
33     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
34     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
35     channel = connection.channel()
36 
37     channel.queue_declare(queue='task_queue', durable=True)   # 设置队列持久化
38     message = ' '.join(sys.argv[1:]) or "Hello World!"
39     channel.basic_publish(exchange='',
40                         routing_key='task_queue',
41                         body=message,
42                         properties=pika.BasicProperties(
43                             delivery_mode = 2,                # 设置消息持久化
44                         ))
45     print(" [x] Sent %r" % message) 
46     connection.close()
47 
48 # 3."Publish/Subscribe"
49 def emit_log(message):
50     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
51     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
52     channel = connection.channel()
53 
54     channel.exchange_declare(exchange='logs',        # 申明logs交换机
55                          exchange_type='fanout')     # 交换机类型: 发布/订阅
56 
57     channel.basic_publish(exchange='logs',
58                         routing_key='',
59                         body=message)
60     print(" [x] Sent %r" % message)
61     connection.close()
62 
63 # 4."Routing"
64 def emit_log_direct(log_level,message):
65     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
66     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
67     channel = connection.channel()
68 
69     channel.exchange_declare(exchange='direct_logs', # 申明logs交换机
70                          exchange_type='direct')     # 交换机类型: 路由(Routing)
71 
72     channel.basic_publish(exchange='direct_logs',
73                         routing_key=log_level,
74                         body=message)
75     print(" [x] Sent %r:%r" % (log_level, message))
76     connection.close()
77 
78 emit_log_direct("info", "info log message:...")
79 emit_log_direct("error", "error log message:...")
80 
81 # 5."Topic"
82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活

View Code

二 消息消费者代码:

代码语言:javascript
复制
 1 # -*- coding: utf-8 -*-
 2 
 3 import json
 4 import pika
 5 import urllib
 6 import urllib2
 7 import chardet
 8 import sys
 9 import json
10 from common import CommonMethod
11 import pika
12 import time
13 
14 HOST_NAME = "172.21.204.14"
15 USER_NAME = "xxx"
16 PASSWORD = "xxx"
17 
18 # 1."Hello World!"
19 def hello_world():
20     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
21     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
22     channel = connection.channel()
23     
24     channel.queue_declare(queue='hello')
25     channel.basic_publish(exchange='',
26                             routing_key='hello',      # specify queue  name
27                             body='Hello World!')
28     print(" [x] Sent 'Hello World!'")
29     connection.close()
30 
31 # 2."Work queues"
32 def new_task():
33     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
34     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
35     channel = connection.channel()
36 
37     channel.queue_declare(queue='task_queue', durable=True)   # 设置队列持久化
38     message = ' '.join(sys.argv[1:]) or "Hello World!"
39     channel.basic_publish(exchange='',
40                         routing_key='task_queue',
41                         body=message,
42                         properties=pika.BasicProperties(
43                             delivery_mode = 2,                # 设置消息持久化
44                         ))
45     print(" [x] Sent %r" % message) 
46     connection.close()
47 
48 # 3."Publish/Subscribe"
49 def emit_log(message):
50     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
51     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
52     channel = connection.channel()
53 
54     channel.exchange_declare(exchange='logs',        # 申明logs交换机
55                          exchange_type='fanout')     # 交换机类型: 发布/订阅
56 
57     channel.basic_publish(exchange='logs',
58                         routing_key='',
59                         body=message)
60     print(" [x] Sent %r" % message)
61     connection.close()
62 
63 # 4."Routing"
64 def emit_log_direct(log_level,message):
65     credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
66     connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials))
67     channel = connection.channel()
68 
69     channel.exchange_declare(exchange='direct_logs', # 申明logs交换机
70                          exchange_type='direct')     # 交换机类型: 路由(Routing)
71 
72     channel.basic_publish(exchange='direct_logs',
73                         routing_key=log_level,
74                         body=message)
75     print(" [x] Sent %r:%r" % (log_level, message))
76     connection.close()
77 
78 emit_log_direct("info", "info log message:...")
79 emit_log_direct("error", "error log message:...")
80 
81 # 5."Topic"
82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活

View Code

三 图片

官网参考文档: http://www.rabbitmq.com/getstarted.html

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/03/04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档