前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >java并发编程(五)

java并发编程(五)

作者头像
疯狂的KK
发布2020-03-25 11:16:05
2730
发布2020-03-25 11:16:05
举报
文章被收录于专栏:Java项目实战Java项目实战
30:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue

PriorityBlockingQueue,DelayQueue

阻塞队列:在某些情况下,会挂起线程,一旦条件满足,被挂起的线程会自动唤醒。而阻塞队列无需关心什么时候阻塞,什么时候唤醒。

ArrayBlockingQueue:由数组结构组成的有界阻塞队列

LinkedBlockingQueue:由链表结构组成的有界阻塞队列

SynchronousQueue:不存储元素的阻塞队列,即单个元素队列

PriorityBlockingQueue:支持优先级排序的无界阻塞队列

DelayQueue:使用优先级队列实现的延迟无界阻塞队列

阻塞队列的通用Api

抛异常 特殊值 阻塞 超时

插入add(e) offer(e) put(e) offer(e,time,unit)

移除 remove() poll() take() poll(time,unit)

检查 element() peek() X X

demo

public class BlockingQueueDemo {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName() + "\t put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName() + "\t put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();

    }


}

SynchronousQueue:不存储元素的阻塞队列,即单个元素队列,每次put进一个元素,另一个线程5秒后取值,每次take到的都是put进的值

Connected to the target VM, address: '127.0.0.1:53549', transport: 'socket'
A   put 1
B   get1
A   put 2
B   get2
A   put 3
B   get3
Disconnected from the target VM, address: '127.0.0.1:53549', transport: 'socket'

当队列满时,你还add,就会抛异常,队列满了

当队列空时,你还remove,就会抛异常,没有元素给你

//ArrayBlockingQueue 容量仅为2 放入3个 B能取到吗?能取到几个?

public class BlockingQueueDemo {

    public static void main(String[] args) {
        //BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
      
        ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(2);
        new Thread(() -> {

            try {
                System.out.println(Thread.currentThread().getName() + "\t put 1");
                blockingQueue.add("1");
                System.out.println(Thread.currentThread().getName() + "\t put 2");
                blockingQueue.add("2");
                System.out.println(Thread.currentThread().getName() + "\t put 3");
                blockingQueue.add("3");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }, "A").start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();

    }
}
A   put 1
A   put 2
A   put 3
java.lang.IllegalStateException: Queue full
  at java.util.AbstractQueue.add(AbstractQueue.java:98)
  at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
  at com.kk.thread.BlockingQueueDemo.lambda$main$0(BlockingQueueDemo.java:25)
  at java.lang.Thread.run(Thread.java:748)
B   get1
B   get2
B   getnull

问题:那么当ArrayBlockingQueue用put放入元素时会报full queu 异常吗?为什么?

将Add换成put,虽然限制容量为2,但是继续放元素,队列会阻塞直到put数据,B仍能取到所有元素

看下ArraBlockingQueue的put源码

/**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *在队列尾部插入指定元素,等待队列已满时空间变为可用
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);//检查是否元素为null
        final ReentrantLock lock = this.lock;//可重入锁
        lock.lockInterruptibly();//实际判断的是Thread.interrupted()
        try {
            while (count == items.length)队列元素满了就等待
                notFull.await();
            enqueue(e);否则进入队列,调用singal方法通知线程
        } finally {
            lock.unlock();
        }
    }

放入1

检查线程是否中断

设置线程状态

放入队列

此时调用unlock waitstatus=-1

而unlock最终会unpark当前线程

如果线程被阻塞*{@code park}然后它将解除锁定。保证下一个调用

到{@code park}不会阻塞

也就是说put方法在队列满了以后会阻塞,等待没满的唤醒

31:CountDownLatch,CyclicBarrier,Semaphore

CountDownLatch:让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒

public class CountDownLatchDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    
    CountDownLatch latch = new CountDownLatch(6);
    for (int i = 1; i <= 6; i++) {
      new Thread(() -> {
        System.out.println(Thread.currentThread().getName()+"\t 车次离开");
        latch.countDown();
      }, String.valueOf(i)).start();
    }
    latch.await();
    System.out.println(Thread.currentThread().getName()+"\t 锁定");
  }

}
1   车次离开
2   车次离开
3   车次离开
4   车次离开
5   车次离开
6   车次离开
main   锁定

CyclicBarrier:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,

直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才

会继续干活。

public class CyclicBarrierDemo
{
  private static final int NUMBER = 7;
  
  public static void main(String[] args)
  {
    //CyclicBarrier(int parties, Runnable barrierAction) 
    CyclicBarrier cb = new CyclicBarrier(NUMBER, new Runnable() {
      @Override
      public void run() {
        System.out.println("车次"+NUMBER+"已满");
      }
    });
    
    for (int i = 1; i <=7; i++) {
      new Thread(() -> {
        try {
          System.out.println(Thread.currentThread().getName()+"\t 车次到达");
          cb.await();
        } catch (InterruptedException | BrokenBarrierException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }, String.valueOf(i)).start();
    }
  }
1   车次到达
2   车次到达
3   车次到达
4   车次到达
5   车次到达
6   车次到达
7   车次到达
车次7已满

Semaphore:一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制,互斥,当一个线程使用资源时获取信号量,互斥线程需要等待,线程释放,将信号量加1,唤醒互斥资源

public class SemaphoreDemo
{
  public static void main(String[] args)
  {
    //3个停车位
    Semaphore semaphore = new Semaphore(3);
    //模拟6部汽车
    for (int i = 1; i <= 6; i++) {
      new Thread(() -> {
        
        try {
          semaphore.acquire();
          System.out.println(Thread.currentThread().getName()+"\t 号停入停车位");
          TimeUnit.SECONDS.sleep(3);
          semaphore.release();
          System.out.println(Thread.currentThread().getName()+"\t 号离开停车位");
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }

      }, String.valueOf(i)).start();
    }
    
  }
}

1 号停入停车位

2 号停入停车位

3 号停入停车位

4 号停入停车位

2 号离开停车位

3 号离开停车位

5 号停入停车位

6 号停入停车位

1 号离开停车位

4 号离开停车位

5 号离开停车位

6 号离开停车位

32:Exchanger交换器

一个同步点,在这个点上线程可以成对地交换元素。每个线程在{@link#exchange}方法的条目上显示一些对象,与伙伴线程匹配,并在返回时接收其伙伴的对象。交换器可以看作{@link SynchronousQueue}的双向形式。Exchangers may be useful in applications such as genetic algorithms and pipeline designs,应用于遗传算法和管道设计

A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.

用于两个线程之间的数据交换

public class ExchangerDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();

        final Exchanger exchanger = new Exchanger();
        executor.execute(()->{
            String data1="int 1";
            exchangeDate(data1, exchanger);
        });
        executor.execute(()->{
            String data1="int 2";
            exchangeDate(data1, exchanger);
        });
        executor.shutdown();
    }

    private static void exchangeDate(String data1, Exchanger exchanger) {
        try {
            System.out.println(Thread.currentThread().getName() + "交换数据" + data1 + " 交换");
            Thread.sleep(5L);
            String data2 = (String) exchanger.exchange(data1);
            System.out.println(Thread.currentThread().getName() + "交易得到" + data2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
pool-1-thread-1交换数据int 1 交换
pool-1-thread-2交换数据int 2 交换
pool-1-thread-1交易得到int 2
pool-1-thread-2交易得到int 1
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 赵KK日常技术记录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档