前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解Redis的Pub/Sub模式

深入理解Redis的Pub/Sub模式

作者头像
烟雨平生
发布2023-11-07 17:02:56
4640
发布2023-11-07 17:02:56
举报
文章被收录于专栏:数字化之路数字化之路
  • 什么是pub/sub?

  • Redis的pub/sub指令
  • Redis pub/sub的适用场景
  • Redis pub/sub指令的注意事项及缺点
  • 基于spring-boot-starter-data-redis实现pub/sub
  • 小结

什么是pub/sub?

Pub/Sub(发布/订阅)是一种消息传递模式,它允许一个或多个订阅者监听一个特定的主题(频道),当有新的消息发布到该主题时,所有订阅者都会收到通知。

这种模式在分布式系统中非常常见,因为它可以解耦生产者和消费者之间的关系,使得系统更加灵活和可扩展。 RocketMQ、RabbitMQ也支持Pub/Sub的消息传递模式。

以RocketMQ为例,Pub/Sub的结构如下:

RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费这三部分。 生产者生产消息并发送至RocketMQ 服务端,消息被存储在服务端的主题[Topic]中,消费者通过订阅主题[Topic]消费消息。 Redis场景也类似,不同的是消息发送到了Redis服务器。 JackieTang,公众号:的数字化之路RocketMQ系列 | 如何让消息“丢失”?

Redis的pub/sub指令

Redis实现的“发布/订阅”模式可以实现进程间的消息传递,其原理是这样的: “发布/订阅”模式中包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或若干个频道(channel),而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到此消息。

Redis消息队列不支持消息的多播机制。 消息多播允许 生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由相应的消费组进行消费。支持了消息多播,不同消费组的逻辑就可以放到不同的子系统中。 为了支持多播,Redis不再依赖那5种基本类型了,它单独使用了一个模块来支持消息多播,这个模块的名字叫做PubSub,也就是PublisherSubscriber(发布者/订阅者模式)。

Redis提供了一组命令可以让开发者实现“发布/订阅”(publish/subscribe)模式,包括以下几个指令:

  1. PUBLISH:用于发布消息到指定的频道。用法是PUBLISH channel message
  2. SUBSCRIBE:用于订阅一个或多个频道。用法是SUBSCRIBE channel [channel...]
  3. UNSUBSCRIBE:用于取消订阅一个或多个频道。
  4. PSUBSCRIBE:用于订阅一个或多个频道,但不会立即开始接收消息,而是等待客户端执行SUBSCRIBE命令后才开始接收。
  5. PUNSUBSCRIBE:用于取消订阅一个或多个频道,但不会立即停止接收消息,而是等待客户端执行UNSUBSCRIBE命令后才会停止。

Redis pub/sub的适用场景

Redis的Pub/Sub模式适用于以下场景:

  1. 实时消息推送:如新闻更新、股票价格变动等。
  2. 事件驱动系统:如用户注册、订单创建等事件的通知。
  3. 分布式系统中的数据同步:如数据库的主从复制、分布式缓存等。

Redis pub/sub指令的注意事项及缺点

在使用Redis的Pub/Sub模式时,需要注意以下几点:

  1. 频道名必须是字符串类型。
  2. 发布的消息必须是字符串类型。
  3. 订阅和取消订阅频道的操作是异步的,不会阻塞客户端的其他操作。
  4. 如果客户端断开了与Redis服务器的连接,那么它订阅的所有频道都会被自动取消订阅。

在写demo之前,咱们再来多看一眼Redis PubSub模块的缺点: 1、没有消息存储。 Redis只会把消息投递给当前正在的订阅的Subscriber。 如果没有消费者,此条消息就丢弃。这与RocketMQ、RabbitMQ不同。 PubSub的生产者传递过来一条消息,Redis会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息会被直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息,但是当挂掉的消费者重新连接上的时候,在断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。

2、Redis宕掉,期间所有的消息都丢失。

如果Redis停机重启,PubSub的消息是不会持久化的,毕竟Redis的宕机就相当于一个Subscriber都没有,所有的消息会被直接丢弃。

为弥补这个不足,2018.6,Redis5.0新增了Stream数据结构,这个功能给Redis带来了持久化消息队列。有兴趣的同学可以了解下。

基于spring-boot-starter-data-redis实现pub/sub

代码语言:javascript
复制
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>2.7.16</version>
      </dependency>

定义Subscriber,这个类需要实现MessageListener接口:

代码语言:javascript
复制

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

/**
 * @Auther: cheng.tang
 * @Date: 2023/10/17
 * @Description: redis-subject
 */
@Slf4j
public class CustomMessageListener implements MessageListener {

    public static final String CUSTOM_CHANNEL_PREFIX = "event:customer:channel:";

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        String body = new String(message.getBody());
        log.info(" channel {} body {} pattern {} ", channel, body, new String(pattern));
    }


}

绑定订阅关系(Subscription),注册Subscriber:

代码语言:javascript
复制

import com.tangcheng.redislistener.expire.listener.CustomMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

import static com.tangcheng.redislistener.expire.listener.CustomMessageListener.CUSTOM_CHANNEL_PREFIX;

/**
 * @Auther: cheng.tang
 * @Date: 2023/10/17
 * @Description: redis-subject
 */
@Configuration
@Slf4j
public class MessageListenerContainerConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setValueSerializer(RedisSerializer.json());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        redisTemplate.setHashValueSerializer(RedisSerializer.json());
        return redisTemplate;
    }

    /**
     * 绑定订阅关系。subscriber和channel或topic的关系
     *
     * @param connectionFactory
     * @param threadPoolTaskExecutor
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
                                                                       TaskExecutor threadPoolTaskExecutor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置Redis的连接工厂
        container.setConnectionFactory(connectionFactory);
        // 设置监听使用的线程池
        container.setTaskExecutor(threadPoolTaskExecutor);
        container.addMessageListener(new CustomMessageListener(), new PatternTopic(CUSTOM_CHANNEL_PREFIX + "*"));
        return container;
    }

    @Bean
    public TaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(60);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(3600);
        executor.setThreadNamePrefix("redis-listener-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }


}

定义Producer类,由Producer pub消息:

代码语言:javascript
复制

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;

import static com.tangcheng.redislistener.expire.listener.CustomMessageListener.CUSTOM_CHANNEL_PREFIX;

@Service
@Slf4j
public class RedisPubJob {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Value("${server.port:8080}")
    private int serverPort;

    @Scheduled(cron = "0 */1 * * * ?")
    public void pubMsg() {
        LocalDateTime now = LocalDateTime.now();
        log.info("开始发送消息");
        int value = ThreadLocalRandom.current().nextInt();
        redisTemplate.convertAndSend(CUSTOM_CHANNEL_PREFIX + value, serverPort);
        log.info("消息发送完成: " + Duration.between(now, LocalDateTime.now()));
    }

}

搭建一个Producer+三个Subscriber的场景,期望这样的效果: 一个topic有三个subscriber场景,Producer往指定的topic pub一条消息后,订阅这个topic的三个subscirber都会消费到。 同一台JVM进程中,Redis PubSub的生产者和消费者在不同的线程中支持,也就是使用了不同的连接。因为Redis不允许连接在subscribe等待消息时还需要进行其它操作。

run起来,看下pub/sub的效果: step1: build一个jar;

step2: 启动三个jvm进程:

代码语言:javascript
复制
 java -jar redis-listener-0.0.1-SNAPSHOT.jar ---server.port=8080

 java -jar redis-listener-0.0.1-SNAPSHOT.jar ---server.port=8090

 java -jar redis-listener-0.0.1-SNAPSHOT.jar ---server.port=8099

订阅情况:

源码详见文末。

小结

总的来说,Redis的Pub/Sub模式是一种非常轻量级的消息传递模型,它可以在一些低频、低数据量的场景帮助我们实现多播的实时消息推送、事件驱动系统和分布式系统中的数据同步等功能。而在Spring Boot应用中,我们可以通过Spring Boot Starter Data Redis来轻松地实现Redis的Pub/Sub模式。

参考

show the code : https://gitee.com/baidumap/redis-subject

《Redis深度历险 核心原理与应用初中》 《Redis入门指南 》(第2版)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-11-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 的数字化之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是pub/sub?
  • Redis的pub/sub指令
  • Redis pub/sub的适用场景
  • Redis pub/sub指令的注意事项及缺点
  • 基于spring-boot-starter-data-redis实现pub/sub
  • 小结
  • 参考
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档