前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >redis实现消息队列

redis实现消息队列

原创
作者头像
shigen
发布2023-09-08 07:59:40
1.4K0
发布2023-09-08 07:59:40
举报
文章被收录于专栏:shigen的学习笔记

背景

消息队列(Message Queue)是一种常见的软件架构模式,用于在分布式系统中传递和处理异步消息。它解耦了发送消息的应用程序和接收消息的应用程序之间的直接依赖关系,使得消息的发送者和接收者可以独立地演化和扩展。

消息队列的基本原理是发送者将消息发送到一个中间代理(即消息队列),然后接收者从该中间代理中消费消息。中间代理充当了消息的缓冲区,确保消息的可靠传递和持久化存储(根据需要),同时提供了高吞吐量、低延迟和可伸缩性。

消息队列的原理图
消息队列的原理图

相信在做分布式服务开发的时候,或多或少的使用到了消息队列,如主流的kafkarocketMQ。总结下来,消息队列的优点包括:

  • 异步通信:发送者和接收者之间的解耦,使得它们可以独立地操作和演化,无需实时等待回应。
  • 应用解耦:消息队列使不同的应用程序能够以独立的方式进行开发、部署和伸缩,降低了系统之间的耦合度。
  • 削峰填谷:消息队列可以作为缓冲区,处理突发的请求和高峰期的流量,从而减轻系统的压力。
  • 消息持久化:消息队列可以将消息持久化存储,确保在异常情况下不会丢失消息。
  • 可靠性和扩展性:消息队列提供了高可靠性和可伸缩性,通过多个消费者处理大量的消息。

总而言之,消息队列是一种强大的软件架构模式,通过解耦应用程序之间的依赖关系,提供了高可靠性、高吞吐量和可伸缩性的消息传递机制。它在构建分布式系统、处理异步任务和解决系统耦合等方面发挥着重要作用。

那今天的案例呢,没有使用到kafka rocketMQ, 而是继续我的专题redis

redis实现消息队列

list

list这种数据结构天然的支持消息队列,常用的命令如下:

命令

描述

LPUSH key value

在列表头部插入一个或多个值

RPUSH key value

在列表尾部插入一个或多个值

LPOP key

弹出并返回列表头部的一个值

RPOP key

弹出并返回列表尾部的一个值

LRANGE key start stop

获取列表中指定范围内的所有值

LLEN key

获取列表的长度

list实现消息队列的常用命令案例
list实现消息队列的常用命令案例

好的,这个shigen用Java的代码实现以下:

  • 创建消息队列服务类redisMessageQueueService

主要的是三个方法,发送数据、消费数据和判断消息队列是否为空。

  • 消息处理类messProcessor

这个类或者说是组件主要是处理消息,这里简单的在控制台输出打印。

  • 系统的接口messageQueueController

其实就是通过接口的方式调用messageQueueServie,实现消息的发送和接受消费。

那最终的效果是什么样的呢》我本地使用的是curl进行的进一步的测试。

list实现的方式测试效果:

list实现的方式测试效果
list实现的方式测试效果

最后,总结一下list实现消息队列的优缺点:

优点:

  • 简单易用:Redis的List数据结构操作简单,易于理解和使用。
  • 支持多样化操作:List数据结构提供了丰富的操作方法,如插入、删除、获取范围等。

缺点:

消息队列的设计最重要的就是消息的防丢失问题

  • 缺乏消息确认机制:List方式没有内置的消息确认机制,当消费者处理消息失败或发生异常时,消息可能会丢失。
  • 不支持消息持久化:Redis的List数据结构默认存储在内存中,当Redis重启或宕机时,消息也会丢失。
  • 不适合高并发场景:在高并发情况下,List方式可能存在性能问题,因为LPUSH和BRPOP是单线程操作,无法充分利用多核CPU的优势。
  • 不适合多订阅者。现在的list是一对一的模式,不支持一对多的模式。

pub/sub模式

针对list一对一的模式,pub/sub可以实现一对多的模式。

常见的redis操作命令如下:

命令

描述

PUBLISH channel message

将消息 message 发送到指定的频道 channel

SUBSCRIBE channel channel ...

订阅一个或多个频道,接收这些频道中发布的消息

UNSUBSCRIBE [channel channel ...]

取消订阅一个或多个频道

PSUBSCRIBE pattern pattern ...

订阅一个或多个符合给定模式的频道

PUNSUBSCRIBE [pattern pattern ...]

取消订阅一个或多个符合给定模式的频道

PUBSUB subcommand [argument argument ...]

获取关于 Redis Pub/Sub 状态的信息

我们在控制台测试一下:

那具体的代码如何实现呢?这里依旧选取的是Java代码作为案例的设计。

  • 定义消息发布的接口并实现发送消息的操作MessagePublisherImpl
  • 消息订阅者messageSubscriberImpl
  • 配置类中加上redisMessageListenerContainer的bean
  • controller测试

服务运行,接口测试一下:

订阅多个topic的话,这样设置:

代码语言:java
复制
container.addMessageListener(messageListener, new PatternTopic("pub_channel"));
// 监听多个topic
container.addMessageListener(messageListener, new PatternTopic("pub_channel1"));

ok,貌似这种方式也显得很nice,至少比list的实现方式更nice,那它能解决实际的问题吗?我们总结一下这种方式的优缺点:

优点:

  • 实现了多个消费者订阅同一个topic

缺点

  • 数据不可靠:Redis 的 pub/sub 模式没有任何持久化机制,如果发布的消息在订阅者还没有收到前发生宕机,那么这些消息将会丢失。因此,如果需要确保数据的可靠性和持久化,需要使用 Redis 的其他数据结构或者使用 Redis 的 AOF 或 RDB 持久化机制。
  • 消息不能防止重复消费:Redis 的 pub/sub 模式不支持消息的确认和回调机制,因此,当订阅者收到消息时,无法对其进行确认,也就无法防止重复消费

那有什么好的解决方式呢?stream应需求而生。

stream

Redis 的 Stream 是一个基于时间序列的数据结构,用于存储和处理消息。Stream 可以看作是一个由消息组成的日志,每个消息都有一个唯一的 ID(可以是时间戳或其他方式生成),并且可以对消息进行按照时间的顺序和优先级进行排序。

Stream 可以支持多个消费者,并且可以保证每个消费者只能消费一次。Stream 还可以在一个组内进行消费者间负载均衡,以提高系统的可扩展性和高可用性。

常用的API如下:

API

描述

XADD

向指定的 Stream 中添加一个条目(消息)XADD key ID field string field string ...

XDEL

从指定的 Stream 中删除一个或多个条目

XRANGE

获取指定范围内的条目

XREVRANGE

获取指定范围内的逆序条目

XLEN

获取 Stream 中的条目数量

XREAD

从一个或多个 Stream 中读取待处理的条目

XGROUP

创建、管理和操作消费者组

XACK

确认一个或多个已处理的条目

XCLAIM

批量方式对待处理的条目进行声明和处理

XPENDING

获取待处理的条目信息

XTRIM

删除指定范围之外的条目

XINFO

获取 Stream 的相关信息

参考文章:基于Redis的Stream类型的完美消息队列解决方案

添加和读取消息的命令测试如下:

shigen在敲命令的时候也觉得很繁琐,有点麻烦,还是期待Java代码的api去操作消息队列。

参考文章:redis灵魂拷问:如何使用stream实现消息队列 如何在Springboot中使用Redis5的Stream

  • 定义生产消息的messageProcuder

主要是用来实现消息的发送

  • 消息的接受messageReceiver

实现了消息的ack

  • 测试接口

测试中发现了如下错误:

使用stream并不适合用jedis作为连接池。因为我之前的案例都是基于jedis的,在这里果断的放弃了。

好了,以上就是《redis实现消息队列》的全部内容了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • redis实现消息队列
    • list
      • pub/sub模式
        • stream
        相关产品与服务
        云数据库 Redis
        腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档