前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用Spring Data Redis 来实现消息的发布订阅机制

利用Spring Data Redis 来实现消息的发布订阅机制

作者头像
阿提说说
发布2022-12-02 16:48:46
5830
发布2022-12-02 16:48:46
举报
文章被收录于专栏:Java技术进阶

redis是一款高性能key-value存储系统,不仅能做缓存,还能用于消息队列

这里利用Spring Data Redis 来实现消息的发布订阅机制 Demo地址:GitHub - jujunchen/redis-queue-demo: redis 实现的消息 发布/订阅机制

一共3个应用,1个发布者应用,2个订阅者应用

发布者应用

RedisConfig redis序列化配置

Person 示例传输的POJO对象

Publisher 发布服务

代码语言:javascript
复制
@Component
public class Publisher {
    @Autowired
    @Qualifier(value = "customRedisTemplate")
    private RedisTemplate redisTemplate;
    /**
     * 向指定频道发布消息
     * @param channel 频道名称
     * @param object 消息
     */
    public void publish(String channel,Object object){
        redisTemplate.convertAndSend(channel,object);
    }
}

一个发布测试类

代码语言:javascript
复制
public class DemoApplicationTests {
    @Autowired
    private  Publisher publisher;
    @Test
    public void startPublisher() {
        System.out.println("发布消息");
        //Person person = new Person("redis","10","x");
        publisher.publish("testChannel","渠道1消息");
        publisher.publish("testChannel2","渠道2消息");
    }
}

订阅者应用

MessageConfig 接收消息配置

RedisConfig redis序列化配置,与发布服务相同

Subscriber 订阅服务

MessageConfig接收消息配置

代码语言:javascript
复制
@Configuration
public class MessageConfig {
    @Autowired
    private Subscriber subscriber;
    @Autowired
    @Qualifier(value = "customRedisTemplate")
    private RedisTemplate redisTemplate;
    /**
     * RedisMessageListenerContainer充当消息侦听器容器。
     * 它用于从Redis通道接收消息并驱动注入其中的MessageListener实例。
     * 侦听器容器负责消息接收的所有线程并将其分派到侦听器进行处理。
     * 消息监听器容器是MDP和消息传递提供者之间的中介,并负责注册以接收消息,资源获取和释放,异常转换等。
     *
     * 此外,为了最小化应用程序占用空间,RedisMessageListenerContainer允许多个侦听器共享一个连接和一个线程,即使它们不共享订阅。
     * 因此,无论应用程序跟踪多少个侦听器或通道,运行时成本在其整个生命周期内保持不变。
     * 此外,容器允许更改运行时配置,以便您可以在应用程序运行时添加或删除侦听器,而无需重新启动。
     * 此外,容器使用延迟订阅方法,仅在需要时使用RedisConnection。
     * 如果所有侦听器都已取消订阅,则会自动执行清理,并释放该线程。
     * 为了帮助消息的异步性,容器需要一个java.util.concurrent.Executor(或Spring的TaskExecutor)来分派消息。
     * 根据负载,侦听器数量或运行时环境,您应该更改或调整执行程序以更好地满足您的需求。 强烈建议选择适当的TaskExecutor来利用其运行时。
     * @param listenerAdapter
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisTemplate.getConnectionFactory());
        List<Topic> topicList = new ArrayList<>();
        topicList.add(new PatternTopic("testChannel"));
        container.addMessageListener(listenerAdapter, topicList);
        return container;
    }
    /**
     * 消息侦听器适配器,能将消息委托给目标侦听器方法
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(subscriber);
    }
}

Subscriber 订阅服务

代码语言:javascript
复制
@Component
public class Subscriber implements MessageListener{
    @Autowired
    @Qualifier(value = "customRedisTemplate")
    private RedisTemplate redisTemplate;
    /**
     * 每次新消息到达时,都会调用回调
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<?> keySerializer = redisTemplate.getKeySerializer();
        RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
        Object channel = keySerializer.deserialize(message.getChannel());
        Object body = valueSerializer.deserialize(message.getBody());
        System.out.println("渠道: " + channel);
        System.out.println("消息内容: " + String.valueOf(body));
    }
}

当我跑下发布服务测试用例的时候,两个订阅者分别会收到来自订阅渠道的消息

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-06-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 发布者应用
  • 订阅者应用
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档