前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >消息中间件工作队列 — RabbitMQ

消息中间件工作队列 — RabbitMQ

作者头像
木野归郎
发布2020-11-25 12:00:13
3990
发布2020-11-25 12:00:13
举报
文章被收录于专栏:share ai happiness

工作队列

工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。

RabbitMQ分发策略:轮询和公平分发。

轮询分发:

如果现在有两个消费者,生产者产生的消息会轮流分发给两个消费者。

公平分发:

比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。

这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,在同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

代码语言:javascript
复制
channel.basic_qos(prefetch_count=1)

关于队列大小

如果所有的工作者都处理繁忙状态,你的队列就会被填满。你需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。

点对点进行发布:

生产者代码:

代码语言:javascript
复制
import pika
import json

credentials = pika.PlainCredentials('xuan', '123456')  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'mq-test', durable=True)

for i in range(10):
    message=json.dumps({'CID':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
    channel.basic_publish(exchange = '',routing_key = 'mq-test',body = message)
    print(message)
connection.close()

消费者代码:

代码语言:javascript
复制
import pika

credentials = pika.PlainCredentials('xuan', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
channel.queue_declare(queue = 'mq-test', durable = True)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())

# 告诉rabbitmq,用callback来接收消息
channel.basic_consume('mq-test',callback)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()

一对多进行发布:

生产者代码:

代码语言:javascript
复制
#!/usr/bin/env python
import pika
import sys

credentials = pika.PlainCredentials('xuan', '123456')  # mq用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

消费者代码:

代码语言:javascript
复制
#!/usr/bin/env python
import pika
import time

credentials = pika.PlainCredentials('xuan', '123456')  # mq用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

# 当工作者(worker)完成了任务,就发送一个响应。
#下面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

#可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 OnlyCoding 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档