延时队列的应用场景:
下单后,30分钟内未付款就自动取消订单等; 支付后,24小时未评论自动好评; 在我们实际开发过程中,应用场景很多...
Redis由于其自身的Zset数据结构,也同样可以实现延时的操作。 Zset本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加元素时候可以指定,每次指定score后,Zset会自动重新按新的值调整顺序。
> ZADD delay_queue 1581309229 taskId_1
(integer) 1
> ZADD delay_queue 1581309129 taskId_2
(integer) 1
> ZADD delay_queue 1581309329 taskId_3
(integer) 1
时间戳
大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的。 注意不需要遍历整个Zset集合,以免造成性能浪费。> ZRANGE delay_queue 0 -1 withscores
1) "taskId_2"
2) "1581309129"
3) "taskId_1"
4) "1581309229"
5) "taskId_3"
6) "1581309329"
Redis Lua
封装,确保原子性操作。更要注意 Redis Lua
在 Redis Cluster 的伪集群问题。redisson
,封装了 DelayedQueue
的实现。源码逻辑 org/redisson/RedissonDelayedQueue.java
Beanstalkd,一个高性能、轻量级的分布式内存队列系统。支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。
yum install beanstalkd
||
docker run -d -p 11300:11300 pig4cloud/beanstalkd
<!--封装了 官方的 java sdk,只支持 springboot 2.X-->
<dependency>
<groupid>com.pig4cloud.beanstalk</groupid>
<artifactid>beanstalkd-client-spring-boot-starter</artifactid>
<version>0.0.1</version>
</dependency>
@Autowired
private JobProducer producer;
/**
* @param delay 是一个整形数,表示将job放入ready队列需要等待的秒数
* @param ttr time to run—是一个整形数,表示允许一个worker执行该job的秒数。这个时间将从一个worker 获取一个job开始计算。
* 如果该worker没能在<ttr> 秒内删除、释放或休眠该job,这个job就会超时,服务端会主动释放该job。
* 最小ttr为1。如果客户端设置了0,服务端会默认将其增加到1。
* @param priority 优先级 0~2**32的整数,最高优先级是0
*/
@Test
public void testSend() {
String taskId = "1";// 业务对象信息
producer.putJob(0, 10, 10, taskId.getBytes());
}
@Component
public class DemoJobConsumer extends AbstractTubeConsumerListener {
@Override
public void work(JobConsumer consumer) {
// 阻塞多少秒获取一次 Job
Job job = consumer.reserveJob(1000L);
// 消费此Job
consumer.deleteJob(job.getId());
// 执行延时的业务逻辑
String biz = new String(job.getData());
}
}
> 项目推荐: Spring Cloud 、Spring Security OAuth2的RBAC权限管理系统 欢迎关注 </ttr>