前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程之并发工具类

多线程之并发工具类

作者头像
OPice
发布2019-10-23 17:51:07
3040
发布2019-10-23 17:51:07
举报
文章被收录于专栏:D·技术专栏D·技术专栏

CountDownLatch

在开发过程中经常会碰到一个任务需要开启多个线程,然后将多个线程的执行结果汇总。比如说查询全量数据,考虑数据量的问题,我们基本上会做分页,这时候就需要多次循环调用。CountDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。 执行原理 CountDownLatch内部实现了AQS,初始化CountDownLatch的时候,会调用Sync的构造方法将count赋值给state变量。多个线程调用countDown的时候,是使用CAS递减state的值;调用await方法的线程会被放在AQS阻塞队列中,等待计数器为0时,唤醒该线程。 核心方法

代码语言:javascript
复制
//构造方法,初始化计数器为count
 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        //Sync实现了AQS
        this.sync = new Sync(count);
    }
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
//挂起timeout,超过这个时间继续执行
 public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
//将count -1
public void countDown() {
        sync.releaseShared(1);
    }
//获取当前count值
 public long getCount() {
        return sync.getCount();
    }

走个栗子

代码语言:javascript
复制
@Slf4j
public class CountDownLatchTest implements Runnable {

    //假设 10000条数据,每页100条需要,100次请求
    static final CountDownLatch countDownLatch = new CountDownLatch(100);

    private Integer i;

    public CountDownLatchTest(Integer i) {
        this.i = i;
    }

    @Override
    public void run() {
        try {
            //请求服务查询数据
            log.info("查到数据:{}-{}", i * 100, (i + 1) * 100);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args) {
        //接口支持的最大并发 = corePoolSize, 总任务量 = maximumSize
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.submit(new CountDownLatchTest(i));
        }
        countDownLatch.await();
    }
}

CyclicBarrier

等待所有线程达到一个屏障时在执行. 原理 CyclicBarrier是由ReentrantLock可重入锁和Condition共同实现的。 核心方法

代码语言:javascript
复制
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();
  
//定义一个CyclicBarrier,parties 代表拦截的线程数量
 public CyclicBarrier(int parties) {
        this(parties, null);
    }

//定义一个CyclicBarrier,parties 代表拦截的线程数量,由最后一个线程执行barrierAction
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

//调用
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

//返回当前在屏障处等待的设备
public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

//屏障是否可用
public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
//返回参与屏障的数量
public int getParties() {
        return parties;
    }

跑个栗子

代码语言:javascript
复制
@Slf4j
public class CyclicBarrierTest implements Runnable {
    private CyclicBarrier barrier;

    private String name;
    private Long time;

    public CyclicBarrierTest(CyclicBarrier barrier, String name, Long time) {
        this.barrier = barrier;
        this.name = name;
        this.time = time;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(time * 1000);
            System.out.println(name + "到了");
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

@Slf4j
public class MainThread implements Runnable {
    @Override
    public void run() {
        System.out.println("人到齐了,开饭.....");
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new MainThread());
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小花",2L));
        executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小红",1L));
        executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小明",5L));

    }
}

cyclicbarriertest.gif

Semaphore

Semaphore 信号量,控制并发时线程的数量。

跑个栗子

代码语言:javascript
复制
//健身房有好几个跑步机
public class FitnessRoom {

    class TreadMill {
        private Integer num;

        public Integer getNum() {
            return num;
        }

        public void setNum(Integer num) {
            this.num = num;
        }

        public TreadMill(Integer num) {
            this.num = num;
        }
    }

    private TreadMill[] treadMills = new TreadMill[]{new TreadMill(1), new TreadMill(2), new TreadMill(3), new TreadMill(4)};
    private boolean[] use = new boolean[4];

    Semaphore semaphore = new Semaphore(4, true);

    //获取一个跑步机
    public TreadMill get() throws InterruptedException {
        semaphore.acquire(1);
        return getAvailable();
    }

    // 遍历找一个没人的跑步机
    public TreadMill getAvailable() {
        for (int i = 0; i < use.length; i++) {
            if (!use[i]) {
                use[i] = true;
                return treadMills[i];
            }
        }
        return null;
    }

    /**
     * 释放跑步机
     * @param treadMill
     */
    public void release(TreadMill treadMill) {
        for (int i = 0; i < use.length; i++) {
            if (treadMills[i] == treadMill) {
                if (use[i]) {
                    use[i] = false;
                }
            }

        }
    }
}

public class MainThread implements Runnable {

    private String name;
    private FitnessRoom fitnessRoom;

    public MainThread(String name, FitnessRoom fitnessRoom) {
        this.name = name;
        this.fitnessRoom = fitnessRoom;
    }

    @Override
    public void run() {
        try {
            FitnessRoom.TreadMill treadMill = fitnessRoom.get();
            if (null != treadMill) {
                System.out.println(name + "在" + treadMill.getNum() + "号跑步机上跑步");
                TimeUnit.SECONDS.sleep(2);
                System.out.println(name + "跑完了");
                //跑步机腾出来了
                fitnessRoom.release(treadMill);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        FitnessRoom fitnessRoom = new FitnessRoom();
        for (int i = 0; i < 20; i++) {
            executorService.execute(new MainThread(i + "", fitnessRoom));
        }
    }
}

semaphore.gif

Exchanger

两个线程之间交互数据工具类

原子操作类

AtomicBoolean AtomicInteger AtomicLong AtomicIntegerArray AtomicLongArray AtomicReferenceArray AtomicReference AtomicReferenceFieldUpdater AtomicMarkableReference AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicStampedFieldUpdater AtomicReferenceFieldUpdater

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.09.23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • Exchanger
  • 原子操作类
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档