前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java 调用 shell 控制超时时间

Java 调用 shell 控制超时时间

原创
作者头像
Flink 实战演练
修改2022-08-18 16:29:49
2.1K0
修改2022-08-18 16:29:49
举报
文章被收录于专栏:Flink 知其所以然

背景

平台开发经常需要使用 shell 脚本调度大数据的组件,在使用 springBoot 开发项目时也是如此,为了保证子 shell 的执行时间可控,需要设置超时时间,如果 shell 无法在给定时间内返回,需要进行相关容错处理。

实践

shell 脚本

say_hello.sh

代码语言:txt
复制
#!/bin/zsh
set -eu
echo "hello ----"
#2*3
for (( i = 0; i < $1; i++ )); do
    echo $i 111
    sleep 1
done
echo "end *************"

Java 代码案例1

代码语言:txt
复制
package org.bridge.xjq.bridge;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws IOException, InterruptedException {

        // one: 使用 processBuilder 构建 process 调度 shell 脚本
        URL url = Main3.class.getResource("/say_hello.sh");
        ProcessBuilder processBuilder = new ProcessBuilder("sh", url.getPath(), 3);
        processBuilder.redirectErrorStream(true);
        Process process = processBuilder.start();

        // two: 设置超时时间, 等待执行结束或者超时
        System.out.println("等待执行完毕或超时 ...");
        boolean over = process.waitFor(5, TimeUnit.SECONDS);
        System.out.println("进程正常结束了么:" + over);

        // three: 判断是否运行完毕了, 如果没有, 就手动 kill 掉子进程
        if (over) {
            System.out.println("finished in shell ");
        } else {
            System.out.println("准备 stop 掉子进程");
            // 如果不 sleep 直接 destory, 会造成读取线程发生 IO 异常
            process.destroy();
        }

        // four: 读取子进程输出结果
        BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
        StringBuilder stringBuilder = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
            stringBuilder.append(line);
        }

        // five: 等待进程结束(process.destory 内部是一个 native 方法, 不会马上 kill 掉,需要等待一段时间子进程真正被 killed)
        int loop = 0;
        while (process.isAlive() && loop < 10) {
            loop++;
            Thread.sleep(1);
            System.out.println("loop: " + loop + ", still alive");
        }

        System.out.println("进程信息" + process.isAlive() + process.exitValue());
        System.out.println(stringBuilder.toString());

    }
}
子进程正常结束

日志如图所示:

代码语言:txt
复制
等待执行完毕或超时 ...
进程正常结束了么:true
finished in shell 
hello ----
0 111
1 111
2 111
end *************
进程是否存活:false;进程返回值:0
shell 输出结果:hello ----0 1111 1112 111end *************
子进程执行超时

在取出结果时会抛出异常 Exception in thread "main" java.io.IOException: Stream closed

代码语言:txt
复制
等待执行完毕或超时 ...
进程正常结束了么:false
准备 stop 掉子进程
Exception in thread "main" java.io.IOException: Stream closed
	at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:336)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at org.bridge.xjq.bridge.Main3.main(Main3.java:37)

代码优化

当检测到子进程超时依然没有结束时,我们会主动 destroy 掉子进程,destory 子进程的同时也会将 InputStream 流关闭,导致子进程计算结果无法获取,为了避免这种情形,可以考虑使用一个单独线程获取结果。

代码语言:txt
复制
public class Main3 {

    public static void main(String[] args) throws IOException, InterruptedException {

        // one: 使用 processBuilder 构建 process 调度 shell 脚本
        URL url = Main3.class.getResource("/say_hello.sh");
        ProcessBuilder processBuilder = new ProcessBuilder("sh", url.getPath(), "8");
        processBuilder.redirectErrorStream(true);
        Process process = processBuilder.start();

        // 1.5 在进程开始执行后, 立即轮询取出结果到 stringBuilder 中, 为了避免阻塞主逻辑(发现超时时,kill 掉子进程),使用独立线程取数
        BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
        StringBuilder stringBuilder = new StringBuilder();
        // 填充 shell 脚本内容
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 将数据导出来, 这里也会一直等待子进程输出数据
                String line;
                while (true) {
                    try {
                        if ((line = reader.readLine()) == null) {
                            System.out.println(Thread.currentThread() + ", 取数结束了...");
                            break;
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                    stringBuilder.append(line);
                    System.out.println(Thread.currentThread() + "result:" + line);
                }
                System.out.println(Thread.currentThread() + "flush to stringBuilder over");
            }
        }).start();

        // two: 设置超时时间, 等待执行结束或者超时
        System.out.println("等待执行完毕或超时 ...");
        boolean over = process.waitFor(5, TimeUnit.SECONDS);
        System.out.println("进程正常结束了么:" + over);

        // three: 判断是否运行完毕了, 如果没有, 就手动 kill 掉子进程
        if (over) {
            System.out.println("finished in shell ");
        } else {
            System.out.println("准备 stop 掉子进程");
            // 如果不 sleep 直接 destory, 会造成读取线程发生 IO 异常
            process.destroy();
        }

        /*
        // four: 读取子进程输出结果
        BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
        StringBuilder stringBuilder = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
            stringBuilder.append(line);
        }
        */

        // five: 等待进程结束(process.destory 内部是一个 native 方法, 不会马上 kill 掉,需要等待一段时间子进程真正被 killed)
        int loop = 0;
        while (process.isAlive() && loop < 10) {
            loop++;
            Thread.sleep(1);
            System.out.println("loop: " + loop + ", still alive");
        }

        System.out.println("进程是否存活:" + process.isAlive() + ";进程返回值:"+ process.exitValue());
        System.out.println("shell 输出结果:" + stringBuilder.toString());

    }
}
子进程正常结束
代码语言:txt
复制
等待执行完毕或超时 ...
Thread[Thread-0,5,main]result:hello ----
Thread[Thread-0,5,main]result:0 111
Thread[Thread-0,5,main]result:1 111
Thread[Thread-0,5,main]result:2 111
Thread[Thread-0,5,main]result:end *************
Thread[Thread-0,5,main], 取数结束了...
Thread[Thread-0,5,main]flush to stringBuilder over
进程正常结束了么:true
finished in shell 
进程是否存活:false;进程返回值:0
shell 输出结果:hello ----0 1111 1112 111end *************
子进程超时
代码语言:txt
复制
等待执行完毕或超时 ...
Thread[Thread-0,5,main]result:hello ----
Thread[Thread-0,5,main]result:0 111
Thread[Thread-0,5,main]result:1 111
Thread[Thread-0,5,main]result:2 111
Thread[Thread-0,5,main]result:3 111
Thread[Thread-0,5,main]result:4 111
进程正常结束了么:false
准备 stop 掉子进程
Thread[Thread-0,5,main], 取数结束了...
Thread[Thread-0,5,main]flush to stringBuilder over
loop: 1, still alive
进程是否存活:false;进程返回值:143
shell 输出结果:hello ----0 1111 1112 1113 1114 111

小结

Process 对象

public boolean waitFor(long timeout, TimeUnit unit)

该方法会阻塞当前线程,直到子进程执行完毕或者达到了超时时间,返回值反应子进程是否正常运行完毕

public abstract void destroy()

底层使用的一个 JNI(java native interface)方法,可能是异步执行的,并不会同步 kill 掉进程返回,因此需要等待一小段时间,等待子进程被终结

代码语言:txt
复制
    private void destroy(boolean force) {
        switch (platform) {
            case LINUX:
            case BSD:
            case AIX:
                // There is a risk that pid will be recycled, causing us to
                // kill the wrong process!  So we only terminate processes
                // that appear to still be running.  Even with this check,
                // there is an unavoidable race condition here, but the window
                // is very small, and OSes try hard to not recycle pids too
                // soon, so this is quite safe.
                synchronized (this) {
                    if (!hasExited)
                        destroyProcess(pid, force);
                }
                try { stdin.close();  } catch (IOException ignored) {}
                try { stdout.close(); } catch (IOException ignored) {}
                try { stderr.close(); } catch (IOException ignored) {}
                break;
                ...
代码语言:txt
复制
    private static native void destroyProcess(int pid, boolean force);
exitValue() 是进程返回值可能会抛出异常
isAlive() 底层使用 exitValue,不会抛异常

InputStream

close 方法

不用的 InputStream 有不同的 close 实现,对于 process.getInputStream() 返回的类是 ProcessPipeInputStream,其 close 会补充结束符后,再关闭掉 inputStream,从而避免 reader.readline 读数时抛异常 Stream closed

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 实践
    • shell 脚本
      • Java 代码案例1
        • 子进程正常结束
        • 子进程执行超时
      • 代码优化
        • 子进程正常结束
        • 子进程超时
    • 小结
      • Process 对象
        • public boolean waitFor(long timeout, TimeUnit unit)
        • public abstract void destroy()
        • exitValue() 是进程返回值可能会抛出异常
        • isAlive() 底层使用 exitValue,不会抛异常
      • InputStream
        • close 方法
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档