前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java 延时队列 DelayQueue

Java 延时队列 DelayQueue

作者头像
一个会写诗的程序员
发布2020-05-29 14:58:14
2.1K0
发布2020-05-29 14:58:14
举报

概述

java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。

应用场景

The core idea is as follows:

(1) Users place orders, save them to the database, and push the order and its expiration time into DelayQueue

(2) Start a thread that checks the expiration of an order. The thread uses the take () method of delayQueue to get the expired order. This method is a blocking method. If there is no expired order at present, the method will block and wait until the order is obtained and then continue to execute.

(3) When take () obtains an expired order, the thread queries the order in the database according to the ID of the acquired order and checks the order status. If it is unpaid, it changes the status to expired.

延时队列实现了Iterator接口,但iterator()遍历顺序不保证是元素的实际存放顺序。

延迟队列数据结构

DelayQueue<E extends Delayed>的队列元素需要实现Delayed接口,该接口类定义如下:

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader = null;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();
    ......
}

DelayedQuene的优先级队列 PriorityQueue 使用的排序方式是队列元素的compareTo方法,优先级队列存放顺序是从小到大的,所以队列元素的compareTo方法影响了队列的出队顺序。

若compareTo方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。

类架构:

方法:

由Delayed定义可以得知,队列元素需要实现getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法, getDelay定义了剩余到期时间,compareTo方法定义了元素排序规则,注意,元素的排序规则影响了元素的获取顺序,将在后面说明。

获取队列元素: 阻塞与非阻塞获取

阻塞出队列

当 first 元素还没到出队列的时间,就一直等待,直到返回。

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();

 //没有元素,让出线程,等待java.lang.Thread.State#WAITING
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);

// 已到期,元素出队
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting,这里在等待的时候,释放 ref。

// 其它线程在leader线程TIMED_WAITING期间,会进入等待状态,这样可以只有一个线程去等待到时唤醒,避免大量唤醒操作
                    if (leader != null)
                        available.await(); // 等待
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {

// 等待剩余时间后,再尝试获取元素,在等待期间,由于leader是当前线程,所以其它线程会等待。
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal(); // 唤醒等待线程
            lock.unlock();
        }
    }

非阻塞出队列

当 first 元素还没到出队列的时间,就直接返回 null。

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

其中,PriorityQueue 的 poll() 方法如下:

    public E poll() {
        if (size == 0)
            return null;
        int s = --size;
        modCount++;
        E result = (E) queue[0];
        E x = (E) queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        return result;
    }

关于 PriorityQueue 的实现原理,我们下一篇中讲。

由代码我们可以看出,获取元素时,总是判断PriorityQueue队列的队首元素是否到期,若未到期,返回null,所以compareTo()的方法实现不当的话,会造成队首元素未到期,当队列中有到期元素却获取不到的情况。因此,队列元素的compareTo方法实现需要注意。

代码实践示例

package i.juc

import java.lang.Thread.sleep
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.DelayQueue

/**
 * @author: Jack
 * 2020-05-27 16:44
 */

fun main(args: Array<String>) {
    val order1 = Order(3000, "Order1")
    val order2 = Order(5000, "Order2")
    val order3 = Order(10000, "Order3")

    val delayQueue = DelayQueue<Order>()
    delayQueue.add(order1)
    delayQueue.add(order2)
    delayQueue.add(order3)

    println("Order delay queue begin: ${now()} \n")

    while (delayQueue.size > 0) {
        val order = delayQueue.poll()
        if (null != order) {
            println("Order ${order.name} is out. ${now()}")
        }
        sleep(1000)
    }

}

fun now() = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))





package i.juc

import java.util.concurrent.Delayed
import java.util.concurrent.TimeUnit

/**
 * @author: Jack
 * 2020-05-27 16:45
 */
class Order : Delayed {
    var delayTime: Long
    var name: String

    constructor(delayTime: Long, name: String) {
        this.delayTime = System.currentTimeMillis() + if (delayTime > 0) delayTime else 0
        this.name = name
    }

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    override fun getDelay(unit: TimeUnit): Long {
        return delayTime - System.currentTimeMillis()
    }


    /**
     * Compares this object with the specified object for order.  Returns a
     * negative integer, zero, or a positive integer as this object is less
     * than, equal to, or greater than the specified object.
     */
    override fun compareTo(o: Delayed): Int {
        val order = o as Order
        val t = this.delayTime - order.delayTime
        return when {
            t > 0 -> 1
            t < 0 -> -1
            else -> 0
        }
    }

}

运行结果:

Order delay queue begin: 2020-05-27 17:33:08 

Order Order1 is out. 2020-05-27 17:33:11
Order Order2 is out. 2020-05-27 17:33:13
Order Order3 is out. 2020-05-27 17:33:18

需要注意的是 compareTo 的顺序问题:修改compareTo方法 t > 0 为 -1 后的运行结果: 在10秒之后几乎同时取出。

参考资料

https://mp.weixin.qq.com/s/tM3QVIdNtPW3x0w--LRy3Q https://www.cnblogs.com/hhan/p/10678466.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 应用场景
    • 延迟队列数据结构
      • 获取队列元素: 阻塞与非阻塞获取
        • 阻塞出队列
        • 非阻塞出队列
    • 代码实践示例
    • 参考资料
    相关产品与服务
    数据库
    云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档