阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞
延迟阻塞队列DelayQueue
的底层是基于优先级队列PriorityQueue
来实现的,因此研究延迟阻塞队列,更多的注意力应集中在以下两点
类的声明如下,要求队列中的元素必须继承 Delayed
public class DelayQueue<E extends Delayed>
extends AbstractQueue<E>
implements BlockingQueue<E>
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
这个限定,主要服务于优先级队列的排序要求,根据延迟时间对元素队列中的元素进行排序
入队的实现逻辑比较简单,为了保证并发安全,实现中实现加锁机制
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
入队的实际是交由优先级队列进行实现,需要注意的是,入队之后,额外的一个操作,如果入队的元素恰好在队列头,执行两个操作
leader
赋值为空 (这个是干嘛的,为什么这么做?)available.signal()
唤醒被阻塞的线程(什么线程被阻塞?)public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
出队的操作同样加锁,获取队列头的元素,判断延期时间是否结束,是才返回结果,否则返回null
注意,这里有两个疑问
getDelay()
方法返回值会变么,由谁来改变呢?虽然上面的出队和入队的逻辑比较简单,但是留下的疑问一点都不少,上面的四个问题应该如何解答?
继续看源码,发现还有一个出队的方法, 传入了两个参数表示阻塞的超时时间(即超过这个时间没有返回,则抛一个中端异常)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 时间转换为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取队列头
E first = q.peek();
if (first == null) { // 队列为空
if (nanos <= 0)
// 延时时间已过,直接返回null
return null;
else
// 当前线程阻塞 nanos (ns),然后再次循环
nanos = available.awaitNanos(nanos);
} else { // 队列非空
// 获取队列头元素的延迟时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // 延迟时间小于0,直接返回队列头
return q.poll();
if (nanos <= 0)
// 阻塞时间已过,队列头的延迟时间还没到,则返回null
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
// 无法获取当前的队列头
//(因为队列头延迟时间大于阻塞时间,即队列头不生效)
// 继续阻塞,以期望此时可能新增一个到队列头
nanos = available.awaitNanos(nanos);
else {
// 可以获取当前队列头
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 阻塞到队列头生效
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
分析: 以当前队列为空作为条件
上面代码的流程如下:
继续化重点
getDelay
返回小于0)上面的方法因为加上了一个超时时间(即在指定的时间内依然无法返回时,断掉阻塞),分析起来可能不太顺畅,再看源码,还有一个take方法,逻辑与上面相似,只是砍掉了超时断开阻塞的逻辑
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
// 队列头为空,则阻塞,直到新增一个入队为止
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 若队列头元素已生效,则直接返回
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
// leader 非空时,表示有其他的一个线程在出队阻塞中
// 此时挂住当前线程,等待另一个线程出队完成
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待队列头元素生效
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
// 当前线程出队完成,通知其他出队阻塞的线程继续执行
available.signal();
lock.unlock();
}
}
通过了之前的烧脑逻辑之后,再看这个就简单很多了
leader = thisThread
)leader=null
)因此可以愉快的解答上面的四个问题
添加一个元素到队列头
普通的出队方法
上面分析的是阻塞队列的实现原理,接下来举一个实例来解析下这个延迟阻塞队列的使用姿势,加深下理解
(简化了在简化之后的,与实际会有一些区别,请勿完全认定合理)
比如和电商的详情页展示,为了提高应用的性能,我们将整个页面进行了缓存,当详情页发生修改后,我们会更新缓存的内容
因此为了保证缓存的内容和实际的内容是一致的,我们需要一个对账的任务,当详情页修改后,并且更新缓存完成之后,我们需要再次对比缓存和实际内容的一致性;
此时一个异步的任务可以这么设计:监听详情页修改的事件,延迟一段时间,然后再对比缓存和实际内容的一致性 (这里延迟一段时间主要是为了保证缓存已经更新完成)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DetailInfo {
private int itemId;
private String title;
private String desc;
private int price;
}
UpdateTask
注意其中 getDelay()
的实现逻辑,根据当前时间与预订的延迟生效时间进行比较
@Data
@AllArgsConstructor
public class UpdateTask implements Delayed {
private int itemId;
private long delayTime;
@Override
public long getDelay(TimeUnit unit) {
return delayTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return (int) (getDelay(TimeUnit.MICROSECONDS) - o.getDelay(TimeUnit.MICROSECONDS));
}
}
更新事件的监听订阅使用了 Guava的EventBus
来处理,如有疑问可以搜索EventBus的使用姿势
public class DetailManager {
// 模拟真实数据存储空间
private Map<Integer, DetailInfo> realMap = new ConcurrentHashMap<>();
// 模拟缓存空间
private Map<String, String> cache = new ConcurrentHashMap<>();
private Gson gson = new Gson();
private String getCacheKey(int itemId) {
return "detailInfo_" + itemId;
}
// eventBus 用于发送更新事件;异步接受更新事件
private AsyncEventBus eventBus;
private void init() {
DetailInfo detailInfo = new DetailInfo(1, "onw", "第一个测试", 100);
DetailInfo detailInfo2 = new DetailInfo(2, "two", "第二个测试", 200);
realMap.put(detailInfo.getItemId(), detailInfo);
realMap.put(detailInfo2.getItemId(), detailInfo2);
cache.put(getCacheKey(detailInfo.getItemId()), gson.toJson(detailInfo));
cache.put(getCacheKey(detailInfo2.getItemId()), gson.toJson(detailInfo2));
eventBus = new AsyncEventBus("Validate-Thread",
Executors.newFixedThreadPool(2));
eventBus.register(this);
}
// 模拟更新商品
public void updateDetail(int itemId) {
DetailInfo detailInfo = realMap.get(itemId);
long now = System.currentTimeMillis();
detailInfo.setTitle("title_" + itemId + "_" + now);
cache.put(getCacheKey(itemId), gson.toJson(detailInfo));
// 发送一个修改的事件
eventBus.post(new UpdateTask(itemId, now + 5000));
System.out.println("[UpdateInfo]>>>ItemId: " + itemId + " updateTime: " + now + " validateTime: " + (now + 5000));
}
// 延迟队列
private DelayQueue<UpdateTask> delayQueue = new DelayQueue<>();
/**
* 监听修改事件
* @param updateTask
*/
@Subscribe
public void verify(UpdateTask updateTask) {
long getTaskTime = System.currentTimeMillis();
delayQueue.put(updateTask);
try {
UpdateTask task = delayQueue.take();
long processTime = System.currentTimeMillis();
DetailInfo real = realMap.get(task.getItemId());
String cacheObj = cache.get(getCacheKey(task.getItemId()));
boolean ans = gson.toJson(real).equals(cacheObj);
System.out.println("validate itemId: " + updateTask.getItemId() +
" getEventTime: " + getTaskTime +
" processTime:" + processTime + " ans: " + ans);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DetailManager detailManager = new DetailManager();
detailManager.init();
// 开始修改
detailManager.updateDetail(1);
Thread.sleep(20);
detailManager.updateDetail(2);
Thread.sleep(35000);
}
}
简单说明主流程
DetailManager
输出结果如下
[UpdateInfo]>>>ItemId: 1 updateTime: 1508677959067 validateTime: 1508677964067
[UpdateInfo]>>>ItemId: 2 updateTime: 1508677959103 validateTime: 1508677964103
Thread[pool-1-thread-1,5,main]>>> validate itemId: 1 getEventTime: 1508677959078 processTime:1508677964067 ans: true
Thread[pool-1-thread-2,5,main]>>> validate itemId: 2 getEventTime: 1508677964067 processTime:1508677964103 ans: true
从上面的输出可以得知,实际验证的时间戳和预期的时间错是相同的
延迟阻塞队列DelayQueue,学习下来之后感觉非常有意思,首先是加深了使用姿势的了解,其次对其中的阻塞,唤醒机制有了一定了解,涨了锁使用知识的见识(这里面还有一个非常有意思的东西就是 Condition
和 ReentrantLock
的使用,后续线程安全篇的研究可以以此作为应用场景)
简单小结上面的学习内容
PriorityQueue
take()
, poll(long, TimeUnit)
两方法之一Delayed
接口,内部实现的getDelay
方法,要求返回值越来越小(如果一直大于0,这个延迟任务就一直无法执行了)