使用DelayQueue 和 FutureTask 实现java中的缓存

使用DelayQueue、ConcurrentHashMap、FutureTask实现的缓存工具类。

DelayQueue 简介

DelayQueue是一个支持延时获取元素的无界阻塞队列。DelayQueue内部队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。

  1. 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  2. 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从 DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

ConcurrentHashMap和FutureTask,详见以下:

  1. ConcurrentHashMap 原理解析
  2. FutureTask 源码分析

缓存工具类实现

  1. 支持缓存多长时间,单位毫秒。
  2. 支持多线程并发。 比如:有一个比较耗时的操作,此时缓冲中没有此缓存值,一个线程开始计算这个耗时操作,而再次进来线程就不需要再次进行计算,只需要等上一个线程计算完成后(使用FutureTask)返回该值即可。
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

/**
  * @author jijs
  * @date 2017/08/04
  */
public class CacheBean<V> {
    // 缓存计算的结果
    private final static ConcurrentMap<String, Future<Object>> cache = new ConcurrentHashMap<>();

    // 延迟队列来判断那些缓存过期
    private final static DelayQueue<DelayedItem<String>> delayQueue = new DelayQueue<>();

    // 缓存时间
    private final int ms;

    static {
        // 定时清理过期缓存
        Thread t = new Thread() {
            @Override
            public void run() {
                dameonCheckOverdueKey();
            }
        };
        t.setDaemon(true);
        t.start();
    }

    private final Computable<V> c;

    /**
     * @param c Computable
     */
    public CacheBean(Computable<V> c) {
        this(c, 60 * 1000);
    }

    /**
     * @param c Computable
     * @param ms 缓存多少毫秒
     */
    public CacheBean(Computable<V> c, int ms) {
        this.c = c;
        this.ms = ms;
    }

    public V compute(final String key) throws InterruptedException {

        while (true) {
            //根据key从缓存中获取值
            Future<V> f = (Future<V>) cache.get(key);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {
                    public V call() {
                        return (V) c.compute(key);
                    }
                };
                FutureTask<V> ft = new FutureTask<>(eval);
                //如果缓存中存在此可以,则返回已存在的value
                f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft);
                if (f == null) {
                    //向delayQueue中添加key,并设置该key的存活时间
                    delayQueue.put(new DelayedItem<>(key, ms));
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(key, f);
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 检查过期的key,从cache中删除
     */
    private static void dameonCheckOverdueKey() {
        DelayedItem<String> delayedItem;
        while (true) {
            try {
                delayedItem = delayQueue.take();
                if (delayedItem != null) {
                    cache.remove(delayedItem.getT());
                    System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}


class DelayedItem<T> implements Delayed {

    private T t;
    private long liveTime;
    private long removeTime;

    public DelayedItem(T t, long liveTime) {
        this.setT(t);
        this.liveTime = liveTime;
        this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        if (o == null)
            return 1;
        if (o == this)
            return 0;
        if (o instanceof DelayedItem) {
            DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o;
            if (liveTime > tmpDelayedItem.liveTime) {
                return 1;
            } else if (liveTime == tmpDelayedItem.liveTime) {
                return 0;
            } else {
                return -1;
            }
        }
        long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return diff > 0 ? 1 : diff == 0 ? 0 : -1;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(removeTime - System.currentTimeMillis(), unit);
    }

    public T getT() {
        return t;
    }

    public void setT(T t) {
        this.t = t;
    }

    @Override
    public int hashCode() {
        return t.hashCode();
    }

    @Override
    public boolean equals(Object object) {
        if (object instanceof DelayedItem) {
            return object.hashCode() == hashCode() ? true : false;
        }
        return false;
    }

}

Computable接口

/**
  * @author jijs
  * @date 2017/08/04
  */
public interface Computable<V> {
    V compute(String k);
}

测试类

/**
  * @author jijs
  * @date 2017/08/04
  */
public class FutureTaskDemo {

    public static void main(String[] args) throws InterruptedException {
        // 子线程
        Thread t = new Thread(() -> {
            CacheBean<String> cb = new CacheBean<>(k -> {
                try {
                    System.out.println("模拟计算数据,计算时长2秒。key=" + k);
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "你好:" + k;
            }, 5000);

            try {
                while (true) {
                    System.out.println("thead2:" + cb.compute("b"));
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t.start();

        // 主线程
        while (true) {
            CacheBean<String> cb = new CacheBean<>(k -> {
                try {
                    System.out.println("模拟计算数据,计算时长2秒。key=" + k);
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "你好:" + k;
            }, 5000);

            System.out.println("thead1:" + cb.compute("b"));
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

执行结果:

Paste_Image.png

两个线程同时访问同一个key的缓存。从执行结果发现,每次缓存失效后,同一个key只执行一次计算,而不是多个线程并发执行同一个计算然后缓存。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Flutter入门

Weex是如何在Android客户端上跑起来的

Weex可以通过自己设计的DSL,书写.we文件或者.vue文件来开发界面,整个页面书写分成了3段,template、style、script,借鉴了成熟的MV...

53450
来自专栏码匠的流水账

java8 parallelStream性能测试

默认是Runtime.getRuntime().availableProcessors() - 1,这里为7

39720
来自专栏会跳舞的机器人

java并发编程的艺术第十章——Executor框架

Executor框架的主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable...

12520
来自专栏编码小白

ofbiz实体引擎(一) 获取Delegator

public abstract class DelegatorFactory implements Factory<Delegator, String> { ...

39750
来自专栏Java编程技术

Java中调度线程池ScheduledThreadPoolExecutor原理探究

前面讲解过Java中线程池ThreadPoolExecutor原理探究,ThreadPoolExecutor是Executors中一部分功能,下面来介绍另外一部...

16120
来自专栏Java学习网

常见的 Java 错误及避免方法之第五集(每集10个错误后续持续发布)

当输入期间意外终止文件或流时,将抛出“EOFException”。 以下是抛出EOFException异常的一个示例,来自JavaBeat应用程序:

17230
来自专栏林冠宏的技术文章

java 线程池ThreadPoolExecutor 如何与 AsyncTask() 组合使用。

转载请声明出处谢谢!https://cloud.tencent.com/developer/user/1148436/activities 这里主要使用Exec...

22160
来自专栏猿天地

面试-线程池的成长之路

线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。

17420
来自专栏函数式编程语言及工具

浅谈Slick(2)- Slick101:第一个动手尝试的项目

   看完Slick官方网站上关于Slick3.1.1技术文档后决定开始动手建一个项目来尝试一下Slick功能的具体使用方法。我把这个过程中的一些了解和想法记录...

28790
来自专栏逸鹏说道

我这么玩Web Api(二)

数据验证,全局数据验证与单元测试 目录 一、模型状态 - ModelState 二、数据注解 - Data Annotations 三、自定义数据注解 四、全局...

54760

扫码关注云+社区

领取腾讯云代金券