前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >延迟任务多种实现姿势--上

延迟任务多种实现姿势--上

作者头像
大忽悠爱学习
发布2022-09-29 10:44:56
5050
发布2022-09-29 10:44:56
举报
文章被收录于专栏:c++与qt学习

延迟任务多种实现姿势--上


关于延迟任务的所有代码实现均存放在下面这个仓库中:

https://gitee.com/DaHuYuXiXi/deley-task


什么是延迟任务

例如:pdd下单,但是没有付款,那么24小时候,订单会自动取消。收货后,如果一直不进行确认,那么默认七天后自动确认收货等等。

上面这些场景是我们平日中一直都会遇到的,作为程序员的我们,有没有考虑过该怎么实现这些延迟任务呢?


一,最简单的延迟队列实现

DelayQueue是一个无界的BlockingQueue的实现类,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。

  • BlockingQueue即阻塞队列,java提供的面向多线程安全的队列数据结构,当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常。
  • 这里的“无界”队列,是指队列的元素数量不存在上限,队列的容量会随着元素数量的增加而扩容。
在这里插入图片描述
在这里插入图片描述

DelayQueue实现了BlockingQueue接口,所以具有无界、阻塞的特点,除此之外它自己的核心特点就是:

  • 放入该队列的延时任务对象,只要到达延时时间之后才能被取到。
  • DelayQueue 不接受null元素。
  • DelayQueue 只接受那些实现了java.util.concurrent.Delayed接口的对象。

订单延迟任务实现

代码语言:javascript
复制
package com.delayTask.delayQueue;


import com.delayTask.domain.Order;
import lombok.Data;
import lombok.ToString;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 延时订单任务
 *
 * @author zdh
 */
@ToString
@Data
public class OrderDelayObject implements Delayed {

    /**
     * 延迟任务唯一标识: 这里默认为当前时间戳
     */
    private Long id;

    /**
     * 延时时间
     */
    private long delayTime;

    /**
     * 订单对象
     */
    private Order order;

    public OrderDelayObject(long delayTime, Order order) {
        this.id = System.currentTimeMillis();
        //延时时间加上当前时间
        this.delayTime = System.currentTimeMillis() + delayTime;
        this.order = order;
    }


    /**
     * 延迟任务是否到期
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = delayTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    /**
     * 延时任务队列,按照延时时间元素排序,实现Comparable接口
     */
    @Override
    public int compareTo(Delayed obj) {
        return Long.compare(this.delayTime, ((OrderDelayObject) obj).delayTime);
    }
} 
  • Delayed接口继承Comparable接口,所以需要实现compareTo方法,用于延时任务在队列中按照“延时时间”进行排序。
  • getDelay方法是Delayed接口方法,实现该方法提供获取延时任务的倒计时时间

订单处理

代码语言:javascript
复制
package com.dhy.delayQueue;


import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.delayQueue.OrderDelayObject;
import com.delayTask.domain.Order;
import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class DelayQueueTest {
    /**
     * 延迟队列
     */
    private final DelayQueue<OrderDelayObject> delayQueue = new DelayQueue<>();

    /**
     * 开启线程不断轮询,看是否有延迟任务可以处理
     */
    @BeforeTest
    public void beforeTest() {
        Executors.newSingleThreadExecutor().execute(() -> {
            try {
                while (true) {
                    //阻塞直到获取到某个到时的延迟任务
                    OrderDelayObject delayObject = delayQueue.take();
                    log.info("延迟任务信息如下: {}",delayObject);
                    Order order = delayObject.getOrder();
                    order.cancelOrderByTimeEnd();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 测试下单
     */
    @Test
    public void testOrder() throws InterruptedException {
        OrderDelayObject orderDelay = OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
        delayQueue.add(orderDelay);

        OrderDelayObject orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 20L);
        delayQueue.add(orderDelay1);

        Thread.sleep(TimeUnit.SECONDS.toMillis(8L));

        orderDelay.getOrder().submitOrder();
        delayQueue.remove(orderDelay);

        //防止程序结束
        Thread.sleep(TimeUnit.MINUTES.toMillis(10L));
    }


}
在这里插入图片描述
在这里插入图片描述

优缺点

使用DelayQueue实现延时任务非常简单,而且简便,全部都是标准的JDK代码实现,不用引入第三方依赖(不依赖redis实现、消息队列实现等),非常的轻量级。

它的缺点就是所有的操作都是基于应用内存的,一旦出现应用单点故障,可能会造成延时任务数据的丢失。如果订单并发量非常大,因为DelayQueue是无界的,订单量越大,队列内的对象就越多,可能造成OOM的风险。所以使用DelayQueue实现延时任务,只适用于任务量较小的情况。


优化点

上图中我们使用的是while-true循环同步顺序的处理延迟任务:

在这里插入图片描述
在这里插入图片描述

这里建议将订单处理的业务逻辑放到单独一个线程池中进行处理,而非在这里同步进行处理,因为这样可能会导致部分到期的延迟任务无法得到及时的处理。


二,上点档次,基于Netty时间轮算法实现

时间轮算法

在这里插入图片描述
在这里插入图片描述

时间轮算法名副其实,时间轮就是一个环形的数据结构,类似于表盘,将时间轮分成多个bucket(比如:0-8)。假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是 1 s),当前的时间bucket=3,那么在18秒后需要被执行的任务需要落到((3+18)%8=5取余运算)的5号bucket上。假如有多个需要在该时间段内执行的任务,就会组成一个双向链表。另外针对时间轮我们要有下面的几个认知:

  • 时间轮指针是一个Worker线程,在时间轮整点的时候执行双向链表中的任务。
  • 时间轮算法的并不是精准的延时,它的执行精度取决于每个时间轮轮片的分隔时间段tickDuration
  • Worker线程是单线程,一个bucket、一个bucket的顺序处理任务。「所以我们的延时任务一定要做成异步任务,否则会影响时间轮后续任务的执行时间。」

更加详细介绍,可以参考此篇文章


订单延迟任务实现

这里商品订单到时取消对时间精确度的要求并不是特别高,因此可以选择采用时间轮算法进行处理。

首先通过maven坐标引入netty

代码语言:javascript
复制
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha2</version>
        </dependency>
  • 对netty时间轮的使用,进行一层简单的封装
代码语言:javascript
复制
package com.delayTask.wheelTimer;

import com.delayTask.DelayTaskEvent;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * <h>
 *     时间轮工厂
 * </h>
 * @author 大忽悠
 * @create 2022/9/17 17:17
 */
public class WheelTimerHelper {
    /**
     * 处理订单任务的线程池
     */
    private static final ExecutorService THREAD_POOL= Executors.newCachedThreadPool();

    /**
     * 时间轮
     */
    private static HashedWheelTimer wheelTimer;

    /**
     * 生产一个时间轮,默认的bucket数量为512个
     */
    public static HashedWheelTimer newWheelTimer(Long duration){
        wheelTimer=new HashedWheelTimer(duration, TimeUnit.MILLISECONDS, 512);
        return wheelTimer;
    }

    /**
     * @param delayTaskEvent 延迟任务事件
     */
    public static Timeout addNewTask(DelayTaskEvent delayTaskEvent){
        //延迟任务,延迟时间,时间单位
        return wheelTimer.newTimeout(delayTask -> {
            delayTaskEvent.handleDelayEvent();
        }, delayTaskEvent.getDelay(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
    }
}
  • 测试
代码语言:javascript
复制
package com.dhy.wheelTimer;

import com.delayTask.delayQueue.OrderDelayEvent;
import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.wheelTimer.WheelTimerHelper;
import io.netty.util.Timeout;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

/**
 * @author 大忽悠
 * @create 2022/9/17 17:19
 */
public class WheelTimerTest {
    @BeforeTest
    public void beforeTest(){
        WheelTimerHelper.newWheelTimer(100L);
    }

    @Test
    public void testWheelTimer() throws InterruptedException {
        OrderDelayEvent orderDelay =  OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
        OrderDelayEvent orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 20L);
        Timeout timeout = WheelTimerHelper.addNewTask(orderDelay);
        Timeout timeout1 = WheelTimerHelper.addNewTask(orderDelay1);

        //订单二在到期前成功结算,因此不需要取消
        orderDelay1.getOrder().submitOrder();
        //取消延迟任务二
        timeout1.cancel();

        //阻塞,防止程序结束
        Thread.sleep(TimeUnit.SECONDS.toMillis(100L));
    }
}
在这里插入图片描述
在这里插入图片描述

详细代码实现,可以fork仓库看源码


优缺点

时间轮算法实现延时任务的优点就是,相对于使用JDK的DelayQueue,其算法上具有优势,执行性能相对好一些。其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失。这一缺点和DelayQueue是一样的。为了解决这个问题,我们可以使用redis、RocketMQ等分布式中间件来管理延时任务消息的方式来实现延时任务。


小结

本文主要对延迟任务基于内存的单体应用实现给出了两种解决策略,下一篇文章中,我们将针对基于内存的单体解决方法缺陷,给出基于redis和mq实现介绍。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 延迟任务多种实现姿势--上
  • 什么是延迟任务
  • 一,最简单的延迟队列实现
    • 订单延迟任务实现
      • 订单处理
        • 优缺点
          • 优化点
          • 二,上点档次,基于Netty时间轮算法实现
            • 时间轮算法
              • 订单延迟任务实现
                • 优缺点
                • 小结
                相关产品与服务
                云数据库 Redis
                腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档