Java并发之线程间的协作

     上篇文章我们介绍了synchronized关键字,使用它可以有效的解决我们多线程所带来的一些常见问题。例如:竞态条件,内存可见性等。并且,我们也说明了该关键字主要是一个加锁和释放锁的集成,所有为能获得锁的线程都将被阻塞在某个对象的阻塞队列上。而我们本篇将要介绍的线程间的协作则主要是对对象的另一个队列的使用(条件队列),所有因条件不满足而无法继续运行的线程都将在条件队列上进行等待。主要涉及内容如下:

  • 理解wait/notify这两个方法
  • 典型的生产者消费者问题
  • 理解join方法的实现原理

一、理解wait/notify这两个方法      这两个方法是我们本篇文章的主角,它们被定义在根类Object中。

public final void wait()
public final native void wait(long timeout)

public final native void notify();
public final native void notifyAll();

两个wait方法,无参的wait相当于wait(0)表示无限期等待,有参数的wait方法则指定该线程等待多长时间。notify方法用于释放一个在条件队列上等待的线程,而notifyall方法则是用于释放所有在条件队列上进行等待的线程。那么究竟什么时候调用wait方法让线程到条件队列上去等待,什么时候调用notify释放条件队列上的线程呢?

我们说过一个对象有一把锁和两个队列,对于所有无法获取到锁的线程都将被阻塞在阻塞队列上,而对于获取到锁以后,于运行过程中由于缺少某些条件而不得不终止程序的线程将被阻塞在条件队列上并让出CPU。而且需要注意一点的是,线程被阻塞在阻塞队列上和条件队列上,所表现出的状态是不一样的。例如:

/*定义一个线程类*/
public class MyThread extends Thread{

    @Override
    public void run(){
        try {
            System.out.println(Thread.currentThread().getState());
            synchronized (this){
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/*启动线程*/
public static void main(String[] args) throws InterruptedException {
        Thread thread = new MyThread();
        thread.start();

        Thread.sleep(1000);
        System.out.println(thread.getState());
        System.out.println("main is out");

    }

输出结果显示:

主函数中启动一个线程,该线程内部运行的时候先输出当前线程状态,然后调用wait方法将自己挂在当前线程对象的条件队列上并让出CPU,而我们在主函数中对该线程的状态进行再一次的输出, 从结果截图来看,程序并没有结束----说明子线程并没有正常结束,阻塞在条件队列上的线程的状态是waiting,这和阻塞在阻塞队列上的线程状态blocked是完全两种不同的状态。但是,当我们调用notify或者notifyall方法将某个线程从条件队列中释放的时候,该线程要和外面的其他线程一样去竞争对象的锁,如果不能获取到对象的锁,依然会被阻塞在该对象的阻塞队列上。

二、使用wait/notify解决生产者消费者问题      生产者消费者问题是我们操作系统中的一个经典的问题。生产者向仓库中源源不断的放入产品,消费者从仓库中源源不断的拿出产品,当仓库满的时候,生产者就不能继续往里面放入产品,当仓库空的时候,消费者就不能从仓库里取出产品。如何协调好生产者线程和消费者线程对仓库的操作就是这个问题的核心。

public class Repository {

    private ArrayDeque<String> list = null;
    private int limit;     //仓库容量

    public Repository(int limit){
        this.limit = limit;
        list = new ArrayDeque<String>(this.limit);
    }

    //仓库提供给生产者存入操作
    public synchronized void addGoods(String data) throws InterruptedException {
        while(list.size() == limit){
            //说明仓库已经满了
            wait();
        }
        list.add(data);
        System.out.println("i produce a product:"+data);
        notifyAll();
    }

    //仓库提供给消费者取出操作
    public synchronized String getGoods() throws InterruptedException {
        while(list.isEmpty()){
            //说明仓库已经空了
            wait();
        }
        String result = list.poll();
        System.out.println("i consume a product:"+ result);
        notifyAll();
        return result;
    }

}

我们定义一个仓库类,该仓库提供给生产者投放的方法,提供给消费者取出的方法。我们使用双端队列实现对仓库的模拟,limit参数限定仓库容量。

生产者的投放方法,当生产者想要向仓库投放产品时,如果仓库已经满了,则将将当前线程阻塞在条件队列上,等待仓库有空余位置为止。而如果仓库没满,则向其中投入一个产品并唤醒被阻塞在条件队列上的所有线程(在本例中实际上就是消费者线程)。一旦消费者线程从条件队列上被释放,他将重新和生产者线程竞争对象锁,在获取到对象锁之后将回到上次因条件不足而被阻塞的程序位置。消费者的取出方法和生产者的投放方法类似,此处不再赘述。

public class Producer extends Thread {

    //生产者线程不停的生产产品直到仓库满

    private Repository repository = null;

    public Producer(Repository r){
        this.repository = r;
    }

    int count = 0;
    @Override
    public void run(){
        while(true){
            try {
               repository.addGoods(String.valueOf(count));
                count++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

定义一个生产者类,生产者始终不停的生产产品,我们用count来模拟产品代号。

public class Consumer extends Thread {

    //消费者线程不停的从仓库中取出产品直到仓库空

    private Repository repository = null;

    public Consumer(Repository r){
        this.repository = r;
    }

    @Override
    public void run(){
        while(true){
            try {
                String result = repository.getGoods();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

定义一个消费者类,消费者不停的从仓库中取出产品。

public static void main(String[] args) throws InterruptedException {

        Repository repository = new Repository(20);
        Thread producer = new Producer(repository);
        Thread consumer = new Consumer(repository);

        producer.start();
        consumer.start();

        System.out.println("main thread is out");
    }

最后我们定义一个仓库并通过构造方法的传入使得生产者和消费者共同使用相同的仓库对象。分别启动两个线程,程序将死循环的输出生产者和消费者的生产和消费操作,以下是程序运行的部分结果:

我们可以看到生产者和消费者这两个线程交替的输出,偶尔会出现消费者滞后生产者的情况,但是消费者绝对不会超前生产者,因为只有生产者生产出产品之后,消费者才能取出。以上便是经典的生产者消费者问题,通过对该问题的实现,我们能够对wait/notify这两个操作有了一个更加深刻的认识。

三、join方法的实现原理      join方法的内部其实使用的还是我们上述介绍的wait/notify机制。

public final void join() throws InterruptedException {
        join(0);
    }
public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

两个方法,核心的还是这个带有参数的join方法。该方法大体上分为三种情况,如果millis小于0,抛出异常。如果millis等于0,就无限期等待,这一段代码不知道大家理解的如何:

if (millis == 0) {
    while (isAlive()) {
        wait(0);
    }
}
Thread thread = new MyThread();
thread.start();
        
thread.join();

两小段代码,第一段代码是jdk中关于millis 等于0的一个实现,第二段代码则是我们调用join方法的一个基本格式。我们可以看到,由于join这个方法被synchronized关键字修饰,那么我们主线程在调用thread对象的该方法时就需要首先获得thread对象的锁。

进入到join方法的内部,当millis 等于0的时候,判断只要线程对象活着,也就是thread对象活着,就调用wait(0)方法将当前线程(main)线程挂起到thread对象的条件队列上。一旦thread线程对象执行结束,Java系统将调用notifyall来释放所有挂在该对象的条件队列上的线程,此时main线程将会被唤醒,从而实现了main线程等待thread线程执行结束的一个过程。至于millis 大于0的情况,只不过内部调用了wait(long timeout)方法,其他的实现原理基本类似,此处不再赘述。

本篇文章,我们主要介绍线程间的一种协作机制,使用wait/notify两个方法来协作不同的线程。通过实现经典的生产者消费者模型增加了对wait/notify这两个方法的理解,最后从源代码的角度对Thread下的join方法进行了学习,该方法的核心就是利用wait/notify协作主线程和分支线程来实现等待的一个操作。总结不到之处,望指出。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏微信公众号:Java团长

synchronized与Lock的区别与使用详解

昨天在学习别人分享的面试经验时,看到Lock的使用。想起自己在上次面试也遇到了synchronized与Lock的区别与使用。于是,我整理了两者的区别和使用情况...

532
来自专栏IT技术精选文摘

深入理解 Java 多线程核心知识

691
来自专栏JMCui

多线程编程学习五(线程池的创建)

一、概述 New Thread的弊端如下:        a、每次New Thread新建对象性能差。        b、线程缺乏统一的管理,可能无限制的新建...

34611
来自专栏从零开始学 Web 前端

09 - JavaSE之线程

PS: 如果我们没有 new一个 Thread 对象出来,而是直接使用 MyThread 的 run 方法(mt.run()),这就是方法调用,而不是启动线程了...

1155
来自专栏余林丰

1.有关线程、并发的基本概念

什么是线程?   提到“线程”总免不了要和“进程”做比较,而我认为在Java并发编程中混淆的不是“线程”和“进程”的区别,而是“任务(Task)”。进程是表示资...

24610
来自专栏架构之路

wait方法和sleep方法的区别

一.概念、原理、区别 Java中的多线程是一种抢占式的机制而不是分时机制。线程主要有以下几种状态:可运行,运行,阻塞,死亡。抢占式机制指的是有多个线程处于可运...

3335
来自专栏微服务生态

支持生产阻塞的线程池

我们使用线程池的时候,经常使用默认的丢弃策略,那么我们也可以自定义策略,那么下面的文章可以看下。

421
来自专栏JAVA烂猪皮

Java线程池使用与原理

我们可以利用java很容易创建一个新线程,同时操作系统创建一个线程也是一笔不小的开销。所以基于线程的复用,就提出了线程池的概念,我们使用线程池创建出若干个线程,...

761
来自专栏java一日一条

Java 多线程之内置锁与显示锁

Java中具有通过Synchronized实现的内置锁,和ReentrantLock实现的显示锁,这两种锁各有各的好处,算是互有补充,今天就来做一个总结。

803
来自专栏IT笔记

深入理解 Java 多线程核心知识:跳槽面试必备

多线程相对于其他 Java 知识点来讲,有一定的学习门槛,并且了解起来比较费劲。在平时工作中如若使用不当会出现数据错乱、执行效率低(还不如单线程去运行)或者死锁...

45018

扫码关注云+社区