专栏首页刘晓杰ThreadPoolExecutor介绍

ThreadPoolExecutor介绍

数据结构:AtomicInteger。

他的核心代码是

    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    //对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。
    private volatile int value;

    public final int get() {
        return value;
    }

    // 使用了由硬件保证其原子性的指令 CAS (compare and swap)
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

综上,getAndIncrement() 方法并不是原子操作。 只是保证了他和其他函数对 value 值得更新都是有效的。 整个方法本身并不是线程安全的,但通过compareAndSet方法,可以达到安全操作的效果。 不过由于没有加线程同步锁,整个操作的线程开销比较小的,很多时候也会提高线程执行的效率。

ThreadPoolExecutor执行流程

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();//******正在运行的线程数量
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

看一下addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//**********************************************最有用的就这句
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

为什么workers.add需要加锁呢?因为workers是

private final HashSet<Worker> workers = new HashSet<Worker>();

HashSet继承自HashMap

    public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }

就相当于调用HashMap的put函数

    public V put(K key, V value) {
        if (table == EMPTY_TABLE) {
            inflateTable(threshold);
        }
        if (key == null)
            return putForNullKey(value);
        int hash = hash(key);
        int i = indexFor(hash, table.length);
        for (Entry<K,V> e = table[i]; e != null; e = e.next) {
            Object k;
            if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }

        modCount++;
        addEntry(hash, key, value, i);
        return null;
    }

put当然不是原子操作,多线程情况下没有lock会出错

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 源码阅读--xutil3

    提莫队长
  • 源码阅读--Glide

    参考资料:http://www.apkbus.com/blog-705730-60158.html 用法:

    提莫队长
  • ConcurrentHashMap详解

    参考: http://www.cnblogs.com/everSeeker/p/5601861.html http://blog.csdn.net/fj...

    提莫队长
  • Android网络之HttpUrlConnection和Socket关系解析

    多年以前Android的网络请求只有Apache开源的HttpClient和JDK的HttpUrlConnection,近几年随着OkHttp的流行Androi...

    静默加载
  • 为什么用 if(0 == x) 而不是 if(x == 0) ?

    大家好,今天跟大伙分享一个编程小技巧方面的知识:标题已经给出了,为什么有的人更愿意用 if ( 0 == x)而不是 if(x == 0)?

    7089bAt@PowerLi
  • LeetCode Longest Palindromic Substring

    Given a string s, find the longest palindromic substring in s. You may assume th...

  • Android中窗口Input事件接收

    至此 , 在InputMangerService与应用窗口间就建立了Socket连接.

    None_Ling
  • ConcurrentHashMap源码学习

    既然有了HashMap为什么还会出现ConcurrentHashMap?同时ConcurrentHashMap具有什么优势?ConcurrentHashMap与...

    路行的亚洲
  • ConcurrentHashMap源码解析(JDK1.8)

    package java.util.concurrent; import java.io.ObjectStreamField; import java.io....

    武培轩
  • Leetcode【227、468、848、1081】

    这道题是实现一个基本计算器,即给一个只包括 +、-、*、/、数字和空格的字符串,计算结果。

    echobingo

扫码关注云+社区

领取腾讯云代金券