java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。
The core idea is as follows:
(1) Users place orders, save them to the database, and push the order and its expiration time into DelayQueue
(2) Start a thread that checks the expiration of an order. The thread uses the take () method of delayQueue to get the expired order. This method is a blocking method. If there is no expired order at present, the method will block and wait until the order is obtained and then continue to execute.
(3) When take () obtains an expired order, the thread queries the order in the database according to the ID of the acquired order and checks the order status. If it is unpaid, it changes the status to expired.
延时队列实现了Iterator接口,但iterator()遍历顺序不保证是元素的实际存放顺序。
DelayQueue<E extends Delayed>的队列元素需要实现Delayed接口,该接口类定义如下:
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private Thread leader = null;
/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
private final Condition available = lock.newCondition();
......
}
DelayedQuene的优先级队列 PriorityQueue 使用的排序方式是队列元素的compareTo方法,优先级队列存放顺序是从小到大的,所以队列元素的compareTo方法影响了队列的出队顺序。
若compareTo方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。
类架构:
方法:
由Delayed定义可以得知,队列元素需要实现getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法, getDelay定义了剩余到期时间,compareTo方法定义了元素排序规则,注意,元素的排序规则影响了元素的获取顺序,将在后面说明。
当 first 元素还没到出队列的时间,就一直等待,直到返回。
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
//没有元素,让出线程,等待java.lang.Thread.State#WAITING
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 已到期,元素出队
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting,这里在等待的时候,释放 ref。
// 其它线程在leader线程TIMED_WAITING期间,会进入等待状态,这样可以只有一个线程去等待到时唤醒,避免大量唤醒操作
if (leader != null)
available.await(); // 等待
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待剩余时间后,再尝试获取元素,在等待期间,由于leader是当前线程,所以其它线程会等待。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); // 唤醒等待线程
lock.unlock();
}
}
当 first 元素还没到出队列的时间,就直接返回 null。
/**
* Retrieves and removes the head of this queue, or returns {@code null}
* if this queue has no elements with an expired delay.
*
* @return the head of this queue, or {@code null} if this
* queue has no elements with an expired delay
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
其中,PriorityQueue 的 poll() 方法如下:
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
return result;
}
关于 PriorityQueue 的实现原理,我们下一篇中讲。
由代码我们可以看出,获取元素时,总是判断PriorityQueue队列的队首元素是否到期,若未到期,返回null,所以compareTo()的方法实现不当的话,会造成队首元素未到期,当队列中有到期元素却获取不到的情况。因此,队列元素的compareTo方法实现需要注意。
package i.juc
import java.lang.Thread.sleep
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.DelayQueue
/**
* @author: Jack
* 2020-05-27 16:44
*/
fun main(args: Array<String>) {
val order1 = Order(3000, "Order1")
val order2 = Order(5000, "Order2")
val order3 = Order(10000, "Order3")
val delayQueue = DelayQueue<Order>()
delayQueue.add(order1)
delayQueue.add(order2)
delayQueue.add(order3)
println("Order delay queue begin: ${now()} \n")
while (delayQueue.size > 0) {
val order = delayQueue.poll()
if (null != order) {
println("Order ${order.name} is out. ${now()}")
}
sleep(1000)
}
}
fun now() = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
package i.juc
import java.util.concurrent.Delayed
import java.util.concurrent.TimeUnit
/**
* @author: Jack
* 2020-05-27 16:45
*/
class Order : Delayed {
var delayTime: Long
var name: String
constructor(delayTime: Long, name: String) {
this.delayTime = System.currentTimeMillis() + if (delayTime > 0) delayTime else 0
this.name = name
}
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
override fun getDelay(unit: TimeUnit): Long {
return delayTime - System.currentTimeMillis()
}
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less
* than, equal to, or greater than the specified object.
*/
override fun compareTo(o: Delayed): Int {
val order = o as Order
val t = this.delayTime - order.delayTime
return when {
t > 0 -> 1
t < 0 -> -1
else -> 0
}
}
}
运行结果:
Order delay queue begin: 2020-05-27 17:33:08
Order Order1 is out. 2020-05-27 17:33:11
Order Order2 is out. 2020-05-27 17:33:13
Order Order3 is out. 2020-05-27 17:33:18
需要注意的是 compareTo 的顺序问题:修改compareTo方法 t > 0 为 -1 后的运行结果: 在10秒之后几乎同时取出。
https://mp.weixin.qq.com/s/tM3QVIdNtPW3x0w--LRy3Q https://www.cnblogs.com/hhan/p/10678466.html