首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何从独立线程中运行的多个外部命令管理多个stdout?

如何从独立线程中运行的多个外部命令管理多个stdout?
EN

Stack Overflow用户
提问于 2019-05-14 02:20:51
回答 1查看 32关注 0票数 0

我想从我的scala程序内部运行多个外部命令,并能够处理它们的输出(发送到stdout)。有什么办法可以让我这么做吗?这些程序的输出将包含一些日志记录和可能的进度信息。我需要解析它们的输出并将其发送到数据库服务器,以便在它们返回错误时保存日志、进度和关闭进程。重要的方面是,我不想等待不同的进程完成,而是不断地获得输出。

我以这种方式运行单独的外部命令:

代码语言:javascript
复制
def execCommand(command: String, bId: Long): Future[(Long, Stream[String])] = {
    Future {
      bId -> command.lineStream
    }(executionContext)
  }

"bId“只是我的进程id。除了使用以下命令之外,我无法管理如何获取输出:

代码语言:javascript
复制
Await.result() 

方法,在"execCommand“方法的结果上,但这不是我想要的工作方式。

我希望我的程序在多线程中处理多个数据流,并持续管理这些数据。它不需要是scala,用java编写的解决方案也可以。

EN

回答 1

Stack Overflow用户

发布于 2019-05-14 03:10:43

当您启动该进程时,还会启动一个线程来读取该进程的输出。

如果要在单个线程中管理输出,请创建一个BlockingQueue并将输出发送到队列。您可能希望跟踪输出来自哪个进程,因此在队列中使用POJO。

示例

代码语言:javascript
复制
public class CommandOutput {
    private final int process;
    private final String line;
    public CommandOutput(int process, String line) {
        this.process = process;
        this.line = line;
    }
    public int getProcess() {
        return this.process;
    }
    public String getLine() {
        return this.line;
    }
}
代码语言:javascript
复制
public class CommandOutputStreamHandler extends Thread {
    private final int process;
    private final InputStream stream;
    private final BlockingQueue<CommandOutput> queue;
    public CommandOutputStreamHandler(int process, InputStream stream, BlockingQueue<CommandOutput> queue) {
        this.process = process;
        this.stream = stream;
        this.queue = queue;
    }
    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(this.stream))) {
            for (String line; (line = reader.readLine()) != null; ) {
                this.queue.add(new CommandOutput(this.process, line));
                Thread.sleep(200); // just for simulating output over time
            }
        } catch (Exception e) {
            throw new RuntimeException(e); // or simply log the exception
        } finally {
            this.queue.add(new CommandOutput(this.process, null));
        }
    }
}
代码语言:javascript
复制
BlockingQueue<CommandOutput> queue = new LinkedBlockingQueue<>();

final int PROCS = 3;
for (int i = 0; i < PROCS; i++) {
    Process p = new ProcessBuilder("cmd.exe", "/c", "dir", "/b")
            .redirectErrorStream(true)
            .start();
    new CommandOutputStreamHandler(i, p.getInputStream(), queue)
            .start();
}

for (int endMarkers = 0; endMarkers < PROCS; ) {
    CommandOutput co = queue.take();
    if (co.getLine() == null) {
        endMarkers++;
    } else {
        System.out.println(co.getProcess() + ": " + co.getLine());
    }
}

样本输出

代码语言:javascript
复制
0: .classpath
0: .project
1: .classpath
0: .settings
1: .project
0: lib
1: .settings
0: pom.xml
1: lib
0: src
1: pom.xml
0: target
1: src
2: .classpath
1: target
2: .project
2: .settings
2: lib
2: pom.xml
2: src
2: target
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56117875

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档