java并发编程实战学习(3)--基础构建模块

转自:java并发编程实战

5.3阻塞队列和生产者-消费者模式

BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到空间可用;如果队列为空,那么take方法将阻塞直到有元素可用。队列可以是有界的也可以是无界的。

如果生产者生成工作的速率比消费者处理工作的速率款,那么工作项会在队列中累计起来,最终好紧内存。同样,put方法的阻塞特性也极大地简化了生产者的编码。如果使用有界队列,当队列充满时,生产者将阻塞并不能继续生产工作,而消费者就有时间来赶上工作的进度。阻塞队列同样提供了一个offer方法,如果数据项不能被添加到队列中,那么将返回一个失败的状态。这样你就能创建更多灵活的策略来处理负荷过载的情况。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:他们能一直并防止产生过多的工作项,使应用程序在负荷过载的情况下边的更加健壮。

/**
 * java并发编程实战
 * 5.3.1桌面搜索
 * 爬虫查找所有文件并放入队列
 * Created by mrf on 2016/3/7.
 */
public class FileCrawler implements Runnable {
    private final BlockingQueue<File> fileQueue;
    private final FileFilter fileFilter;
    private final File root;

    public FileCrawler(BlockingQueue<File> fileQueue, FileFilter fileFilter, File root) {
        this.fileQueue = fileQueue;
        this.fileFilter = fileFilter;
        this.root = root;
    }

    @Override
    public void run() {
        try {
            crawl(root);
        } catch (InterruptedException e) {
            //恢复中断
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    private void crawl(File root) throws  InterruptedException {
        File[] entries = root.listFiles(fileFilter);
        if (entries!=null){
            for (File entry : entries) {
                if (entry.isDirectory()){
                    crawl(entry);
                }else if (!alreadyIndexed(entry)){
                    fileQueue.put(entry);
                }
            }
        }
    }

    private boolean alreadyIndexed(File entry){
        //检查是否加入索引
        return false;
    }
}

/**
 * 消费者
 * 将爬虫结果队列取出并加入索引
 */
class Indexer implements Runnable{
    private static final int BOUND = 100;
    private static final int N_CONSUMERS = 2;
    private final BlockingQueue<File> queue;

    Indexer(BlockingQueue<File> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true){
                indexFile(queue.take());
            }
        }catch (InterruptedException e){
            Thread.currentThread().interrupt();
        }
    }

    private void indexFile(File take) {
        //将文件加入索引
    }

    public static void startIndexing(File[] roots){
        BlockingQueue<File> queue = new LinkedBlockingDeque<>(BOUND);
        FileFilter fileFilter = new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return true;
            }
        };
        for (File root:roots) {
            new Thread(new FileCrawler(queue,fileFilter,root)).start();
        }
        for (int i = 0; i < N_CONSUMERS; i++) {
            new Thread(new Indexer(queue)).start();
        }
    }
}

  5.5信号量

Semaphore中管理着一组虚拟的许可(permit)。许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

/**
 * java 并发编程实战
 * 5-14使用Semaphore做容器设置边界
 * 信号量
 * Created by mrf on 2016/3/8.
 */
public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

//    public BoundedHashSet(Set<T> set, Semaphore sem) {
//        this.set = set;
//        this.sem = sem;
//    }

    public BoundedHashSet(int bound){
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        }finally {
            if (!wasAdded){
                sem.release();
            }
        }
    }
    public boolean remove(Object o){
        boolean wasRemoved = set.remove(o);
        if (wasRemoved){
            sem.release();
        }
        return wasRemoved;
    }
}

  5.6构建高效且可伸缩的结果缓存

/**
 * java并发编程实战
 * 5-16使用HashMap和不同机制来初始化缓存
 * 实现将曾经计算过的命令缓存起来,方便相同的计算直接出结果而不用重复计算
 * Created by mrf on 2016/3/8.
 */
public interface Computable<A,V> {
    V compute(A arg) throws InterruptedException;
}

class ExpensiveFunction implements Computable<String,BigInteger>{

    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        //在经过长时间的计算后
        return new BigInteger(arg);
    }
}

/**
 * 保守上锁办法
 * 每次只有一个线程能执行compute,性能差
 * @param <A>
 * @param <V>
 */
class Memoizer1<A,V> implements Computable<A,V>{
    @GuardedBy("this")
    private final Map<A,V> cache = new HashMap<>();
    private final Computable<A,V> c;

    public Memoizer1(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public synchronized  V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result==null){
            result = c.compute(arg);
            cache.put(arg,result);
        }
        return result;
    }
}

/**
 * 5-17
 * 改用ConcurrentHashMap增强并发性
 * 但还有个问题,就是只有计算完的结果才能缓存,正在计算的没有缓存,
 * 这将导致一个长时间的计算没有放入缓存,另一个又开始重复计算。
 * @param <A>
 * @param <V>
 */
class Memoizer2<A,V> implements Computable<A,V>{

    private final  Map<A,V> cache = new ConcurrentHashMap<>();
    private final  Computable<A,V> c;

    Memoizer2(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result ==null){
            result = c.compute(arg);
            cache.put(arg,result);
        }
        return result;
    }
}

/**
 * 几乎完美:非常好的并发性,缓存正在计算中的结果
 * 但compute模块中if代码块是非原子性的,这样可能导致两个相同的计算
 * @param <A>
 * @param <V>
 */
class Memoizer3<A,V> implements Computable<A,V>{
    private final Map<A,Future<V>> cache = new ConcurrentHashMap<>();
    private final Computable<A,V> c;

    Memoizer3(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(final A arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f==null){
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft = new FutureTask<V>(eval);
            f = ft;
            cache.put(arg,ft);
            ft.run();
        }
        try {
            return f.get();
        } catch (ExecutionException e) {
            //抛出正在计算
            e.printStackTrace();
        }
        return null;
    }
}

/**
 * 使用ConcurrentHashMap的putIfAbsent解决原子问题
 * 若计算取消则移除
 * @param <A>
 * @param <V>
 */
class Memoizer<A,V> implements Computable<A,V>{
    private final ConcurrentHashMap<A,Future<V>> cache = new ConcurrentHashMap<>();
    private final Computable<A,V> c;

    Memoizer(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(final A arg) throws InterruptedException {
        while (true){
            Future<V> f = cache.get(arg);
            if (f==null){
                Callable<V> eval = new Callable<V>() {
                    @Override
                    public V call() throws Exception {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg,ft);
                if (f==null){
                    f = ft;ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e){
                cache.remove(arg,f);
            } catch(ExecutionException e) {
                //抛出正在计算
                e.printStackTrace();
            }
            return null;
        }

    }
}

  小结:

  • 可变状态是直观重要的(It's the mutable state,stupid)。所有的并发问题都可以归结为如何协调对并发状态的访问。可变状态越少,就越容易确保线程的安全性。
  • 尽量将域声明为final类型,除非需要他们是可变的。
  • 不可变对象一定是线程安全的。不可变对象能极大地降低并发编程的复杂性。他们更为简单而且可以任意共享而无须使用加锁或保护性复制等机制。
  • 封装有助于管理复杂性。在编写线程安全的程序时,虽然可以将所有数据都保存在全局变量中,但为什么要这样做?将数据封装在对象中,更易于维持不变性条件:将同步机制封装在对象中,更易于遵循同步策略。
  • 用锁来保护每个可变变量。
  • 当保护同一个不变性条件中的所有变量时,要使用同一个锁。
  • 在执行复合操作期间,要持有锁。
  • 如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会出现问题。
  • 不要故作聪明地腿短出不需要使用同步。
  • 在设计过程中考虑线程安全,或者在文档中明确地指出他不是线程安全的。
  • 将同步策略文档化。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏微信公众号:Java团长

从并发编程到分布式系统——如何处理海量数据(上)

在这里想写写自己在学习并发处理的学习思路,也会聊聊自己遇到的那些坑,以此为记,希望鞭策自己不断学习、永不放弃!

641
来自专栏Android机动车

我的图片四级缓存框架

开发App一定涉及到图片加载、图片处理,那就必须会用到三方的图片框架,要么选择自己封装。至于主流的三方图片框架,就不得不说老牌的ImageLoader、如今更流...

1153
来自专栏开发技术

shiro源码篇 - shiro的session的查询、刷新、过期与删除,你值得拥有

    老公酷爱网络游戏,老婆无奈,只得告诫他:你玩就玩了,但是千万不可以在游戏里找老婆,不然,哼哼。。。     老公嘴角露出了微笑:放心吧亲爱的,我绝对不会...

2112
来自专栏何俊林

阿里、华为、腾讯Java技术面试题精选

1475
来自专栏大内老A

如何让普通变量也支持事务回滚?

有一次和人谈起关于事务的话题,谈到怎样的资源才能事务型资源。除了我们经常使用的数据库、消息队列、事务型文件系统(TxF)以及事务性注册表(TxR)等,还有那些资...

1648
来自专栏芋道源码1024

熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略

本文主要基于 Hystrix 1.5.X 版本 1. 概述 2. HystrixThreadPoolProperties 3. HystrixThreadPoo...

4816
来自专栏阿杜的世界

《七周七并发模型》阅读笔记(一)一、线程与锁——第一天二、线程与锁——第二天三、线程与锁——第三天

线程与锁模型其实是对底层硬件运行过程的形式化,这种形式化既是该模型最大的优点,也是它最大的缺点。我们借助Java语言来学习线程与锁模型,不过内容也适用于其他语言...

1082
来自专栏JavaQ

高并发编程-CyclicBarrier深入解析

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法...

3942
来自专栏JavaQ

源码阅读之CyclicBarrier

源码阅读是基于JDK7,本篇主要涉及CyclicBarrier常用方法源码分析。文中代码若格式排版不对,可点击底部的阅读原文阅读。 1.概述 CyclicBar...

3347
来自专栏大内老A

ASP.NET MVC Model元数据及其定制:一个重要的接口IMetadataAware

在介绍用于自定义Model元数据属性的AdditionalMetadataAttribute特性时我们提到了它实现的接口IMedataAware,我们说这是一个...

1966

扫码关注云+社区