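This article walks through the source code that a Jenkins slave node (agent) runs when it executes a remote request from the master node.
As the previous article, "Accepting remote requests from the master node (3)", showed, a UserRequest is executed by wrapping it in a Runnable and submitting that Runnable to an Executor, which is responsible for running the build.
The code that actually executes the request is the perform() method of the UserRequest class: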
    protected UserResponse<RSP,EXC> perform(Channel channel) throws EXC {
        try {
            ClassLoader cl = channel.importedClassLoaders.get(classLoaderProxy);
            RSP r = null;
            Channel oldc = Channel.setCurrent(channel);
            try {
                Object o;
                try {
                    o = deserialize(channel,request,cl);
                } catch (ClassNotFoundException e) {
                    throw new ClassNotFoundException("Failed to deserialize the Callable object. Perhaps you needed to implement DelegatingCallable?",e);
                } catch (RuntimeException e) {
                    throw new Error("Failed to deserialize the Callable object.",e);
                }
                Callable<RSP,EXC> callable = (Callable<RSP,EXC>)o;
                if(!channel.isArbitraryCallableAllowed() && !(callable instanceof RPCRequest))
                    throw new SecurityException("Execution of "+callable.toString()+" is prohibited because the channel is restricted");
                callable = channel.decorators.wrapUserRequest(callable);
                ClassLoader old = Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(cl);
                // execute the service
                try {
                    r = callable.call();
                } finally {
                    Thread.currentThread().setContextClassLoader(old);
                }
            } finally {
                Channel.setCurrent(oldc);
            }
            return new UserResponse<RSP,EXC>(serialize(r,channel),false);
        ...
    }
Inside perform(), the line ClassLoader cl = channel.importedClassLoaders.get(classLoaderProxy) obtains the ClassLoader used to load the classes of the request.
The lines o = deserialize(channel,request,cl) and Callable<RSP,EXC> callable = (Callable<RSP,EXC>)o then yield the Callable object, which is the RemoteLaunchCallable sent in "Sending a remote request (2)".
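The pattern used by perform() (turn the request bytes back into a Callable, switch the thread's context class loader, call it, then restore the old loader) can be sketched in plain Java. This is only a minimal sketch and not the remoting implementation: the class PerformSketch, the Greeting callable and the helper methods are hypothetical names, and it uses java.util.concurrent.Callable with a plain ObjectInputStream in place of remoting's Callable<RSP,EXC> and its channel-aware streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

public class PerformSketch {
    /** Hypothetical request payload: a Callable that can travel as bytes. */
    static class Greeting implements Callable<String>, Serializable {
        private static final long serialVersionUID = 1L;
        private final String name;
        Greeting(String name) { this.name = name; }
        @Override public String call() { return "hello " + name; }
    }

    /** Sender side: turn the Callable into a byte array. */
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Receiver side: deserialize, run under the given class loader, restore the old one. */
    @SuppressWarnings("unchecked")
    static <V> V perform(byte[] request, ClassLoader cl) throws Exception {
        Callable<V> callable;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(request))) {
            callable = (Callable<V>) ois.readObject();
        }
        Thread t = Thread.currentThread();
        ClassLoader old = t.getContextClassLoader();
        t.setContextClassLoader(cl);
        try {
            return callable.call();
        } finally {
            // always restore, even if call() throws
            t.setContextClassLoader(old);
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] request = serialize(new Greeting("agent"));
        String result = perform(request, PerformSketch.class.getClassLoader());
        System.out.println(result); // prints "hello agent"
    }
}
```

The try/finally around call() mirrors the one in perform(): the executor thread is reused for later requests, so the previous context class loader has to be put back whatever the Callable does.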
The code of the deserialize() method:
    /*package*/ static Object deserialize(final Channel channel, byte[] data, ClassLoader defaultClassLoader) throws IOException, ClassNotFoundException {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        ObjectInputStream ois;
        if (channel.remoteCapability.supportsMultiClassLoaderRPC()) {
            // this code is coupled with the ObjectOutputStream subtype above
            ois = new MultiClassLoaderSerializer.Input(channel, in);
        } else {
            ois = new ObjectInputStreamEx(in, defaultClassLoader);
        }
        return ois.readObject();
    }
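MultiClassLoaderSerializer.Input extends ObjectInputStream. It overrides resolveClass and resolveProxyClass and, with the help of its private readClassLoader method, decides which ClassLoader each class in the stream is resolved with.
The reason for doing this: during deserialization, if the class of an object cannot be found locally, deserialization fails. By overriding ObjectOutputStream.annotateClass and ObjectInputStream.resolveClass, the class definition (bytecode rather than source code) can be transferred from the master node to the slave node and loaded there through a ClassLoader.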
    class MultiClassLoaderSerializer {
        ...
        static final class Input extends ObjectInputStream {
            private final Channel channel;
            private final List<ClassLoader> classLoaders = new ArrayList<ClassLoader>();
            Input(Channel channel, InputStream in) throws IOException {
                super(in);
                this.channel = channel;
            }
            private ClassLoader readClassLoader() throws IOException, ClassNotFoundException { ... }
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { ... }
            @Override
            protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException { ... }
        }
        ...
    }
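To make the resolveClass hook concrete, here is a minimal sketch of an ObjectInputStream subclass that resolves classes through a caller-supplied ClassLoader. The class name LoaderAwareInput and the roundTrip helper are hypothetical and the elided bodies above are not reproduced; in remoting the supplied loader would presumably be one that can fetch class definitions over the channel, whereas this sketch simply passes the application class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class LoaderAwareInput extends ObjectInputStream {
    private final ClassLoader loader;

    public LoaderAwareInput(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            // initialize=false: locate the class without running its static initializers
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // fall back to the default lookup, which also handles primitive types
            return super.resolveClass(desc);
        }
    }

    /** Serializes a value and reads it back through LoaderAwareInput. */
    static Object roundTrip(Serializable value, ClassLoader loader) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(value);
        }
        try (LoaderAwareInput in = new LoaderAwareInput(new ByteArrayInputStream(bytes.toByteArray()), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> cmd = new ArrayList<>(List.of("sh", "-c", "echo hi"));
        Object copy = roundTrip(cmd, LoaderAwareInput.class.getClassLoader());
        System.out.println(copy.equals(cmd)); // prints "true"
    }
}
```

Swapping in a different ClassLoader changes where classes come from without touching the rest of the deserialization logic, which is the point of overriding resolveClass.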
Inside perform(), callable.call() invokes the call method of RemoteLaunchCallable, an inner class of the Launcher class in the jenkins-core project:
    private static class RemoteLaunchCallable extends MasterToSlaveCallable<RemoteProcess,IOException> {
        ...
        public RemoteProcess call() throws IOException {
            Launcher.ProcStarter ps = new LocalLauncher(listener).launch();
            ps.cmds(cmd).masks(masks).envs(env).stdin(in).stdout(out).stderr(err).quiet(quiet);
            if(workDir!=null) ps.pwd(workDir);
            if (reverseStdin) ps.writeStdin();
            if (reverseStdout) ps.readStdout();
            if (reverseStderr) ps.readStderr();
            final Proc p = ps.start();
            ...
    }
In the code above, Launcher.ProcStarter ps = new LocalLauncher(listener).launch() creates the ProcStarter, and final Proc p = ps.start() ultimately calls the launch(ProcStarter) method of LocalLauncher, an inner class of Launcher:
    public static class LocalLauncher extends Launcher {
        ...
        @Override
        public Proc launch(ProcStarter ps) throws IOException {
            if (!ps.quiet) {
                maskedPrintCommandLine(ps.commands, ps.masks, ps.pwd);
            }
            EnvVars jobEnv = inherit(ps.envs);
            // replace variables in command line
            String[] jobCmd = new String[ps.commands.size()];
            for ( int idx = 0 ; idx < jobCmd.length; idx++ )
                jobCmd[idx] = jobEnv.expand(ps.commands.get(idx));
            return new LocalProc(jobCmd, Util.mapToEnv(jobEnv),
                    ps.reverseStdin ?LocalProc.SELFPUMP_INPUT:ps.stdin,
                    ps.reverseStdout?LocalProc.SELFPUMP_OUTPUT:ps.stdout,
                    ps.reverseStderr?LocalProc.SELFPUMP_OUTPUT:ps.stderr,
                    toFile(ps.pwd));
        }
        ...
    }
The LocalProc constructor is:
    public LocalProc(String[] cmd,String[] env,InputStream in,OutputStream out,OutputStream err,File workDir) throws IOException {
        this( calcName(cmd),
              stderr(environment(new ProcessBuilder(cmd),env).directory(workDir), err==null || err== SELFPUMP_OUTPUT),
              in, out, err );
    }
The java.lang.ProcessBuilder created here is the code that ultimately runs the Execute Shell step of a Jenkins job.
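As an illustration of what that ProcessBuilder call chain amounts to, the following sketch configures a command, environment variables, a working directory and stderr redirection, then starts the process. It is a sketch under assumptions, not Jenkins code: ProcessBuilderSketch, configure and the JOB_NAME variable are hypothetical, the environment is added on top of the inherited one (how LocalProc's environment() and stderr() helpers behave is not shown in the excerpt), and main assumes a Unix-like sh is available on the PATH.

```java
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;

public class ProcessBuilderSketch {
    /** Builds a ProcessBuilder from a command, extra environment, working dir and stderr policy. */
    static ProcessBuilder configure(List<String> cmd, Map<String, String> env,
                                    File workDir, boolean mergeStderr) {
        ProcessBuilder pb = new ProcessBuilder(cmd);
        pb.environment().putAll(env);        // add to the environment inherited from this JVM
        pb.directory(workDir);               // null would mean "current working directory"
        pb.redirectErrorStream(mergeStderr); // true: stderr is merged into stdout
        return pb;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        ProcessBuilder pb = configure(
                List.of("sh", "-c", "echo building $JOB_NAME"),
                Map.of("JOB_NAME", "demo"),
                new File(System.getProperty("java.io.tmpdir")),
                true);
        pb.inheritIO();                      // let the child write to this process's console
        int exit = pb.start().waitFor();     // start the process and wait for it to finish
        System.out.println("exit code: " + exit);
    }
}
```

Keeping the configuration in a separate method from start() makes the command, environment and working directory easy to inspect before anything is actually launched.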