本系列(参考远程执行shell源码分析)主节点是通过ssh方式连接(launch slave agents on unix machines via ssh)slave节点
而在启动slave节点时会启动一个ReaderThread线程来接受主节点请求
启动slave节点入口为:通过ssh-slaves项目的SSHLauncher类来启动slave
public synchronized void launch(final SlaveComputer computer, final TaskListener listener) throws InterruptedException {
...
openConnection(listener);
verifyNoHeaderJunk(listener);
reportEnvironment(listener);
String java = resolveJava(computer, listener);
String workingDirectory = getWorkingDirectory(computer);
copySlaveJar(listener, workingDirectory);
startSlave(computer, listener, java, workingDirectory);
...
}
通过startSlave方法启动slave进程
private void startSlave(SlaveComputer computer, final TaskListener listener, String java, String workingDirectory) throws IOException {
...
computer.setChannel(session.getStdout(), session.getStdin(), listener.getLogger(), null);
...
}
通过computer.setChannel调用jenkins-core项目SlaveComputer类的setChannel方法
public void setChannel(InputStream in, OutputStream out, OutputStream launchLog, Channel.Listener listener) throws IOException, InterruptedException {
ChannelBuilder cb = **new** ChannelBuilder(nodeName,threadPoolForRemoting)
.withMode(Channel.Mode.NEGOTIATE)
.withHeaderStream(launchLog);
for (ChannelConfigurator cc : ChannelConfigurator.all()) {
cc.onChannelBuilding(cb,**this**);
}
Channel channel = cb.build(in,out);
setChannel(channel,launchLog,listener);
}
再通过Channel channel = cb.build(in,out)进入到remting项目ChannelBuilder类的build方法
public Channel build(InputStream is, OutputStream os) throws IOException {
return new Channel(this,negotiate(is,os));
}
通过negotiate(is,os)方法得到ChunkedCommandTransport对象
跳转到remoting项目Channel类的构造方法
protected Channel(ChannelBuilder settings, CommandTransport transport) throws IOException {
...
transport.setup(**this**, **new** CommandReceiver() {
public void handle(Command cmd) {
updateLastHeard();
if (logger.isLoggable(Level.FINE))
logger.fine("Received " + cmd);
try {
cmd.execute(Channel.this);
} catch (Throwable t) {
logger.log(Level.SEVERE, "Failed to execute command " + cmd + " (channel " + Channel.this.name + ")", t);
logger.log(Level.SEVERE, "This command is created here", cmd.createdAt);
}
}
public void terminate(IOException e) {
Channel.this.terminate(e);
}
});
}
Channel(ChannelBuilder settings, CommandTransport transport)构造方法接受ChunkedCommandTransport对象,由于
ChunkedCommandTransport 继承——>> AbstractSynchronousByteArrayCommandTransport 继承——>> SynchronousCommandTransport 继承——>> CommandTransport
transport.setup调用SynchronousCommandTransport类的setup方法来启动一个ReaderThread线程
public void setup(Channel channel, CommandReceiver receiver) {
this.channel = channel;
new ReaderThread(receiver).start();
}
通过上面的ReaderThread.start()方法启动一个线程,ReaderThread为SynchronousCommandTransport类的内部类,在run()方法中,通过cmd = read()读取对象
private final class ReaderThread extends Thread {
private int commandsReceived = 0;
private int commandsExecuted = 0;
private final CommandReceiver receiver;
public ReaderThread(CommandReceiver receiver) {
super("Channel reader thread: "+channel.getName());
this.receiver = receiver;
}
@Override
public void run() {
final String name =channel.getName();
try {
while(!channel.isInClosed()) {
Command cmd = **null**;
try {
cmd = read();
} catch (EOFException e) {
IOException ioe = **new** IOException("Unexpected termination of the channel");
ioe.initCause(e);
throw ioe;
} catch (ClassNotFoundException e) {
LOGGER.log(Level.SEVERE, "Unable to read a command (channel " + name + ")",e);
continue;
} finally {
commandsReceived++;
}
receiver.handle(cmd);
commandsExecuted++;
}
closeRead();
}
...
}
}
通过上面的read()方法,调用remoting项目的AbstractSynchronousByteArrayCommandTransport的read方法创建一个command对象,即UserRequest(发送远程请求(二)最终发送的UserRequest)
public Command read() throws IOException, ClassNotFoundException {
return Command.readFrom(channel,new ObjectInputStreamEx(
new ByteArrayInputStream(readBlock(channel)),
channel.baseClassLoader));
}
通过ReaderThread的run()方法里的receiver.handle(cmd)回调上面Channel类的构造方法里面的handle方法,而传入handle方法的cmd参数即通过上面read()得到的UserRequest
protected Channel(ChannelBuilder settings, CommandTransport transport) throws IOException {
...
public void handle(Command cmd) {
updateLastHeard();
if (logger.isLoggable(Level.FINE))
logger.fine("Received " + cmd);
try {
cmd.execute(Channel.this);
} catch (Throwable t) {
logger.log(Level.SEVERE, "Failed to execute command " + cmd + " (channel " + Channel.this.name + ")", t);
logger.log(Level.SEVERE, "This command is created here", cmd.createdAt);
}
}
...
}
通过cmd.execute(Channel.this)来执行UserRequest
先是通过UserRequest的父类Request(继承Command)来执行execute()方法
protected final void execute(final Channel channel) {
channel.executingCalls.put(id,**this**);
future = channel.executor.submit(**new** Runnable() {
private int startIoId;
private int calcLastIoId() {
int endIoId = channel.lastIoId();
if (startIoId==endIoId) return 0;
return endIoId;
}
public void run() {
String oldThreadName = Thread.currentThread().getName();
Thread.currentThread().setName(oldThreadName+" for "+channel.getName());
try {
Command rsp;
CURRENT.set(Request.this);
startIoId = channel.lastIoId();
try {
// make sure any I/O preceding this has completed
channel.pipeWriter.get(lastIoId).get();
RSP r = Request.this.perform(channel);
// normal completion
rsp = **new** Response<RSP,EXC>(id,calcLastIoId(),r);
...
synchronized (channel) {// expand the synchronization block of the send() method to a check
if(!channel.isOutClosed())
channel.send(rsp);
}
...
}
});
}
通过channel.executor.submit(new Runnable(){...})创建一个Runnable并通过submit提交到executor
然后通过Request.this.perform(channel)调用remoting项目UserRequest类的perform()方法
protected UserResponse<RSP,EXC> perform(Channel channel) throws EXC {
try {
ClassLoader cl = channel.importedClassLoaders.get(classLoaderProxy);
RSP r = **null**;
Channel oldc = Channel.setCurrent(channel);
try {
Object o;
try {
o = deserialize(channel,request,cl);
...
}
通过new Response<RSP,EXC>(id,calcLastIoId(),r)创建Response对象
最终通过channel.send(rsp)把Response对象发送给主节点