在选择消息中间件的问题上,我们有很多解决方案,具体选择哪一种还是要根据实际的情况来进行确认。
如果直接有成熟的第三方消息中间件,能用就直接用,如rabbitMq
、kafka
等。
再如果,推送的消息比较简单,又恰好有个redis
,那么就选择redis
吧。
下面,将进行介绍,如果使用redis
作为消息队列,我们该如何编写这段程序。
前置工作,本次使用的工程框架直接是springBoot
,其他maven
依赖就不贴出来了,主要是要有这个redis
的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
有了依赖,记得在application.yml
配置文件中加入对应redis
的配置信息
spring:
redis:
database: 0
host: localhost
port: 6379
还有一件事,redisTemplate
的这个bean
我们要进行润色一下,虽然用自带的也行,但作为一个强迫症,我还是希望我写入的key
和redis
中的key
一致
package com.banmoon.test.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
好的准备工作完成,先来看生产者
package com.banmoon.test.queue.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class RedisTestProducer {
public static final String REDIS_TEST_KEY = "test:queue";
@Autowired
private RedisTemplate redisTemplate;
public long push (String... params) {
Long l = redisTemplate.opsForList().rightPushAll(REDIS_TEST_KEY, params);
return l;
}
}
生产者很简单,就是向redis
的list
中推送数据
主要在于消费者,该如何获取到其中的消息
package com.banmoon.test.queue.consumer;
import cn.hutool.core.util.StrUtil;
import com.banmoon.test.queue.producer.RedisTestProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class RedisTestConsumer {
@Autowired
private RedisTemplate redisTemplate;
@PostConstruct
public void pop() {
new Thread(() -> {
while (true) {
try {
// 阻塞取出队首
String params = (String) redisTemplate.opsForList().leftPop(RedisTestProducer.REDIS_TEST_KEY, 10, TimeUnit.SECONDS);
if (StrUtil.isNotBlank(params))
log.info("模拟消费消息:{}", params);
// 避免高频轮循,添加休眠
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
// 不做任何处理,切记不要因为异常导致了消费线程的退出
}
}
}, RedisTestProducer.REDIS_TEST_KEY).start();
}
}
上述就是消费者,其中注意几点
bean
初始化的一个方法,大家也可以使用静态代码块,只要让这个消费线程启动就行
cpu
高载荷
启动该springBoot
项目,同时执行下面这段测试代码,调用三次生产者
package com.banmoon.test;
import com.banmoon.test.queue.producer.RedisTestProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ServiceTest {
@Autowired
private RedisTestProducer redisTestProducer;
@Test
void insertTest() {
redisTestProducer.push("a", "b", "c");
}
}
查看springBoot
项目的控制台,消费者有进行消费
延迟队列的应用场景还是比较多见的,比如
很多类似的业务场景,我们不再依赖定时,使用消息中间件就可以完成这类功能。
在redis
实现延迟队列之前,我有必要说一下set
和zset
,主要是这个zset
set
大家都很熟悉,与list
不同,set
是无序且内部元素不重复。
那么zset
呢,它结合了set
和list
的特点
zset
中的元素都会关联一个分数score
,内部将通过这个score
对集合元素进行的排序。
虽然zset
集合中元素不会重复,但score
可以重复。如果有两个score
相同的元素,将按照元素的字典序进行排序。
上面描述了这么多,我们该如何使用,先看生产者
package com.banmoon.test.queue.producer;
import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class RedisTestDelayProducer {
public static final String REDIS_DELAY_TEST_KEY = "test:delay:queue";
@Autowired
private RedisTemplate redisTemplate;
public Boolean push (String params, int offset, DateField dateField) {
long score = DateUtil.offset(new Date(), dateField, offset).getTime();
Boolean b = redisTemplate.opsForZSet().addIfAbsent(REDIS_DELAY_TEST_KEY, params, score);
log.info("生产消息:{},推送是否成功:{}", params, b);
return b;
}
}
可以看到,这边使用将消费时间点的时间戳,作为了score
,生产的消息
package com.banmoon.test.queue.consumer;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import com.banmoon.test.queue.producer.RedisTestDelayProducer;
import com.banmoon.test.queue.producer.RedisTestProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class RedisTestDelayConsumer {
@Autowired
private RedisTemplate redisTemplate;
@PostConstruct
public void pop() {
new Thread(() -> {
while (true) {
try {
// 查看范围中的消息
Set<Object> set = redisTemplate.opsForZSet().rangeByScore(RedisTestDelayProducer.REDIS_DELAY_TEST_KEY, 0, new Date().getTime(), 0, 1);
// 判断是否为空
if (CollUtil.isNotEmpty(set)) {
String params = (String) set.iterator().next();
// 删除范围中的消息
Long success = redisTemplate.opsForZSet().remove(RedisTestDelayProducer.REDIS_DELAY_TEST_KEY, params);
if (success > 0) {
log.info("模拟消费消息:{}", params);
}
} else {
// 避免高频轮循,添加休眠
TimeUnit.MILLISECONDS.sleep(1000);
}
} catch (InterruptedException e) {
// 不做任何处理,切记不要因为异常导致了消费线程的退出
}
}
}, RedisTestDelayProducer.REDIS_DELAY_TEST_KEY).start();
}
}
消费的逻辑,基本就是,取出当前时间点,要执行的消息。
score
保证了队列中的消息有序性,且作为时间戳,所以可以完成延迟队列的对应功能。
注意事项和上面的普通队列差不多,简单注意一下就好。
启动该springBoot
项目,同时执行下面这段测试代码,调用三次生产者,分别在10秒后,30秒后,1分钟后进行消费
package com.banmoon.test;
import cn.hutool.core.date.DateField;
import com.banmoon.test.queue.producer.RedisTestDelayProducer;
import com.banmoon.test.queue.producer.RedisTestProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ServiceTest {
@Autowired
private RedisTestDelayProducer redisTestDelayProducer;
@Test
void insertTest() {
redisTestDelayProducer.push("a", 10, DateField.SECOND);
redisTestDelayProducer.push("b", 30, DateField.SECOND);
redisTestDelayProducer.push("c", 1, DateField.MINUTE);
}
}
查看springBoot
项目的控制台,注意查看消费者打印的日志,主要看看三条日志的时间间隔
我还要讲一下,redis
作为消息队列的优缺点
ack
,消息确认机制,存在消息丢失的可能所以,如果是简单的日志推送,消息推送等,可以使用redis
队列。相反,如果对消息的可靠性有很大的要求,建议还是不要使用redis
作为消息中间件了。
我是半月,祝你幸福!!!