内容目录
一、延迟队列使用场景二、zset如何实现延迟队列三、springboot基于zset实现延迟队列四、做成服务化五、使用zset实现延迟队列的缺点六、其他实现方式
延迟队列可以用于处理订单超时问题。当用户下单后,将订单信息放入延迟队列,并设置一定的超时时间。如果在超时时间内用户未支付订单,消费者会从延迟队列中获取到该订单,并执行相应的处理操作,如取消订单、释放库存等。
延迟队列可以用于优惠券的过期提醒功能。将即将过期的优惠券信息放入延迟队列,并设置合适的延迟时间。当延迟时间到达时,消费者将提醒用户优惠券即将过期,引导用户尽快使用。
延迟队列可以用于实现消息的延迟重试机制。当某个消息处理失败时,将该消息放入延迟队列,并设置一定的延迟时间。在延迟时间过后,消费者再次尝试处理该消息。这可以用于处理网络请求失败、数据库写入异常等情况下的消息重试。
延迟队列可以用于异步通知和提醒功能。例如,当用户完成某个操作后,系统可以将相关通知消息放入延迟队列,并设置一定的延迟时间,以便在合适的时机发送通知给用户。
Redis zset是按相关分数排序的唯一字符串(成员)的集合。当多个字符串具有相同的分数时,这些字符串按字典顺序排列。排序集的一些用例包括:
当然我们也可以基于zset实现延迟队列,基于 ZSet 实现延迟队列的原理是利用有序集合的特性。下面是基于 Redis 的 ZSet 实现延迟队列的简要介绍:
通过上述步骤,延迟时间到达的消息可以被按照顺序逐个取出,并进行处理。需要注意的是,在处理每个消息时,可能还需要考虑一些并发性问题和数据一致性问题,以及在处理完消息后从 ZSet 中删除该消息。
此外,为了实现更好的性能和可靠性,可以结合使用 Redis 的 Pub/Sub 机制,例如在处理完消息后,发布一个事件通知其他服务或者订阅者进行后续处理。
引入Redis相关依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
在属性文件application.properties中添加:
spring.redis.host=127.0.0.1
spring.redis.port=6379
# 如果 Redis 设置了密码,需要配置以下三项
# spring.redis.password=your_password
# spring.redis.database=0
# spring.redis.ssl=false
# spring.redis.timeout=2000
添加redis配置类:
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
template.setValueSerializer(new GenericToStringSerializer<>(Object.class));
return template;
}
}
创建一个延迟队列的服务类,例如DelayQueueService,用于操作Redis中的ZSet。这个服务类需要完成以下功能:
@Service
@Slf4j
public class DelayQueueService {
private static final String DELAY_QUEUE_KEY = "delay_queue";
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void addToDelayQueue(String message, long delayTime) {
redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, message, System.currentTimeMillis() + delayTime);
}
public void pollAndProcessDelayedMessages() {
Set<String> messages = redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_KEY, 0, System.currentTimeMillis());
for (String message : messages) {
// 处理消息
processMessage(message);
// 从延迟队列中删除已处理的消息
redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, message);
}
}
private void processMessage(String message) {
// 根据业务需求进行消息处理
log.info("Processing message: " + message);
}
}
使用Spring Boot的定时任务或消息队列框架,定时调用延迟队列服务类的轮询方法或监听指定的消息队列,可以将轮训粒度放到1s一次。
@Component
public class DelayQueueSchedule {
@Autowired
private DelayQueueService delayQueueService;
// 每隔一段时间进行轮询并处理延迟消息
@Scheduled(fixedDelay = 1000)
public void pollAndProcessDelayedMessages() {
delayQueueService.pollAndProcessDelayedMessages();
}
}
然后在启动类上通过@EnableScheduling注解开启任务调度能力。
这样就简单实现了基于zset实现延迟队列的能力,可根据业务将processMessage消息处理逻辑进行修改,比如基于消息生成方提供的回调地址进行回调。
基于zset实现的延迟队列,可以封装成两种方式供业务调用。
通过使用redis的zset能够满足一些简单场景的延迟队列场景,但是也存在很多缺陷。
延迟任务未必是均匀分布的,可能在某一个时间点有很多任务,而在某个时间段内没有任何任务或者只有少量零散任务,那么延迟中心在很长一段时间内处于空转状态,对于机器性能是一种损耗。
基于 zset 实现延迟队列的一个常见问题就是空转问题。延迟队列通常用于处理需要在特定时间后执行的任务,而 zset 数据结构提供了排序功能,使得我们可以按照任务的执行时间进行排序。然后,我们可以使用一个循环来不断检查是否有任务到期需要执行。
然而,如果我们仅依赖于循环来检查任务是否到期,当延迟队列中没有任务时,循环将持续运行并浪费系统资源,这就是所谓的空转问题。此外,即使有任务存在,如果任务的到期时间较远,循环也会一直运行,导致系统的效率降低。
当延迟消息数量庞大时,轮询整个ZSet以查找到期的消息可能会对性能造成负面影响。因为ZSet是有序集合,需要遍历元素来检查是否到达指定时间。
为了获取到期的任务,需要进行范围查询。当延迟队列中的任务数量较大时,范围查询的开销也会相应增加。尤其是在处理大规模延迟队列时,这可能导致查询性能下降。
另外我们是基于定时轮训来实现的延迟调用,那么大概率会存在同一个过期时间存在集中的过期事件需要回调,并且可能某些业务方提供的回调接口效率没那么高,从而导致延迟中心性能下降,以及带来的连锁效应导致后续的延迟事件回调被延迟。
ZSet只能通过分数(score)来排序元素,分数只支持浮点数类型,其精确度可能受限。并且基于上述场景我们采取了折中的方式,把延迟的最小时间颗粒度定义成秒,在某些场景下,可能需要更高的时间精度来处理延迟消息。
并且redis实例本身故障或者重启操作,以及时钟被回拨都会影响到延迟事件和回调的准确性。
有些业务场景,在业务操作完成后需要根据后续流程是否完成来变更当前业务流程状态,比如工单类的业务,有些团队不主动变更工单的状态,而是每一步操作都会实时校验工单当前的状态和应该调整为的状态。不过一些延迟通知类的业务场景,不适合用这种模式,因为延迟任务与业务流程是单向依赖且是一次性的,无法用即时校验的方式实现。
一些开源的消息中间件都提供了延迟消息能力,比如rocketmq和rabbitmq等,以rocketmq为例,开源的只支持若干固定时长的延迟消息,商业版本支持自定义时长延迟消息。
可以轮询业务单据表或者抽象出来的延迟事件表,对于过期时间做本地化业务逻辑处理,不过这种对业务库和业务服务都会造成性能损耗,并且轮询时间颗粒度不好把控。
之前文章中有介绍过netty的时间轮,可参考《Netty时间轮》,不过时间轮默认是单机算法,我们需要做的是需要有一个持久化的延迟任务存储,解决时间轮故障或者重启的任务丢失问题,基于redis和关系数据库都可以实现,在基于时间轮实现的延迟中心服务启动时,加载延迟任务到时间轮中,时间轮中的任务过期回调后需要更新任务状态,避免重新执行或加载。
本文分享自 PersistentCoder 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!