前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用ConcurrentHashMap实现高效缓存框架

使用ConcurrentHashMap实现高效缓存框架

作者头像
田维常
发布2019-07-16 11:36:37
1.4K0
发布2019-07-16 11:36:37
举报
在项目中我们有的时候需要使用某种形式的缓存,使用缓存能够重用之前的计算结果,降低系统延迟,提高吞吐量,但是其却会消耗更多的内存。就像许多重复发明的轮子一样,缓存框架的设计看上去简单,但系统的性能将会由缓存框架的伸缩性决定。如下是一段使用HashMap实现的缓存框架:
代码语言:javascript
复制
public interface Computable<A, V> {
  V compute(A arg) throws InterruptedException;
}
代码语言:javascript
复制
import java.math.BigInteger;

public class ExpensiveFunction implements Computable<String, BigInteger> {
  @Override
  public BigInteger compute(String arg) throws InterruptedException {
    return new BigInteger(arg);
  }
}
代码语言:javascript
复制
import java.util.HashMap;
import java.util.Map;

public class Memorizer1<A, V> implements Computable<A, V> {
  private final Map<A, V> cache = new HashMap<>();
  private final Computable<A, V> c;

  public Memorizer1(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public synchronized V compute(A arg) throws InterruptedException {
    V result = cache.get(arg);
    if (null == result) {
      result = c.compute(arg);
      cache.put(arg, result);
    }

    return result;
  }
}

上述代码中,Computable接口定义的是一类用于执行某种类型计算的策略族。ExpensiveFunction实现了Computable接口,该类在概念上是通过传入的参数arg,经过一系列复杂计算而得到结果,这里为了方便起见,只是返回了一个BigInteger对象。Memorizer1类也实现了Computable接口,这里实际上用到了装饰者模式,在构造Memorizer1类时需要传入一个Computable类型对象进来,如ExpensiveFunction,当需要使用ExpensiveFunction类来进行复杂计算时,可以通过Memorizer1类来对其进行装饰,转而调用Memorizer1的compute方法。而在Memorizer1内部,其使用了一个HashMap来对真正的Computable对象(如ExpensiveFunction)的结果进行了缓存,如果传入的参数arg能够在cache中找到结果,那么直接返回,否则调用实际的Computable::compute方法进行计算,通过这种方式达到提高系统新能的目的。

上述Memorizer1虽然能够实现对计算结果的缓存,但是由于HashMap不是线程安全的,其使用synchronized将整个compute方法包裹起来,当并发量较高时,就会出现多个线程同时竞争执行compute方法的情况,系统的性能可能比不使用缓存更低,此时如何提高Memorizer1的包裹内容的伸缩性将决定了系统的性能。

观察这段程序会发现,Memorizer1中有两个全局属性,一个是用于缓存的cache,一个是用于计算的c。这里c对象只表示了一组通过传入参数计算结果的策略,其是一个无状态对象(所谓的无状态对象是指其内部没有被多个线程公用的属性);而主要的cache对象才是所有线程需要竞争的对象,因而这里使用锁的时候其实只需要锁住HashMap一个类型的对象即可。在java中对于HashMap,可以使用ConcurrentHashMap来解决多线程竞争map对象的问题,如下是使用ConcurrentHashMap改写Memorizer1的形式:

代码语言:javascript
复制
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Memorizer2<A, V> implements Computable<A, V> {
  private final Map<A, V> cache = new ConcurrentHashMap<>();
  private final Computable<A, V> c;

  public Memorizer2(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public V compute(A arg) throws InterruptedException {
    V result = cache.get(arg);
    if (null == result) {
      result = c.compute(arg);
      cache.put(arg, result);
    }

    return result;
  }
}

上述Memorizer2很好的改善了Memorizer1中存在的不足,并且多线程情况下也能够很好的运行,但是其仍存在一些不足,通过分析Memorizer2::compute方法可以发现,使用cache的位置主要有两个:首先获取cache中键为arg的值,如果不存在则调用Computable::compute方法来计算结果,并将结果保存到cache中。其实在调用cache::get和cache::put方法之间进行了一些计算,当一个线程执行了cache::get方法之后得到了空的result,其会执行高额的Computable::compute方法,此时该线程还未执行到cache::put方法,而另一个线程也调用了cache::get方法,其也将得到一个空的result,这样这两个线程还是有可能执行两次Computable::compute方法,并且由于Computable::compute方法耗费较高,因而重复计算的概率还是比较大的。

在java类库中提供了另外一个类FutureTask,该类有一个get方法,如果有结果,那么其会立即返回结果,如果还未运行完成,那么其会阻塞,直到得到运行结果为止。这里我们可以通过FutureTask改写上述缓存框架,如下是改写后的代码:

代码语言:javascript
复制
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Memorizer3<A, V> implements Computable<A, V> {
  private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
  private final Computable<A, V> c;

  public Memorizer3(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public V compute(final A arg) throws InterruptedException {
    Future<V> f = cache.get(arg);
    if (null == f) {
      Callable<V> eval = () -> {
          return c.compute(arg);
      };

      FutureTask<V> ft = new FutureTask<V>(eval);
      f = ft;
      cache.put(arg, f);
      ft.run();
    }

    try {
      return f.get();
    } catch (ExecutionException e) {
      throw launderThrowable(e.getCause());
    }
  }

  private InterruptedException launderThrowable(Throwable cause) {
    if (cause instanceof InterruptedException) {
      return (InterruptedException) cause;
    } else {
      return new InterruptedException(cause.getMessage());
    }
  }
}

这里cache中存储的键还是计算需要的参数,而值则是一个个FutureTask对象,真正的计算放到了一个Callable对象中,在调用FutureTask::run方法时,是实际执行了Computable::compute方法。当一个线程传入一个参数之后,Memorizer3首先在cache中找是否已经有计算结果,如果没有,那么会创建一个FutureTask对象存入到cache中,然后调用FutureTask::run方法计算结果,最后调用FutureTask::get方法返回结果,如果run方法还未计算出结果,那么最后的get方法将会被阻塞,直到产生结果。

Memorizer3对于缓存的实现几乎是完美的,但是其还是存在重复计算的缺陷,即当一个线程在cache中未找到结果,其创建了FutureTask对象并将其放入cache中,然后执行run方法计算结果的同时,另一个线程也传入同样的参数想要获取计算结果,其也发现cache中没有缓存结果,那么其也会创建一个新的FutureTask对象,并调用FutureTask::run方法计算结果。但是相对于Memorizer2,其出现重复计算的概率已经大大减少了。这里造成这个问题的原因还是在于Map的复合操作“若没有则添加”并不是原子的。

在ConcurrentHashMap中提供了一个方法V putIfAbsent(K key, V value),其是一个原子的若不存在则添加方法,并且其会返回当前Map中已经存在的value值,若没有则返回null,如下是通过该方法解决上述重复计算问题的代码:

代码语言:javascript
复制
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Memorizer<A, V> implements Computable<A, V> {
  private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
  private final Computable<A, V> c;

  public Memorizer(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public V compute(final A arg) throws InterruptedException {
    while (true) {
      Future<V> f = cache.get(arg);
      if (null == f) {
        Callable<V> eval = () -> {
          return c.compute(arg);
        };

        FutureTask<V> ft = new FutureTask<V>(eval);
        f = cache.putIfAbsent(arg, ft);
        if (null == f) {
          f = ft;
          ft.run();
        }
      }

      try {
        return f.get();
      } catch (CancellationException e) {
        cache.remove(arg, f);
      } catch (ExecutionException e) {
        throw launderThrowable(e.getCause());
      }
    }
  }

  private InterruptedException launderThrowable(Throwable cause) {
    if (cause instanceof InterruptedException) {
      return (InterruptedException) cause;
    }

    return new InterruptedException(cause.getMessage());
  }
}

这里通过调用ConcurrentHashMap::putIfAbsent方法,假设两个线程传入了同样的参数,并且都创建了一个FutureTask对象,一个线程获得了cache的执行权限执行了cache::putIfAbsent()方法,并且返回了一个null到局部变量f中,此时另一个线程也会调用cache::putIfAbsent()方法,由于第一个线程已经将相关键值对存入到cache中了,那么第二个线程将获得第一个线程创建的FutureTask对象,并且将其替换给当前线程中的局部变量f,并且其判断f不为null,那么其会调用f::get()方法,而此时第一个线程正在执行FutureTask::run方法,如果其已经计算完成,那么其会返回结果给第一个线程,而第二个线程是直接执行的FutureTask::get方法,如果第一个线程执行完成,那么第二个线程将直接获取结果返回,如果第一个线程没有执行完成,那么第二个线程将等待第一个线程执行完成后再返回结果。

这里对compute方法使用while循环的目的是,当某个线程在执行结果的时候,其余的线程需要等待该线程执行完成,如果其余的线程由于某些原因等待被打断,那么通过while循环其会继续进入等待状态,从而得到执行结果。而对于某个执行计算结果的线程而言,如果其计算过程被取消或失败,那么对于缓存而言这就造成了缓存污染,因为存入的是FutureTask对象,而不是真正的结果,如果计算的线程被取消,那么实际上FutureTask::get方法将一直无法获取到值,但是cache中对应键的值也不是null的,这就是缓存污染的根源。上述代码中通过调用FutureTask::get方法时捕获CacellationException来捕捉执行计算的线程计算被取消或失败的情况,当出现这种情况的时候就从cache中将相应的FutureTask对象给移除掉,并且通过while循环也可以是后来进入的线程再次执行run方法从而得到计算结果。

上述的Memorizer基本上是一个完美的缓存类,但是对于缓存而言,其数据如果存在过期问题,那么将需要另外进行设计,从而实现高性能吞吐的目的,当然,如果只是针对一些复杂的计算,只要传入的值不变,其结果永远不会发生变化,那么使用该框架还是非常合适的。

END

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java后端技术栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档