前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并发编程

并发编程

作者头像
周杰伦本人
发布2022-10-25 16:13:31
5780
发布2022-10-25 16:13:31
举报
文章被收录于专栏:同步文章

多线程

image-20200607145235655
image-20200607145235655

线程的应用

  1. 实现callable接口
  2. 继承Thread类

Request请求类:

代码语言:javascript
复制
public class Request {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Request{" +
                "name='" + name + '\'' +
                '}';
    }
}

RequestProcessor接口:

代码语言:javascript
复制
/**
 * 请求处理接口
 */
public interface RequestProcessor {
    void processRequest(Request request);
}

PrintProcessor打印处理:

代码语言:javascript
复制
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 打印处理
 */
public class PrintProcessor extends Thread implements RequestProcessor {

    //存放请求的队列
    LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<Request>();

    private final RequestProcessor nextProcessor;

    public PrintProcessor(RequestProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
    }

    public void processRequest(Request request) {
        linkedBlockingQueue.add(request);
    }

    @Override
    public void run() {
        while (true) {
            try {
                Request request = linkedBlockingQueue.take();
                System.out.println("print data:"+request);
                nextProcessor.processRequest(request);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

SaveProcessor保存处理类:

代码语言:javascript
复制
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 保存处理
 */
public class SaveProcessor extends Thread implements RequestProcessor {

    LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<Request>();

    @Override
    public void run() {
        while (true) {
            try {
                Request request = linkedBlockingQueue.take();
                System.out.println("save data:"+request);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void processRequest(Request request) {
        linkedBlockingQueue.add(request);
    }
}

测试类:

代码语言:javascript
复制
public class Demo {

    PrintProcessor printProcessor;

    public Demo () {
        //责任链模式 先输出后保存
        SaveProcessor saveProcessor = new SaveProcessor();
        saveProcessor.start();
        printProcessor = new PrintProcessor(saveProcessor);
        printProcessor.start();
    }
    public static void main(String[] args) {
        Request request = new Request();
        request.setName("mic");
        new Demo().doTest(request);
    }

    private void doTest(Request request) {
        printProcessor.processRequest(request);
    }
}

并发编程的基础

线程状态

6种:

NEW 没有调用start 方法 RUNNABLE 运行状态 BLOCKED 阻塞

  • 等待阻塞 wait
  • 同步阻塞 synchronized
  • 其他阻塞 sleep/join

WAITING 等待 TIMED_WAITING 时间等待 TERMINATED 终止

image-20200606213523148
image-20200606213523148

线程的启动和终止

启动:start

终止: interrupt

线程安全问题

可见性 原子性 有序性

可见性:

代码语言:javascript
复制
import java.util.concurrent.TimeUnit;

/**
 *  通过volatile保证可见性 把volatile去掉 线程不会关闭
 * 可见性问题
 */
public class VisableDemo {

    private volatile static boolean stop=false;

    public static void main(String[] args) throws InterruptedException {
        Thread thread=new Thread(()->{
            int i=0;
            while(!stop){
                i++;
            }
        });
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        stop=true;
    }
}

原子性:

代码语言:javascript
复制
/**
 * 原子性例子 按理说count=1000 但结果并不是 因为count++不是原子操作 
 */
public class AtomicDemo {
    private static int count = 0;

    public static void inc() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        count++;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000; i++) {
            new Thread(AtomicDemo::inc).start();
        }
        Thread.sleep(4000);
        System.out.println("运行结果:"+count);
    }
}

有序性不好模拟。。

有序性问题:

  1. 编译器的指令重排序
  2. cpu处理器的指令重排序
  3. 内存系统的重排序

cpu高速缓存

总线锁 缓存锁(MIES协议)

JMM内存模型

image-20200606222044692
image-20200606222044692

JMM如何解决原子性 可见性 有序性的问题

限制处理器的优化和使用内存屏障

解决方法:volatile synchronized final juc

原子性:synchronized(monitorenter、monitorexit)

可见性:volatile synchronized final

有序性:volatile synchronized

volatile

volatile

  1. 保证可见性(底层是使用lock指令)和禁止指令重排序
  2. 使用缓存锁(MESI 缓存一致性协议) M是modify I是Invalid E是Exclusive S是Shared
  3. 内存屏障
    • 对每个volatile写操作的前面插入storestore barrier
    • 对每个volatile写操作后面插入storeload barrier
    • 对每个volatile读操作前面插入loadload barrier
    • 对每个volatile读操作后面插入loadstore barrier

内存屏障两个作用:

  1. 保证数据的可见性
  2. 防止指令的重排序

指令重排序:

代码语言:javascript
复制
/**
 * 指令重排序
 */
public class VolatileDemo {
    private static int x=0,y=0;
    private static int a=0,b=0;

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(()->{
            a=1;
            x=b;
        });
        Thread thread2 = new Thread(()->{
            b=1;
            y=a;
        });
        thread1.start();
        thread2.start();

        //线程没有执行完阻塞在这里 底层wait notify
        thread1.join();
        thread2.join();
        System.out.println("x="+x+","+"y="+y);

        //可能结果 x=0,y=1 x=1,y=0,x=1,y=1
        //也有可能x=0 y=0  t1执行x=b t2执行b=1 t2执行y=a t1执行a=1 ;

    }
}

内存屏障

synchronized 原理分析

  1. synchronized是如何实现锁 锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态
  2. 为什么任何一个对象都可以成为锁 对象监视器ObjectMonitor
  3. 锁存在哪个地方 对象头

偏向锁:锁不存在竞争 都是由同一个线程获得

image-20200607123258464
image-20200607123258464

轻量级锁

image-20200607125420048
image-20200607125420048

重量级锁

wait和notify

wait()、notify()方法属于Object中的方法;对于Object中的方法,每个对象都拥有。

wait:

  1. 释放当前的对象锁
  2. 让当前对象锁处于等待

wait notify 实现原理图:

wait notify 实现原理图
wait notify 实现原理图

示例:

ThreadWait:

代码语言:javascript
复制
package waitandnotify;

public class ThreadWait extends Thread{
    private Object lock;
    public ThreadWait(Object lock) {
        this.lock = lock;
    }

    public void run() {
        synchronized (lock) {
            System.out.println("开始执行 thread wait");
            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行结束 thread wait");
        }
    }
}

ThreadNotify:

代码语言:javascript
复制
package waitandnotify;

public class ThreadNotify extends Thread{
    private Object lock;
    public ThreadNotify(Object lock) {
        this.lock = lock;
    }

    public void run() {
        synchronized (lock) {
            System.out.println("开始执行 thread notify");
            try {
                lock.notify();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("执行结束 thread notify");
        }
    }
}

测试类:

代码语言:javascript
复制
package waitandnotify;

public class Demo {
    public static void main(String[] args) {
        Object lock = new Object();
        //两个线程使用同一个线程
        ThreadWait threadWait = new ThreadWait(lock);
        threadWait.start();
        ThreadNotify threadNotify = new ThreadNotify(lock);
        threadNotify.start();
        
        
    }
}

执行结果:

代码语言:javascript
复制
开始执行 thread wait
开始执行 thread wait
执行结束 thread wait
执行结束 thread wait

lock

synchronized和lock区别:

  1. lock是一个类 synchronized是jvm关键字
  2. lock灵活 可以选择什么时候获得锁和释放锁 synchronized是被动的 在同步代码块执行完或者抛出异常时释放锁
  3. lock可以判断锁的状态 synchronized无法判断锁的状态
  4. lock 有公平锁和非公平锁 synchronized是非公平锁

ReentrantLock 重入锁

image-20200607170255909
image-20200607170255909

示例:

代码语言:javascript
复制
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReenTrantLockDemo {
    static Lock lock = new ReentrantLock();

    private static int count =0;

    public synchronized static void incr() throws InterruptedException {
        Thread.sleep(1);
        lock.lock();;
        count++;
        lock.unlock();
    }
}

基于ReentrantLock 的AQS原理分析

调用流程图:

image-20200607182226826
image-20200607182226826
源码分析 ReentrantLock获得锁过程:
代码语言:javascript
复制
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            //加锁
            if (compareAndSetState(0, 1))
                //设置独占状态
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

compareAndSetState 比较并替换 替换的是state的值

state=0表示无锁 state>0 表示有锁

AQS acquire方法

代码语言:javascript
复制
public final void acquire(int arg) {
    //EXCLUSIVE 代表独占的状态
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire方法(尝试获取锁):

代码语言:javascript
复制
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
    	//重入
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

addWaiter(Node.EXCLUSIVE), arg) :

代码语言:javascript
复制
//返回当前节点
private Node addWaiter(Node mode) {
    //把当前线程封装成node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //把当前节点加入队列中
    enq(node);
    return node;
}

enq方法:

代码语言:javascript
复制
private Node enq(final Node node) {
    //自旋方式
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //保证线程安全
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
image-20200607184531131
image-20200607184531131

acquireQueued方法:

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                //获得锁成功
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 获得锁失败判断是否挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

当前节点获得锁成功:

image-20200607184914153
image-20200607184914153
源码分析 ReentrantLock释放锁

unlock方法调用release

代码语言:javascript
复制
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease

代码语言:javascript
复制
protected final boolean tryRelease(int releases) {
    //每释放一次锁 减一
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

unparkSuccessor方法:

代码语言:javascript
复制
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
    	// 唤醒下一个节点来竞争锁
        LockSupport.unpark(s.thread);
}
非公平锁和公平锁差别

ReentrantLock有公平锁和非公平锁 默认是非公平锁 公平锁就是排队等待获取锁 非公平锁就是进来就获取锁 获取不到再进行排队

NonfairSync非公平锁:

代码语言:javascript
复制
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

FairSync公平锁:

代码语言:javascript
复制
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

ReentrantReadWriteLock 读写锁

代码语言:javascript
复制
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 读写锁
 */
public class RWLockDemo {

    static Map<String,Object> cacheMap = new HashMap<>();

    static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    static Lock read = readWriteLock.readLock();
    static Lock write = readWriteLock.writeLock();

    /**
     * 读操作
     * @param key
     * @return
     */
    public static  final  Object get(String key) {
        read.lock();
        try {
            return cacheMap.get(key);
        }finally {
            read.unlock();
        }
    }

    public static  final  Object set(String key,String value) {
        write.lock();
        try {
            return cacheMap.put(key,value);
        } finally {
            write.unlock();
        }
    }





}

Condition

相当于jdk层面的wat notify

让某个线程在满足某个条件下才能被唤醒

ThreadWait:

代码语言:javascript
复制
package condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Condition await()
 */
public class ThreadWait extends Thread{
    private Lock lock;
    private Condition condition;
    public ThreadWait(Lock lock,Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    public void run() {
            try {
                lock.lock();
                System.out.println("开始执行 thread wait");
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("执行结束 thread wait");
            }finally {
                lock.unlock();
            }

    }
}

ThreadNotify:

代码语言:javascript
复制
package condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Condition signal()
 */
public class ThreadNotify extends Thread{
    private Lock lock;
    private Condition condition;
    public ThreadNotify(Lock lock,Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    public void run() {
            try {
                lock.lock();
                System.out.println("开始执行 thread notify");
                condition.signal();
                System.out.println("执行结束 thread notify");
            } catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                lock.unlock();
            }
    }
}
代码语言:javascript
复制
package condition;


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Demo {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        //两个线程使用同一个线程
        ThreadWait threadWait = new ThreadWait(lock,condition);
        threadWait.start();
        ThreadNotify threadNotify = new ThreadNotify(lock,condition);
        threadNotify.start();
    }
}

运行结果:

代码语言:javascript
复制
开始执行 thread wait
开始执行 thread notify
执行结束 thread notify

源码分析

await方法:

代码语言:javascript
复制
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //添加队列
    Node node = addConditionWaiter();
    //释放所有锁 重入锁重入次数变成0
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //让当前线程阻塞
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

signal方法:

代码语言:javascript
复制
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

doSignal

代码语言:javascript
复制
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

transferForSignal

代码语言:javascript
复制
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //加入AQS队列
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
image-20200607205406826
image-20200607205406826

AQS队列 存放争抢锁的线程

Condition队列 存放挂起的线程

CountdownLatch

倒计时到0时候才执行 countDownLatch.await(); 后面的代码

代码语言:javascript
复制
import java.util.concurrent.CountDownLatch;


public class CountDownLatchDemo {



    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch=new CountDownLatch(3);
        new Thread(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();  //递减

        }).start();

        new Thread(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();

        }).start();

        new Thread(()->{
            countDownLatch.countDown();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        countDownLatch.await(); //阻塞
        System.out.println("执行完毕 ");
    }
}
image-20200607210524213
image-20200607210524213

源码分析

countDownLatch.await();
代码语言:javascript
复制
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly方法

代码语言:javascript
复制
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //尝试获取共享锁
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared

代码语言:javascript
复制
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

getState==0 表示不需要阻塞

doAcquireSharedInterruptibly方法:

代码语言:javascript
复制
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //计数器变成0
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //是否要挂起线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
countDownLatch.countDown();
代码语言:javascript
复制
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
       	//cas 递减操作
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

Semaphore

应用场景:限流

代码语言:javascript
复制
import java.util.concurrent.Semaphore;

/**
 */
public class SemaphoreDemo {

    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(5);
        for(int i=0;i<10;i++){
            new DoAnything(i,semaphore).start();
        }

    }
    static class DoAnything extends Thread{
        private int num;
        private Semaphore semaphore;

        public DoAnything(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire(); // 获取一个令牌
                System.out.println("第"+num+"个线程进入");
                Thread.sleep(2000);
                semaphore.release();//释放令牌
                System.out.println("第"+num+"释放令牌");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

原子操作 Atomic

12个

  1. 基本类型
  2. 数组类型
  3. 引用类型
  4. 属性类型(字段类型)

线程池原理分析

避免线程的重复创建 限流

image-20200607215826997
image-20200607215826997
代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,//核心线程数
                          int maximumPoolSize,//最大线程数
                          long keepAliveTime,//超时时间 超出核心线程数量以外的线程的空余线程的存活时间
                          TimeUnit unit,// 存活时间的单位
                          BlockingQueue<Runnable> workQueue,//阻塞队列
                          RejectedExecutionHandler handler) {//拒绝策略
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

Executors工厂

newFixedThreadPool: 创建一个固定的线程池

代码语言:javascript
复制
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor : 创建只有一个线程的线程池

代码语言:javascript
复制
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool :不限制最大线程数

代码语言:javascript
复制
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),//不存数据的队列
                                  threadFactory);
}

newScheduledThreadPool :定时器 延时执行的线程池

代码语言:javascript
复制
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

线程池流程图:

image-20200607223216305
image-20200607223216305

ThreadPoolExecutor类的execute方法

代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //创建核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
代码语言:javascript
复制
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

t.start(); 实现runable接口 调用 runWorker:

代码语言:javascript
复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //getTask 从队列中取
        while (task != null || (task = getTask()) != null) {
        	//独占锁  线程正在运行不应该被阻断
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

线程池的监控

代码语言:javascript
复制
import java.util.Date;
import java.util.concurrent.*;

/**
 * 线程池监控
 */
public class MyExecutors extends ThreadPoolExecutor {
    //beforeExecutor、afterExecutor、shutdown

    private ConcurrentMap<String,Date> startTime;


    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.startTime=new ConcurrentHashMap<>();
    }

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void shutdown() {
        System.out.print("已经执行的任务数量:"+this.getCompletedTaskCount()+"\n");
        System.out.print("当前的活动线程数:"+this.getActiveCount()+"\n");
        System.out.print("当前排队的线程数:"+this.getQueue().size()+"\n");
        super.shutdown();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTime.put(String.valueOf(r.hashCode()),new Date());
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        //只用一次
        Date statDate=startTime.remove(String.valueOf(r.hashCode()));
        Date finishDate=new Date();
        long dif=finishDate.getTime()-statDate.getTime(); //执行间隔时间
        System.out.println("任务耗时:"+dif);
        System.out.println("最大允许的线程数:"+this.getMaximumPoolSize());
        System.out.println("线程的空闲时间"+this.getKeepAliveTime(TimeUnit.MILLISECONDS));
        System.out.println("任务总数:"+this.getTaskCount());
        super.afterExecute(r, t);
    }

    public static ExecutorService newMyExecutors(){
        return new MyExecutors(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }
}

ThreadPoolDemo

代码语言:javascript
复制
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池监控测试
 */
public class ThreadPoolDemo implements Runnable{

    static ExecutorService executorService=MyExecutors.newMyExecutors();

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor tpe=(ThreadPoolExecutor)executorService;
        for(int i=0;i<100;i++){
            executorService.execute(new ThreadPoolDemo());
        }
        executorService.shutdown();
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-06-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 多线程
  • 线程的应用
  • 并发编程的基础
    • 线程状态
      • 线程的启动和终止
      • 线程安全问题
        • 可见性 原子性 有序性
          • cpu高速缓存
          • JMM内存模型
          • JMM如何解决原子性 可见性 有序性的问题
          • volatile
          • synchronized 原理分析
          • wait和notify
          • lock
            • ReentrantLock 重入锁
              • 源码分析 ReentrantLock获得锁过程:
              • 源码分析 ReentrantLock释放锁
              • 非公平锁和公平锁差别
            • ReentrantReadWriteLock 读写锁
            • Condition
              • 源码分析
              • CountdownLatch
                • 源码分析
                  • countDownLatch.await();
                  • countDownLatch.countDown();
              • Semaphore
              • 原子操作 Atomic
              • 线程池原理分析
              • 线程池的监控
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档