我们都知道redis 也有发布订阅模式, 但是使用的比较少。 并且redis的发布订阅不会持久化落入磁盘。总的来说就是不可靠。
但是在一些场景我们还是会用到的。这里我们就来springboot 整合一下redis 进行发布订阅。
1,首先我们要引入 dataRedis 的jar包, 在配置消息的监听器, 指定监听的topic , 这里的topic 可以使用* 通配符 和? 来全量匹配和模糊匹配。这里就有了组的概念, 我们要是订阅某一组的信息,topic 就写成topicName.*
要是某一组下的某一类,topicName.x?x, 类似于这种写法。
2, 注册完监听器之后,在监听自己的适配器,里面监听自己自定义的方法
也可以实现MessageListener, 不需要指定方法。 里面也能获取到topic的名字。
具体代码
application,properties 的配置
# 默认选择零号数据库
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
@Configuration
public class RedisCacheConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// topic 有两种,PatternTopic 和channel
container.addMessageListener(listenerAdapter, new PatternTopic("test.?2"));
// 支持多个监听适配器,不同的topic
//container.addMessageListener(listenerAdapter1, new PatternTopic("test1"));
return container;
}
@Bean
// @Primary 多个监听适配器的时候要指定哪一个是主的
MessageListenerAdapter testListenerAdapter(RedisReceiver receiver) {
// 使用适配器对象的默认方法,方法名称必须叫这个handleMessage
return new MessageListenerAdapter(receiver);
}
//
// @Bean("test1listenerAdapter")
// MessageListenerAdapter test1listenerAdapter(RedisReceiver receiver) {
// return new MessageListenerAdapter(receiver);
// }
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}
@Component
public class RedisReceiver {
//{ implements MessageListener {
//
// @Override
// public void onMessage(Message message, byte[] pattern) {
//
// System.out.println("接收到的消息 : "+new String(message.getBody()));
// System.out.println("订阅的topic 的名称 : "+new String(message.getChannel()));
//
// }
// 适配器默认找的方法名称, 名称不对,启动会报错。
public void handleMessage(String message) {
System.out.println("接收到的消息: "+message);
}
}
测试类
@Autowired
StringRedisTemplate stringRedisTemplate;
@GetMapping("/index")
public String index(){
for (int i = 0; i < 10; i++) {
stringRedisTemplate.convertAndSend("test.12", String.format("我是消息{%d}号: %tT", i, new Date()));
//stringRedisTemplate.convertAndSend("test1", String.format("我是1消息{%d}号: %tT", i, new Date()));
}
return "sucess";
}
启动运行一下看一下结果:
"test.?2" ,咱们设置的topic 和发送的topic "test.12" 说明模糊匹配是可以的, 再试一下全量匹配。
把 "test.?2" 变成 test.* 发布的时候置顶两个。
再次运行看一下结果
我们这就有了组的概念, 还有人说。我要是想知道我真正的topicName , 我改咋办啊。
@Component
public class RedisReceiver implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println("接收到的消息 : "+new String(message.getBody()));
System.out.println("订阅的topic 的名称 : "+new String(message.getChannel()));
}
}
把刚才适配器里面的对象注释的代码放开,再次运行我们看看结果
这里我们就可以根据不同的topicName 走不同的业务逻辑了。
我上面的代码也支持写多个监听适配器,我们在把代码注释放开
@Configuration
public class RedisCacheConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,@Qualifier("test1listenerAdapter")MessageListenerAdapter listenerAdapter1) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("test.*"));
container.addMessageListener(listenerAdapter1, new PatternTopic("prod.*"));
return container;
}
@Bean
@Primary
MessageListenerAdapter testListenerAdapter(RedisReceiver receiver) {
return new MessageListenerAdapter(receiver);
}
@Bean("test1listenerAdapter")
MessageListenerAdapter test1listenerAdapter(RedisReceiver receiver) {
return new MessageListenerAdapter(receiver);
}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}
测试代码
@Autowired
StringRedisTemplate stringRedisTemplate;
@GetMapping("/index")
public String index(){
for (int i = 0; i < 10; i++) {
stringRedisTemplate.convertAndSend("test.2", String.format("我是2消息{%d}号: %tT", i, new Date()));
stringRedisTemplate.convertAndSend("prod.1", String.format("我是1消息{%d}号: %tT", i, new Date()));
}
return "sucess";
}
运行看一下结果
好了,今天的springboot整合redis 消息的发布订阅就完成了。
这里有一个问题,就是新加入的订阅者不会消费之后的数据,也不支持动态的添加topicName , 就是发布者。
喜欢,点赞,转发。