Java IO 之 管道流 原理分析

概述

管道流是用来在多个线程之间进行信息传递的Java流。 管道流分为字节流管道流和字符管道流。 字节管道流:PipedOutputStream 和 PipedInputStream。 字符管道流:PipedWriter 和 PipedReader。 PipedOutputStream、PipedWriter 是写入者/生产者/发送者; PipedInputStream、PipedReader 是读取者/消费者/接收者。

字节管道流

这里我们只分析字节管道流,字符管道流原理跟字节管道流一样,只不过底层一个是 byte 数组存储 一个是 char 数组存储的。

java的管道输入与输出实际上使用的是一个循环缓冲数来实现的。输入流PipedInputStream从这个循环缓冲数组中读数据,输出流PipedOutputStream往这个循环缓冲数组中写入数据。当这个缓冲数组已满的时候,输出流PipedOutputStream所在的线程将阻塞;当这个缓冲数组为空的时候,输入流PipedInputStream所在的线程将阻塞。

注意事项

在使用管道流之前,需要注意以下要点:

  • 管道流仅用于多个线程之间传递信息,若用在同一个线程中可能会造成死锁;
  • 管道流的输入输出是成对的,一个输出流只能对应一个输入流,使用构造函数或者connect函数进行连接;
  • 一对管道流包含一个缓冲区,其默认值为1024个字节,若要改变缓冲区大小,可以使用带有参数的构造函数;
  • 管道的读写操作是互相阻塞的,当缓冲区为空时,读操作阻塞;当缓冲区满时,写操作阻塞;
  • 管道依附于线程,因此若线程结束,则虽然管道流对象还在,仍然会报错“read dead end”;
  • 管道流的读取方法与普通流不同,只有输出流正确close时,输出流才能读到-1值。

示例

public class PipedStreamDemo {
    public static void main(String[] args) {
        //创建一个线程池
        ExecutorService executorService = Executors.newCachedThreadPool();

        try {
            //创建输入和输出管道流
            PipedOutputStream pos = new PipedOutputStream();
            PipedInputStream pis = new PipedInputStream(pos);

            //创建发送线程和接收线程
            Sender sender = new Sender(pos);
            Reciever reciever = new Reciever(pis);

            //提交给线程池运行发送线程和接收线程
            executorService.execute(sender);
            executorService.execute(reciever);
        } catch (IOException e) {
            e.printStackTrace();
        }
        //通知线程池,不再接受新的任务,并执行完成当前正在运行的线程后关闭线程池。
        executorService.shutdown();
        try {
            //shutdown 后可能正在运行的线程很长时间都运行不完成,这里设置超过1小时,强制执行 Interruptor 结束线程。
            executorService.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class Sender extends Thread {
        private PipedOutputStream pos;

        public Sender(PipedOutputStream pos) {
            super();
            this.pos = pos;
        }

        @Override
        public void run() {
            try {
                String s = "hello world, amazing java !";
                System.out.println("Sender:" + s);
                byte[] buf = s.getBytes();
                pos.write(buf, 0, buf.length);
                pos.close();
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    static class Reciever extends Thread {
        private PipedInputStream pis;

        public Reciever(PipedInputStream pis) {
            super();
            this.pis = pis;
        }

        @Override
        public void run() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                byte[] buf = new byte[1024];
                int len = 0;
                while ((len = pis.read(buf)) != -1) {
                    baos.write(buf, 0, len);
                }
                byte[] result = baos.toByteArray();
                String s = new String(result, 0, result.length);
                System.out.println("Reciever:" + s);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

输出结果:

源码分析

因为数据是从 PipedOutputStream 写入,然后通过 PipedInputStream 读取的,所以下面我们先来分析下 生产者 PipedOutputStream 的源码。

PipedOutputStream 源码分析

初始化

1、定义了一个 PipedInputStream 成员变量 sink。用来保存需要写入到的目标管道流中。 2、一个代参数的构造,一个无参的构造。

  • 有参的构造调用 connect() 方法把两个管道流连接在一起,
  • 无参的构造函数更灵活,不必在创建一个 PipedOutputStream 的对象时指定 PipedInputStream 对象,可以在后面代码,自己调用 connect() 自己指定。使用方式如下:

write 方法

write 方法就是调用 PipedInputStream的 receive 的方法,把要写入的数据写入进去。

PipedOutputStream 总结

通过源码分析,发现该类没有什么特别的,通过构造或者 connect() 方法接收一个 PipedInputStream对象,然后把要输出信息,交给 PipedInputStream.receive() 方法去接收。

PipedInputStream 源码分析

打开该类后发现比 PipedInputStream 类复杂了好多。

类结构

PipedInputStream 中定义了很多成员变量

1、closedByWriter 是否关闭 PipedOutputStream 流。 2、closedByReader 是否关闭 PipedInputStream 流。 3、connected 输入输出管道流是否成功连接了。 4、readSide、writeSide 读线程和写线程 5、DEFAULT_PIPE_SIZE 默认读写的缓冲区大小为 1024. 6、PIPE_SIZE 对外暴露管道流的读写缓冲区大小(当前包可见) 7、buffer 缓冲区大小 8、in 写入缓冲区下标 9、out 写出缓冲区下标

PipedInputStream 构造及初始化

  • PipedInputStream 支持有4种构造方法。 1、public PipedInputStream(PipedOutputStream src) 传入一个 PipedOutputStream 参数,并调用 initPipe() 方法创建默认大小(1024)的 buffer。 2、public PipedInputStream(PipedOutputStream src, int pipeSize) 传入一个 PipedOutputStream 参数和 pipeSize参数,调用 initPipe() 方法创建指定大小的 buffer 3、public PipedInputStream() 调用 initPipe() 方法,创建一个默认大小的buffer 4、public PipedInputStream(int pipeSize) 调用 initPipe() 方法,创建一个指定大小的buffer
  • initPipe 方法 private void initPipe(int pipeSize) 根据 pipeSize 创建 buffer 。
  • connect 方法 public void connect(PipedOutputStream src) connect方法其实还是调用的 PipedOutputStream 类种的 connect 方法。 所以下面这样写法,是等价的,都是调用 PipedOutputStream 类种的 connect 方法。

receive 方法

通过分析 PipedOutputStream 的源码,我们知道,该方法是在 PipedOutputStream.write() 方法种调用的。

  • 1、checkStateForReceive()检查是否可以接受数据。(是否可向 buffer 种写入数据);
  • 2、获取写线程。PipedOutputStream.write() 中调用的,所以获取的是PipedOutStream 所在的线程;
  • 3、判断 in==out。如果相等说明,已经缓冲区已经被填充满数据了。这时调用 awaitSpace() 方法,唤醒读线程(读线程可能 wait 状态),让当前线程 wait ,如果没有读线程唤醒写线程,那么写线程会在 awaitSpace() 方法种每隔1秒检查一次是否可写;

为什么 in == out 的时候就是写满缓冲区呢? 比如: buffer 长度为10,现在写了5个字节,又读了5个字节,是不是 in 也等于 out? 其实不会的,为什么? 因为读的时候如果 in==out时,他把 in 的值置为了 -1。详见 read() 方法。

  • 4、如果 in<0,就是第一次写或者已经读完 buffer 中已写的数据,这是,把 in 和 out 置为0;
  • 5、向buffer 种写入数据。
  • 6、如果 in 达到 buffer 的最大长度,则把in 置为 0, 下次开始从0 开始填充。(这里,可以把 buffer 当成一个环形队列)。

awaitSpace() 源码

read() 方法

1、执行各种检查,是否可读。 2、获取读线程并赋值给 readSide 变量。 3、while 循环监听判断是否有写线程写数据,如果没有则等待(每秒检查一次),并唤醒写线程(写线程可能 wait )。 4、读取 buffer 中的数据。 如果读到 buffer 的最后一个元素,则把 out 置为0,下次从下标0开始继续读(循环队列表)。 5、如果 in == out,则把 in 置为 -1 。置为初始状态。相当于清空了缓冲区,从缓冲区的下标 0 开始读写。

available() 方法

获取当前可读的字节数

1、如果 in<0; 说明当前没有可读的数据 2、如果 in == out; 说明数据已经填充满了。 3、如果 in > out; 那么in - out 就是 可写的字节数。 4、否则,就是 in < out 的情况。因为它是环形写入的,可能出现 in < out 的情况,所以需要 in + buffer.length - out,才能获取可读字节长度。

PipedInputStream 总结

PipedInputStream 原理其实也很简单,但代码看起来有点懵,它就是通过 wait() 和 notifyAll() 来控制 buffer 是否可读,或可写的。

管道流,做开发这么多年,现在都没有遇到可用的场景。管道流能用到的场景,在并发包种,很多方式都可以实现或代替。比如 java.util.concurrent.Exchanger 类。 java.util.concurrent.Exchanger 的使用场景比管道流使用场景更广泛些。


本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏LinXunFeng的专栏

打造Moya便捷解析库,提供RxSwift拓展

1、相信大家在使用Swift开发时,Moya是首选的网络工具,在模型解析这一块,Swift版模型解析的相关第三方库有很多,本人最习惯用的就是SwiftyJSON...

24111
来自专栏王二麻子IT技术交流园地

Java--Socket通信(双向)

新建两个工程,一个客户端,一个服务端,先启动服务端再启动客户端 两个工程的读写操作线程类基本上完全相同 服务端: import java.io.Buffered...

3005
来自专栏進无尽的文章

简述OC语言

对于一门语言的学习是需要时间领悟的,而对于一些原理性的问题,我们需要清楚其核心思想,知其然而知其所以然,这样才能有利于自己的后续发展。本文只是简述,没有面面具到...

2142
来自专栏Kirito的技术分享

JAVA 拾遗--Future 模式与 Promise 模式

写这篇文章的动机,是缘起于微信闲聊群的一场讨论,粗略整理下,主要涉及了以下几个具体的问题: 同步,异步,阻塞,非阻塞的关联及区别。 JAVA 中有 callb...

2.9K10
来自专栏编码小白

ofbiz中FreeMarkerWorker的makeConfiguration方法

            这个方法是说明了为什么在ftl中可以使用一些java方法             1.代码展示 public static Confi...

3737
来自专栏chenssy

这些Spring中的设计模式,你都知道吗?

设计模式作为工作学习中的枕边书,却时常处于勤说不用的尴尬境地,也不是我们时常忘记,只是一直没有记忆。

921
来自专栏一个小程序员的成长笔记

验证常用正则表达式

1.日期时间验证,支持闰年 支持格式:YYYY/MM/DD, YYYY-MM-DD, YYYY_MM_DD, YYYY.MM.DD 1 var reg = /(...

3717
来自专栏JAVA技术站

JAVA流之管道流PipedInputStream,PipedOutputStream

管道流主要作用是可以连接两个线程间的通信。管道流也分为字节流(PipedInputStream、PipedOutputStream)与字符流(PipedRead...

682
来自专栏Java开发者杂谈

分布式改造剧集2---DIY分布式锁

1637
来自专栏犀利豆的技术空间

徒手撸框架--实现 RPC 远程调用

微服务已经是每个互联网开发者必须掌握的一项技术。而 RPC 框架,是构成微服务最重要的组成部分之一。趁最近有时间。又看了看 dubbo 的源码。dubbo 为了...

1492

扫码关注云+社区

领取腾讯云代金券