RabbitMQ(二) ——工作队列

RabbitMQ(二)——工作队列

(原创内容,转载请注明来源,谢谢)

一、概述

工作队列模式(work queue),是有多个消费者的情况下,可以共同消费队列内的内容,加快消息处理速度。这是RabbitMQ的基本工作模式。

二、使用方式

和上一篇中的生产和消费消息的方式一样,就是需要多在cli进程中打开一个消费者的php文件。即需要打开3个php,一个是生产者的php文件,两个消费者的php文件(或多个php文件)。

三、工作机制

3.1 轮询(Round-robin dispatching)

当开启多个生产者的时候,消费者产生消息并发送到队列的情况下,队列会将消息均衡的分发给同时打开的多个消费者。即采用轮询的方式,假设有两个消费者c1和c2,第一次有消息给c1,第二次有消息给c2,第三个再给c1,以此类推。

3.2 回馈机制(Message acknowledgment)

为了保证消息的可靠性,RabbitMQ允许用户采用消费者的ack机制,即只有消费者回馈给队列ack后,队列才会将消息从队列中剔除。这样,可以确保队列中的每个消息都是确认被消费者处理完毕的。

开启的方式,只要将方法basic_consume第四个参数设置成false,就表示开启ack机制。

开启ack,就必须要记得在消费者的代码总,加入回馈的代码,否则,消息会被队列认为没有消费,不断的堵在队列中,导致队列堵塞。

要查看队列中还没确认的内容,可以采用RabbitMQ的管理工具——rabbitmqctl。

       sudo rabbitmqctl list_queues namemessages_ready messages_unacknowledged

3.3 消息持久化(Message durability)

RabbitMQ具有完善的持久化机制,能够确保消息的安全性。可以在代码中开启持久化。消息持久化需要在队列和消息分别开启持久化。

1)队列持久化:

queue_declare的第三个参数设置为true。

2)消息持久化:

$msg = new AMQPMessage($data,
       array('delivery_mode' =>AMQPMessage::DELIVERY_MODE_PERSISTENT)
    );

这样可以确保消息可以保存在本地磁盘,因此即使rabbitmq的服务器宕机的时候,也可以保证消息的安全性。

当然,这也也存在一定的风险,因为操作系统自身的机制,开启持久化的时候,操作系统为了保证运作速度,消息会保存在操作系统层面的缓存,并且定时将消息存入硬盘。因此,还没存入磁盘的这一小段间隔(30秒左右),如果服务器宕机,有可能消息丢失。但是这个可能性很低,因此安全性已经很高。

3.4 公平分发(Fair dispatch)与预取机制(prefetch)

消息的轮询机制,每次有消息,队列则直接按照排好的顺序将消息传给对应的队列,有可能出现一部分队列上一条消息还没处理完,就出现了下一条的消息,而另外一部分队列则空闲的情况。

因此,RabbitMQ允许用户开启公平分发机制,让每个消费者只能接收一个消息,如果还没给队列回复ack,则消息来的时候,即使顺序轮到队列,也不会分发给它,而是分发给它下一个空队列。

开启方式很简单,在消费者的channel,在消费消息之前(使用basic_consume方法),给channel加一个qos机制:

       $channel->basic_qos(null, 1, null);

第二个参数就是要求消费者每次只能处理几个消息。

这样也存在一个隐患,即如果所有的消费者都还没ack,而生产者又不断的往队列发数据,则队列有可能会塞满。这个处理方案只有增加消费者,或者从代码、逻辑层面控制消息的产生速度。

——written by linhxx 2017.08.19

原文发布于微信公众号 - 决胜机器学习(phpthinker)

原文发表时间:2017-08-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏JAVA同学会

MongoDB之分片集群(Sharding)

分片(sharding)是一个通过多台机器分配数据的方法。MongoDB使用分片支持大数据集和高吞吐量的操作。大数据集和高吞吐量的数据库系统挑战着单一服务的性能...

613
来自专栏Java Edge

Kafka架构解析1之背景及架构介绍简介为何使用消息系统常用Message Queue对比Kafka架构拓扑结构Producer消息路由

3175
来自专栏IT技术精选文摘

深入讲解ActiveMQ5.X消息的持久性

我经常被问到一些基本的关于解释消息存储在ActiveMQ中是如何工作的问题。在这里我将做一个高层面的解释。注意,上下文环境是它是在JMS范围内。如果你使用的是A...

1795
来自专栏无题

消息中间件的设计与实践

也无风雨也无晴 消息中间件对应用的解耦 如登陆系统负责向消息中间件发送消息,而其他的系统则向消息中间件来订阅这个消息,然后完成自己的工作. 通过消息中间件解耦...

3416
来自专栏积累沉淀

kafka学习之路(一)——入门

kafka学习之路(一)——入门 Kafka学习之路... 一、入门.. 1、 简介 2、 主题(Topics)、日志(Logs) 3、 分布式(Distrib...

28410
来自专栏编程札记

MQ对比之RabbitMQ &Redis

Redis 是完全开源免费的,遵守BSD协议,是一个高性能的key-value数据库。

1103
来自专栏社区的朋友们

Kafka 设计原理

跟 RabbitMQ 、RocketMQ 等目前流行的开源消息中间件相比,Kakfa 具有高吞吐、低延迟等特点,在大数据、日志收集等应用场景下被广泛使用。

1.9K0
来自专栏Java技术栈

史上最全Redis面试题及答案。

花了大量时间 整理了这套Redis面试题 首发50题,绝无仅有 从入门到精通 从基础,高级知识点 再到集群,运维,方案… 弄明白了这些题 可以说可以成为面霸了 ...

5167
来自专栏谭伟华)的专栏

Amazon Aurora:云时代的数据库 ( 中)

文章详尽的介绍了 Aurora 设计背后的驱动和思考,以及如何在云上实现一个同时满足高并发、高吞吐量、高稳定性、高可用、高扩展的云数据库。

2670
来自专栏Java工程师日常干货

RocketMQ实战(二)Quick Start初步了解消息失败重试机制天然的消息负载均衡及高效的水平扩展机制 集群消费 AND 广播消费

在上一篇《RocketMQ实战(一)》中已经为大家初步介绍了下RocketMQ以及搭建了双Master环境,接下来继续为大家介绍!

842

扫码关注云+社区