每日前端夜话0x2A
每日前端夜话,陪你聊前端。
每天晚上18:00准时推送。
正文共:4534 字 1 图
预计阅读时间: 12 分钟
翻译:疯狂的技术宅 原文:http://2ality.com/2018/05/child-process-streams.html
在本中,我们在 Node.js 中把 shell 命令作为子进程运行。然后异步读取这些进程的 stdout 并写入其 stdin。
在子进程中运行 shell 命令
首先从在子进程中运行 shell 命令开始:
1const {onExit} = require('@rauschma/stringio');
2const {spawn} = require('child_process');
3
4async function main() {
5 const filePath = process.argv[2];
6 console.log('INPUT: '+filePath);
7
8 const childProcess = spawn('cat', [filePath],
9 {stdio: [process.stdin, process.stdout, process.stderr]}); // (A)
10
11 await onExit(childProcess); // (B)
12
13 console.log('### DONE');
14}
15main();
解释:
spawn()
,它可以使我们在命令运行时访问命令的 stdin,stdout 和 stderr。等待子进程通过 Promise 退出
函数 onExit()
如下所示。
1function onExit(childProcess: ChildProcess): Promise<void> {
2 return new Promise((resolve, reject) => {
3 childProcess.once('exit', (code: number, signal: string) => {
4 if (code === 0) {
5 resolve(undefined);
6 } else {
7 reject(new Error('Exit with error code: '+code));
8 }
9 });
10 childProcess.once('error', (err: Error) => {
11 reject(err);
12 });
13 });
14}
子进程的实现
以下代码用 @rauschma/stringio
异步写入以 shell 命令运行的子进程的 stdin
:
1const {streamWrite, streamEnd, onExit} = require('@rauschma/stringio');
2const {spawn} = require('child_process');
3
4async function main() {
5 const sink = spawn('cat', [],
6 {stdio: ['pipe', process.stdout, process.stderr]}); // (A)
7
8 writeToWritable(sink.stdin); // (B)
9 await onExit(sink);
10
11 console.log('### DONE');
12}
13main();
14
15async function writeToWritable(writable) {
16 await streamWrite(writable, 'First line\n');
17 await streamWrite(writable, 'Second line\n');
18 await streamEnd(writable);
19}
我们为 shell 命令生成一个名为 sink
的独立进程。用 writeToWritable
写入 sink.stdin
。它借助 await
异步执行并暂停,以避免缓冲区被消耗太多。
解释:
spawn()
通过 sink.stdin
('pipe'
)访问 stdin。 stdout 和 stderr 被转发到 process.stdin
和 process.stderr
,如前面所述。await
写完成。而是 await
子进程 sink
完成。接下来了解 streamWrite()
的工作原理。
写流操作的 promise
Node.js 写流的操作通常涉及回调(参见文档【https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_writable_write_chunk_encoding_callback】)。代码如下。
1function streamWrite(
2 stream: Writable,
3 chunk: string|Buffer|Uint8Array,
4 encoding='utf8'): Promise<void> {
5 return new Promise((resolve, reject) => {
6 const errListener = (err: Error) => {
7 stream.removeListener('error', errListener);
8 reject(err);
9 };
10 stream.addListener('error', errListener);
11 const callback = () => {
12 stream.removeListener('error', errListener);
13 resolve(undefined);
14 };
15 stream.write(chunk, encoding, callback);
16 });
17}
streamEnd()
的工作方式是类似的。
从子进程中读取数据
下面的代码使用异步迭代(C行)来读取子进程的 stdout
中的内容:
1const {chunksToLinesAsync, chomp} = require('@rauschma/stringio');
2const {spawn} = require('child_process');
3
4async function main() {
5 const filePath = process.argv[2];
6 console.log('INPUT: '+filePath);
7
8 const source = spawn('cat', [filePath],
9 {stdio: ['ignore', 'pipe', process.stderr]}); // (A)
10
11 await echoReadable(source.stdout); // (B)
12
13 console.log('### DONE');
14}
15main();
16
17async function echoReadable(readable) {
18 for await (const line of chunksToLinesAsync(readable)) { // (C)
19 console.log('LINE: '+chomp(line))
20 }
21}
解释:
process.stderr
。awat
直到 echoReadable()
完成。没有这个 await
,DONE
将会在调用 source.stdout
之前被输出。在子进程之间进行管道连接
在下面的例子中,函数transform()
将会:
source
子进程的 stdout
中读取内容。sink
子进程的 stdin
。换句话说,我们正在实现类似 Unix 管道的功能:
1cat someFile.txt | transform() | cat
这是代码:
1const {chunksToLinesAsync, streamWrite, streamEnd, onExit}
2 = require('@rauschma/stringio');
3const {spawn} = require('child_process');
4
5async function main() {
6 const filePath = process.argv[2];
7 console.log('INPUT: '+filePath);
8
9 const source = spawn('cat', [filePath],
10 {stdio: ['ignore', 'pipe', process.stderr]});
11 const sink = spawn('cat', [],
12 {stdio: ['pipe', process.stdout, process.stderr]});
13
14 transform(source.stdout, sink.stdin);
15 await onExit(sink);
16
17 console.log('### DONE');
18}
19main();
20
21async function transform(readable, writable) {
22 for await (const line of chunksToLinesAsync(readable)) {
23 await streamWrite(writable, '@ '+line);
24 }
25 await streamEnd(writable);
26}
扩展阅读