java并发编程(五)

30:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue

PriorityBlockingQueue,DelayQueue

阻塞队列:在某些情况下,会挂起线程,一旦条件满足,被挂起的线程会自动唤醒。而阻塞队列无需关心什么时候阻塞,什么时候唤醒。

ArrayBlockingQueue:由数组结构组成的有界阻塞队列

LinkedBlockingQueue:由链表结构组成的有界阻塞队列

SynchronousQueue:不存储元素的阻塞队列,即单个元素队列

PriorityBlockingQueue:支持优先级排序的无界阻塞队列

DelayQueue:使用优先级队列实现的延迟无界阻塞队列

阻塞队列的通用Api

抛异常 特殊值 阻塞 超时

插入add(e) offer(e) put(e) offer(e,time,unit)

移除 remove() poll() take() poll(time,unit)

检查 element() peek() X X

demo

public class BlockingQueueDemo {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName() + "\t put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName() + "\t put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();

    }


}

SynchronousQueue:不存储元素的阻塞队列,即单个元素队列,每次put进一个元素,另一个线程5秒后取值,每次take到的都是put进的值

Connected to the target VM, address: '127.0.0.1:53549', transport: 'socket'
A   put 1
B   get1
A   put 2
B   get2
A   put 3
B   get3
Disconnected from the target VM, address: '127.0.0.1:53549', transport: 'socket'

当队列满时,你还add,就会抛异常,队列满了

当队列空时,你还remove,就会抛异常,没有元素给你

//ArrayBlockingQueue 容量仅为2 放入3个 B能取到吗?能取到几个?

public class BlockingQueueDemo {

    public static void main(String[] args) {
        //BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
      
        ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(2);
        new Thread(() -> {

            try {
                System.out.println(Thread.currentThread().getName() + "\t put 1");
                blockingQueue.add("1");
                System.out.println(Thread.currentThread().getName() + "\t put 2");
                blockingQueue.add("2");
                System.out.println(Thread.currentThread().getName() + "\t put 3");
                blockingQueue.add("3");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }, "A").start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName()+"\t get"+blockingQueue.poll());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();

    }
}
A   put 1
A   put 2
A   put 3
java.lang.IllegalStateException: Queue full
  at java.util.AbstractQueue.add(AbstractQueue.java:98)
  at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
  at com.kk.thread.BlockingQueueDemo.lambda$main$0(BlockingQueueDemo.java:25)
  at java.lang.Thread.run(Thread.java:748)
B   get1
B   get2
B   getnull

问题:那么当ArrayBlockingQueue用put放入元素时会报full queu 异常吗?为什么?

将Add换成put,虽然限制容量为2,但是继续放元素,队列会阻塞直到put数据,B仍能取到所有元素

看下ArraBlockingQueue的put源码

/**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *在队列尾部插入指定元素,等待队列已满时空间变为可用
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);//检查是否元素为null
        final ReentrantLock lock = this.lock;//可重入锁
        lock.lockInterruptibly();//实际判断的是Thread.interrupted()
        try {
            while (count == items.length)队列元素满了就等待
                notFull.await();
            enqueue(e);否则进入队列,调用singal方法通知线程
        } finally {
            lock.unlock();
        }
    }

放入1

检查线程是否中断

设置线程状态

放入队列

此时调用unlock waitstatus=-1

而unlock最终会unpark当前线程

如果线程被阻塞*{@code park}然后它将解除锁定。保证下一个调用

到{@code park}不会阻塞

也就是说put方法在队列满了以后会阻塞,等待没满的唤醒

31:CountDownLatch,CyclicBarrier,Semaphore

CountDownLatch:让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒

public class CountDownLatchDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    
    CountDownLatch latch = new CountDownLatch(6);
    for (int i = 1; i <= 6; i++) {
      new Thread(() -> {
        System.out.println(Thread.currentThread().getName()+"\t 车次离开");
        latch.countDown();
      }, String.valueOf(i)).start();
    }
    latch.await();
    System.out.println(Thread.currentThread().getName()+"\t 锁定");
  }

}
1   车次离开
2   车次离开
3   车次离开
4   车次离开
5   车次离开
6   车次离开
main   锁定

CyclicBarrier:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,

直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才

会继续干活。

public class CyclicBarrierDemo
{
  private static final int NUMBER = 7;
  
  public static void main(String[] args)
  {
    //CyclicBarrier(int parties, Runnable barrierAction) 
    CyclicBarrier cb = new CyclicBarrier(NUMBER, new Runnable() {
      @Override
      public void run() {
        System.out.println("车次"+NUMBER+"已满");
      }
    });
    
    for (int i = 1; i <=7; i++) {
      new Thread(() -> {
        try {
          System.out.println(Thread.currentThread().getName()+"\t 车次到达");
          cb.await();
        } catch (InterruptedException | BrokenBarrierException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }, String.valueOf(i)).start();
    }
  }
1   车次到达
2   车次到达
3   车次到达
4   车次到达
5   车次到达
6   车次到达
7   车次到达
车次7已满

Semaphore:一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制,互斥,当一个线程使用资源时获取信号量,互斥线程需要等待,线程释放,将信号量加1,唤醒互斥资源

public class SemaphoreDemo
{
  public static void main(String[] args)
  {
    //3个停车位
    Semaphore semaphore = new Semaphore(3);
    //模拟6部汽车
    for (int i = 1; i <= 6; i++) {
      new Thread(() -> {
        
        try {
          semaphore.acquire();
          System.out.println(Thread.currentThread().getName()+"\t 号停入停车位");
          TimeUnit.SECONDS.sleep(3);
          semaphore.release();
          System.out.println(Thread.currentThread().getName()+"\t 号离开停车位");
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }

      }, String.valueOf(i)).start();
    }
    
  }
}

1 号停入停车位

2 号停入停车位

3 号停入停车位

4 号停入停车位

2 号离开停车位

3 号离开停车位

5 号停入停车位

6 号停入停车位

1 号离开停车位

4 号离开停车位

5 号离开停车位

6 号离开停车位

32:Exchanger交换器

一个同步点,在这个点上线程可以成对地交换元素。每个线程在{@link#exchange}方法的条目上显示一些对象,与伙伴线程匹配,并在返回时接收其伙伴的对象。交换器可以看作{@link SynchronousQueue}的双向形式。Exchangers may be useful in applications such as genetic algorithms and pipeline designs,应用于遗传算法和管道设计

A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.

用于两个线程之间的数据交换

public class ExchangerDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();

        final Exchanger exchanger = new Exchanger();
        executor.execute(()->{
            String data1="int 1";
            exchangeDate(data1, exchanger);
        });
        executor.execute(()->{
            String data1="int 2";
            exchangeDate(data1, exchanger);
        });
        executor.shutdown();
    }

    private static void exchangeDate(String data1, Exchanger exchanger) {
        try {
            System.out.println(Thread.currentThread().getName() + "交换数据" + data1 + " 交换");
            Thread.sleep(5L);
            String data2 = (String) exchanger.exchange(data1);
            System.out.println(Thread.currentThread().getName() + "交易得到" + data2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
pool-1-thread-1交换数据int 1 交换
pool-1-thread-2交换数据int 2 交换
pool-1-thread-1交易得到int 2
pool-1-thread-2交易得到int 1

本文分享自微信公众号 - 赵KK日常技术记录(gh_cc4c9f1a9521),作者:赵kk

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-03-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Redis排行榜的设计与实现

    排行榜zset的经典实现,现在的思路全都是查库的操作,由于业务原因,有些是异步操作,难免存在已经计分,但分数还没有入库,这时去查库,导致与实际的分数不一致的情况...

    疯狂的KK
  • java并发编程(二)

    synchronized常用于修饰方法和代码块,对象锁作用于对象的实例方法(不能通过类名调用,通过创建对象调用),比如单例模式的双重检索机制,而类锁并非真的存在...

    疯狂的KK
  • group by 到底是什么妖怪?

    本来今天是要接着整理日志系统的优化的,但是自己对梳理一些名词理解有些欠缺,思来想去还是想讨论下group by语法问题,这个问题我以前举例说明...

    疯狂的KK
  • Java网络编程--BIO阻塞式网络编程

    阻塞(blocking)IO :阻塞是指结果返回之前,线程会被挂起,函数只有在得到结果之后(或超时)才会返回

    CodingDiray
  • Trie树使用实例

    本文简单介绍下apache collection4中的PatriciaTrie的使用。

    codecraft
  • Java 8 日期时间 API

    java 8 通过发布新的Date-Time API (JSR 310)来进一步加强对日期和时间的处理。

    一滴水的眼泪
  • JAVA|Java方法的使用

    方法从简来说就是,把一个功能单独放在大括号内,当需要这个功能的时候我们直接调用方法,这样不仅实现了代码的复用,还解决了代码冗余的问题。

    算法与编程之美
  • Java底层-JMX

    引言部分摘自百度百科,实际上JMX是java5开始提供的对java应用进行监控的一套接口,或者我们也可以像理解JUC包一样理解JMX,把它当成一个框架。JMX这...

    每天学Java
  • Java-类型 变量 运算符

    Java程序结构 public class 类名 { public static void main(String[] args){ //……语句 } ...

    瑞新
  • Java自动化测试(数组/运算符)

    数据类型[][] 数组名 = 数据类型 [二维数组长度][二维数组中一维数组的长度];

    zx钟

扫码关注云+社区

领取腾讯云代金券