如何用 Redis 实现延时任务?

1、什么是延时任务

延时任务,顾名思义,就是延迟一段时间后才执行的任务。延时任务的使用还是很广泛的。关于延时任务的实现方式,我知道的就不少 3 种,今天就讲下如何用 redis 实现延时任务。

2、延时任务的特点

在介绍具体方案之前,我们不妨先想一下要实现一个延时系统,有哪些内容是必须存储下来的(这里的存储不一定是指持久化,也可以是放在内存中,取决于延时任务的重要程度)。

首先要存储的就是任务的描述。假如你要处理的延时任务是延时发布资讯,那么你至少要存储资讯的id吧。另外,如果你有多种任务类型,比如:延时推送消息、延时清洗数据等等,那么你还需要存储任务的类型。以上总总,都归属于任务描述。

除此之外,你还必须存储任务执行的时间点吧,一般来说就是时间戳。此外,我们还需要根据任务的执行时间进行排序,因为延时任务队列里的任务可能会有很多,只有到了时间点的任务才应该被执行,所以必须支持对任务执行时间进行排序。

3、使用 Redis 实现延时任务

以上就是一个延迟任务系统必须具备的要素了。回到 Redis,有什么数据结构可以既存储任务描述,又能存储任务执行时间,还能根据任务执行时间进行排序呢?想来想去,似乎只有 Sorted Set 。我们可以把任务的描述序列化成字符串,放在 Sorted Set 的 value 中,然后把任务的执行时间戳作为 score,利用 Sorted Set 天然的排序特性,执行时刻越早的会排在越前面。

这样一来,我们只要开一个或多个定时线程,每隔一段时间去查一下这个 Sorted Set 中 score 小于或等于当前时间戳的元素(这可以通过 zrangebyscore 命令实现),然后再执行元素对应的任务即可。当然,执行完任务后,还要将元素从 Sorted Set 中删除,避免任务重复执行。

如果是多个线程去轮询这个 Sorted Set,还有考虑并发问题,假如说一个任务到期了,也被多个线程拿到了,这个时候必须保证只有一个线程能执行这个任务,这可以通过 zrem 命令来实现,只有删除成功了,才能执行任务,这样就能保证任务不被多个任务重复执行了。

接下来看代码。首先看下项目结构:

一共 4 个类:Constants 类定义了 Redis key 相关的常量。DelayTaskConsumer 是延时任务的消费者,这个类负责从 Redis 拉取到期的任务,并封装了任务消费的逻辑。DelayTaskProducer 则是延时任务的生产者,主要用于将延时任务放到 Redis 中。RedisClient 则是 Redis 客户端的工具类。

最主要的类就是 DelayTaskConsumer 和 DelayTaskProducer 了。

我们先来看下生产者 DelayTaskProducer:

代码很简单,就是将任务描述(为了方便,这里只存储资讯的 id)和任务执行的时间戳放到 Redis 的 Sorted Set 中。

接下来是延时任务的消费者 DelayTaskConsumer:

public class DelayTaskConsumer {

    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    public void start(){
        scheduledExecutorService.scheduleWithFixedDelay(new DelayTaskHandler(),1,1, TimeUnit.SECONDS);
    }

    public static class DelayTaskHandler implements Runnable{

        @Override
        public void run() {
            Jedis client = RedisClient.getClient();
            try {
                Set<String> ids = client.zrangeByScore(Constants.DELAY_TASK_QUEUE, 0, System.currentTimeMillis(),
                        0, 1);
                if(ids==null||ids.isEmpty()){
                    return;
                }
                for(String id:ids){
                    Long count = client.zrem(Constants.DELAY_TASK_QUEUE, id);
                    if(count!=null&&count==1){
                        System.out.println(MessageFormat.format("发布资讯。id - {0} , timeStamp - {1} , " +
                                "threadName - {2}",id,System.currentTimeMillis(),Thread.currentThread().getName()));
                    }
                }
            }finally {
                client.close();
            }
        }
    }
}

首先看 start 方法。在这个方法里面我们利用 Java 的ScheduledExecutorService 开了一个调度线程池,这个线程池会每隔 1 秒钟调度 DelayTaskHandler 中的 run 方法。

DelayTaskHandler 类就是具体的调度逻辑了。主要有 2 个步骤,一个是从 Redis Sorted Set 中拉取到期的延时任务,另一个是执行到期的延时任务。拉取到期的延时任务是通过 zrangeByScore 命令实现的,处理多线程并发问题是通过 zrem 命令实现的。代码不复杂,这里就不多做解释了。

接下来测试一下:

我们首先生产了 4 个延时任务,执行时间分别是程序开始运行后的 5 秒、10 秒、15 秒、20 秒,然后启动了 10 个消费者去消费延时任务。运行效果如下:

可以看到,任务确实能够在相应的时间点左右被执行,不过有少许时间误差,这个是因为我们拉取到期任务是通过定时任务拉取而不是实时推送的,而且拉取任务时有一部分网络开销,再者,我们的任务处理逻辑是同步处理的,需要上一次的任务处理完,才能拉取下一批任务,这些因素都会造成延时任务的执行时间产生偏差。

4、总结

以上就是通过 Redis 实现延时任务的思路了。这里提供的只是一个最简单的版本,实际上还有很多地方可以优化。比如,我们可以把任务的处理逻辑再放到单独的线程池中去执行,这样的话任务消费者只需要负责任务的调度就可以了,好处就是可以减少任务执行时间偏差。还有就是,这里为了方便,任务的描述存储的只是任务的 id,如果有多种不同类型的任务,像前面说的发送资讯任务和推送消息任务,那么就要通过额外存储任务的类型来进行区分,或者使用不同的 Sorted Set 来存放延时任务了。

除此之外,上面的例子每次拉取延时任务时,只拉取一个,如果说某一个时刻要处理的任务数非常多,那么会有一部分任务延迟比较严重,这里可以优化成每次拉取不止一个到期的任务,比如说 10 个,然后再逐个进行处理,这样的话可以极大地提升调度效率,因为如果是使用上面的方法,拉取 10 个任务需要 10 次调度,每次间隔 1 秒,总共需要 10 秒才能把 10 个任务拉取完,如果改成一次拉取 10 个,只需要 1 次就能完成了,效率提升还是挺大的。

最后一个需要考虑的地方是,上面的代码并没有对任务执行失败的情况进行处理,也就是说如果某个任务执行失败了,那么连重试的机会都没有。因此,在生产环境使用时,还需要考虑任务处理失败的情况。有一个简单的方法是在任务处理时捕获异常,当在处理过程中出现异常时,就将该任务再放回 Redis Sorted 中,或者由当前线程再重试处理。

那么使用 Redis 实现延时任务有什么优缺点呢?优点就是可以满足吞吐量。缺点则是存在任务丢失的风险(当 Redis 实例挂了的时候)。因此,如果对性能要求比较高,同时又能容忍少数情况下任务的丢失,那么可以使用这种方式来实现。

Redis 在国内各大公司都能看到其身影,比如我们熟悉的新浪,阿里,腾讯,百度,美团,小米等。

原文发布于微信公众号 - GitChat精品课(CSDN_Tech)

原文发表时间:2018-10-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏BeJavaGod

如何限制用户在某一时间段多次访问接口

要知道,如今很多平台的接口都是可以同时被门户网站,手机端,移动浏览器访问,因为接口是通用的,而为了安全起见,有些接口都会设置一个门槛,那就是限制访问次数,也就是...

41060
来自专栏同步博客

Redis构建分布式锁

  为什么要构建锁呢?因为构建合适的锁可以在高并发下能够保持数据的一致性,即客户端在执行连贯的命令时上锁的数据不会被别的客户端的更改而发生错误。同时还能够保证命...

21520
来自专栏木可大大

漫谈Web缓存架构

目前,Web应用的核心数据通常存放在数据库中,比如说用户信息、订单信息、交易信息等,同时,数据库和编程语言是无关的,通过SQL交互,Java、Php等语言写的程...

12530
来自专栏非著名程序员

Android Studio你不知道的调试技巧

? 写代码不可避免有Bug,通常情况下除了日志最直接的调试手段就是debug;那么你的调试技术停留在哪一阶段呢?仅仅是下个断点单步执行吗?或者你知道 Eval...

329100
来自专栏Java帮帮-微信公众号-技术文章全总结

Web-第二十天 Redis学习【悟空教程】

rpm -e --nodeps java-1.6.0-openjdk-1.6.0.0-1.66.1.13.0.el6.i686

20350
来自专栏美团技术团队

缓存那些事

前言 一般而言,现在互联网应用(网站或App)的整体流程,可以概括如图1所示,用户请求从界面(浏览器或App界面)到网络转发、应用服务再到存储(数据库或文件系统...

76340
来自专栏Seebug漏洞平台

CVE-2017-5123 漏洞利用全攻略

原文:https://salls.github.io/Linux-Kernel-CVE-2017-5123/

41870
来自专栏Golang语言社区

Golang学习--GroupCache的使用

groupcache 是 Brad Fitzpatrick 最新的作品,目标在于取代一部分memcached的功能。以官方的说明是:groupcache ...

60990
来自专栏腾讯云API

腾讯云 API 最佳实践: 善用幂等性

有些开发者问我云服务器“创建实例”接口有一个参数“ClientToken”不知道有什么作用。本文作一个简单的解答。

4.6K150
来自专栏Java工程师日常干货

分布式服务治理框架Dubbo前言QuickStart 一些思考

Dubbo是一个被国内很多互联网公司广泛使用的开源分布式服务治理框架,是一个非常全面的SOA基础框架,当当网在Dubbo基础上新增了一些功能,并将其命名为Dub...

14230

扫码关注云+社区

领取腾讯云代金券