Java并发之线程间的协作

     上篇文章我们介绍了synchronized关键字,使用它可以有效的解决我们多线程所带来的一些常见问题。例如:竞态条件,内存可见性等。并且,我们也说明了该关键字主要是一个加锁和释放锁的集成,所有为能获得锁的线程都将被阻塞在某个对象的阻塞队列上。而我们本篇将要介绍的线程间的协作则主要是对对象的另一个队列的使用(条件队列),所有因条件不满足而无法继续运行的线程都将在条件队列上进行等待。主要涉及内容如下:

  • 理解wait/notify这两个方法
  • 典型的生产者消费者问题
  • 理解join方法的实现原理

一、理解wait/notify这两个方法      这两个方法是我们本篇文章的主角,它们被定义在根类Object中。

public final void wait()
public final native void wait(long timeout)

public final native void notify();
public final native void notifyAll();

两个wait方法,无参的wait相当于wait(0)表示无限期等待,有参数的wait方法则指定该线程等待多长时间。notify方法用于释放一个在条件队列上等待的线程,而notifyall方法则是用于释放所有在条件队列上进行等待的线程。那么究竟什么时候调用wait方法让线程到条件队列上去等待,什么时候调用notify释放条件队列上的线程呢?

我们说过一个对象有一把锁和两个队列,对于所有无法获取到锁的线程都将被阻塞在阻塞队列上,而对于获取到锁以后,于运行过程中由于缺少某些条件而不得不终止程序的线程将被阻塞在条件队列上并让出CPU。而且需要注意一点的是,线程被阻塞在阻塞队列上和条件队列上,所表现出的状态是不一样的。例如:

/*定义一个线程类*/
public class MyThread extends Thread{

    @Override
    public void run(){
        try {
            System.out.println(Thread.currentThread().getState());
            synchronized (this){
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/*启动线程*/
public static void main(String[] args) throws InterruptedException {
        Thread thread = new MyThread();
        thread.start();

        Thread.sleep(1000);
        System.out.println(thread.getState());
        System.out.println("main is out");

    }

输出结果显示:

主函数中启动一个线程,该线程内部运行的时候先输出当前线程状态,然后调用wait方法将自己挂在当前线程对象的条件队列上并让出CPU,而我们在主函数中对该线程的状态进行再一次的输出, 从结果截图来看,程序并没有结束----说明子线程并没有正常结束,阻塞在条件队列上的线程的状态是waiting,这和阻塞在阻塞队列上的线程状态blocked是完全两种不同的状态。但是,当我们调用notify或者notifyall方法将某个线程从条件队列中释放的时候,该线程要和外面的其他线程一样去竞争对象的锁,如果不能获取到对象的锁,依然会被阻塞在该对象的阻塞队列上。

二、使用wait/notify解决生产者消费者问题      生产者消费者问题是我们操作系统中的一个经典的问题。生产者向仓库中源源不断的放入产品,消费者从仓库中源源不断的拿出产品,当仓库满的时候,生产者就不能继续往里面放入产品,当仓库空的时候,消费者就不能从仓库里取出产品。如何协调好生产者线程和消费者线程对仓库的操作就是这个问题的核心。

public class Repository {

    private ArrayDeque<String> list = null;
    private int limit;     //仓库容量

    public Repository(int limit){
        this.limit = limit;
        list = new ArrayDeque<String>(this.limit);
    }

    //仓库提供给生产者存入操作
    public synchronized void addGoods(String data) throws InterruptedException {
        while(list.size() == limit){
            //说明仓库已经满了
            wait();
        }
        list.add(data);
        System.out.println("i produce a product:"+data);
        notifyAll();
    }

    //仓库提供给消费者取出操作
    public synchronized String getGoods() throws InterruptedException {
        while(list.isEmpty()){
            //说明仓库已经空了
            wait();
        }
        String result = list.poll();
        System.out.println("i consume a product:"+ result);
        notifyAll();
        return result;
    }

}

我们定义一个仓库类,该仓库提供给生产者投放的方法,提供给消费者取出的方法。我们使用双端队列实现对仓库的模拟,limit参数限定仓库容量。

生产者的投放方法,当生产者想要向仓库投放产品时,如果仓库已经满了,则将将当前线程阻塞在条件队列上,等待仓库有空余位置为止。而如果仓库没满,则向其中投入一个产品并唤醒被阻塞在条件队列上的所有线程(在本例中实际上就是消费者线程)。一旦消费者线程从条件队列上被释放,他将重新和生产者线程竞争对象锁,在获取到对象锁之后将回到上次因条件不足而被阻塞的程序位置。消费者的取出方法和生产者的投放方法类似,此处不再赘述。

public class Producer extends Thread {

    //生产者线程不停的生产产品直到仓库满

    private Repository repository = null;

    public Producer(Repository r){
        this.repository = r;
    }

    int count = 0;
    @Override
    public void run(){
        while(true){
            try {
               repository.addGoods(String.valueOf(count));
                count++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

定义一个生产者类,生产者始终不停的生产产品,我们用count来模拟产品代号。

public class Consumer extends Thread {

    //消费者线程不停的从仓库中取出产品直到仓库空

    private Repository repository = null;

    public Consumer(Repository r){
        this.repository = r;
    }

    @Override
    public void run(){
        while(true){
            try {
                String result = repository.getGoods();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

定义一个消费者类,消费者不停的从仓库中取出产品。

public static void main(String[] args) throws InterruptedException {

        Repository repository = new Repository(20);
        Thread producer = new Producer(repository);
        Thread consumer = new Consumer(repository);

        producer.start();
        consumer.start();

        System.out.println("main thread is out");
    }

最后我们定义一个仓库并通过构造方法的传入使得生产者和消费者共同使用相同的仓库对象。分别启动两个线程,程序将死循环的输出生产者和消费者的生产和消费操作,以下是程序运行的部分结果:

我们可以看到生产者和消费者这两个线程交替的输出,偶尔会出现消费者滞后生产者的情况,但是消费者绝对不会超前生产者,因为只有生产者生产出产品之后,消费者才能取出。以上便是经典的生产者消费者问题,通过对该问题的实现,我们能够对wait/notify这两个操作有了一个更加深刻的认识。

三、join方法的实现原理      join方法的内部其实使用的还是我们上述介绍的wait/notify机制。

public final void join() throws InterruptedException {
        join(0);
    }
public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

两个方法,核心的还是这个带有参数的join方法。该方法大体上分为三种情况,如果millis小于0,抛出异常。如果millis等于0,就无限期等待,这一段代码不知道大家理解的如何:

if (millis == 0) {
    while (isAlive()) {
        wait(0);
    }
}
Thread thread = new MyThread();
thread.start();
        
thread.join();

两小段代码,第一段代码是jdk中关于millis 等于0的一个实现,第二段代码则是我们调用join方法的一个基本格式。我们可以看到,由于join这个方法被synchronized关键字修饰,那么我们主线程在调用thread对象的该方法时就需要首先获得thread对象的锁。

进入到join方法的内部,当millis 等于0的时候,判断只要线程对象活着,也就是thread对象活着,就调用wait(0)方法将当前线程(main)线程挂起到thread对象的条件队列上。一旦thread线程对象执行结束,Java系统将调用notifyall来释放所有挂在该对象的条件队列上的线程,此时main线程将会被唤醒,从而实现了main线程等待thread线程执行结束的一个过程。至于millis 大于0的情况,只不过内部调用了wait(long timeout)方法,其他的实现原理基本类似,此处不再赘述。

本篇文章,我们主要介绍线程间的一种协作机制,使用wait/notify两个方法来协作不同的线程。通过实现经典的生产者消费者模型增加了对wait/notify这两个方法的理解,最后从源代码的角度对Thread下的join方法进行了学习,该方法的核心就是利用wait/notify协作主线程和分支线程来实现等待的一个操作。总结不到之处,望指出。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏orientlu

FreeRTOS 消息队列

上面这几中方式中, 除了消息通知, 其他几种实现都是基于消息队列。消息队列作为主要的通信方式, 支持在任务间, 任务和中断间传递消息内容。 这一章介绍 Fre...

2552
来自专栏安恒网络空间安全讲武堂

专题 | Python编写渗透工具学习笔记一

目录&基础知识 0x00 Python编程中一些模块的简单介绍(基础知识) 0x01web目录扫描程序 --脚本代码的实现和分析 --优化脚本 0x02实现一个...

2697
来自专栏大闲人柴毛毛

Java并发编程的艺术(十二)——线程安全

1. 什么是『线程安全』? 如果一个对象构造完成后,调用者无需额外的操作,就可以在多线程环境下随意地使用,并且不发生错误,那么这个对象就是线程安全的。 2. ...

3635
来自专栏后端技术探索

vim神奇高效功能--批量生成Sql实例

可以通过写代码,读取文件进行入库,无论用什么语言,这代码逻辑都很容易实现。唯一的问题是代码需要上线后执行,每次上线可是一个大工程,恐怕会被pm同学嘲笑说:“你这...

983
来自专栏增长技术

git对象模型

所有用来表示项目历史信息的文件,是通过一个40个字符的(40-digit)“对象名”来索引的,对象名看起来像这样:

1033
来自专栏IMWeb前端团队

nodejs中错误捕获的一些最佳实践

本文作者:IMWeb yisbug 原文出处:IMWeb社区 未经同意,禁止转载 本文内容大部分来自 https://www.joyent.com/...

2036
来自专栏H2Cloud

C++中消息自动派发之三 About JSON Encode

  《C++ 消息自动派发》系列上篇介绍了IDL解析器,生成的C++代码只支持JSON转C++ struct。 经过新的重构,这次增加了对C++ struct ...

4075
来自专栏自动化测试实战

接口测试框架分析

2844
来自专栏我有一个梦想

C++服务器开发之笔记三

为什么需要原子性操作? 我们考虑一个例子: (1)x++这个常见的运算符在内存中是怎样操作的? 从内存中读x的值到寄存器中,对寄存器加1,再把新值写回x所处的内...

1877
来自专栏york技术分享

sed 使用教程 - 通读篇(30分钟入门系列)

和上篇 awk 分享一样,作为通读性的分享,不想引入太过复杂的东西,依然从日常工作中碰到的 80% 的需求出发,重点阐述最重点的部门,工作原理等,普及一些对se...

44122

扫码关注云+社区