前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >jenkins源码分析 —— 接受主节点的远程请求(三)

jenkins源码分析 —— 接受主节点的远程请求(三)

作者头像
lovelife110
发布2021-01-14 10:50:23
8010
发布2021-01-14 10:50:23
举报
文章被收录于专栏:爱生活爱编程爱生活爱编程

本系列(参考远程执行shell源码分析)主节点是通过ssh方式连接(launch slave agents on unix machines via ssh)slave节点

而在启动slave节点时会启动一个ReaderThread线程来接受主节点请求

启动ReaderThread

启动slave节点入口为:通过ssh-slaves项目的SSHLauncher类来启动slave

public synchronized void launch(final SlaveComputer computer, final TaskListener listener) throws InterruptedException {

...

代码语言:txt
复制
 openConnection(listener);
代码语言:txt
复制
 verifyNoHeaderJunk(listener);
代码语言:txt
复制
 reportEnvironment(listener);
代码语言:txt
复制
 String java = resolveJava(computer, listener);
代码语言:txt
复制
 String workingDirectory = getWorkingDirectory(computer);
代码语言:txt
复制
 copySlaveJar(listener, workingDirectory);
代码语言:txt
复制
 startSlave(computer, listener, java, workingDirectory);

...

}

通过startSlave方法启动slave进程

private void startSlave(SlaveComputer computer, final TaskListener listener, String java, String workingDirectory) throws IOException {

...

computer.setChannel(session.getStdout(), session.getStdin(), listener.getLogger(), null);

...

}

通过computer.setChannel调用jenkins-core项目SlaveComputer类的setChannel方法

public void setChannel(InputStream in, OutputStream out, OutputStream launchLog, Channel.Listener listener) throws IOException, InterruptedException {

代码语言:txt
复制
 ChannelBuilder cb = **new** ChannelBuilder(nodeName,threadPoolForRemoting)

.withMode(Channel.Mode.NEGOTIATE)

.withHeaderStream(launchLog);

for (ChannelConfigurator cc : ChannelConfigurator.all()) {

代码语言:txt
复制
     cc.onChannelBuilding(cb,**this**);

}

代码语言:txt
复制
 Channel channel = cb.build(in,out);
代码语言:txt
复制
 setChannel(channel,launchLog,listener);

}

再通过Channel channel = cb.build(in,out)进入到remting项目ChannelBuilder类的build方法

public Channel build(InputStream is, OutputStream os) throws IOException {

return new Channel(this,negotiate(is,os));

}

通过negotiate(is,os)方法得到ChunkedCommandTransport对象

跳转到remoting项目Channel类的构造方法

protected Channel(ChannelBuilder settings, CommandTransport transport) throws IOException {

...

代码语言:txt
复制
 transport.setup(**this**, **new** CommandReceiver() {

public void handle(Command cmd) {

代码语言:txt
复制
         updateLastHeard();

if (logger.isLoggable(Level.FINE))

代码语言:txt
复制
             logger.fine("Received " + cmd);

try {

代码语言:txt
复制
             cmd.execute(Channel.this);

} catch (Throwable t) {

代码语言:txt
复制
             logger.log(Level.SEVERE, "Failed to execute command " + cmd + " (channel " + Channel.this.name + ")", t);
代码语言:txt
复制
             logger.log(Level.SEVERE, "This command is created here", cmd.createdAt);

}

}

public void terminate(IOException e) {

代码语言:txt
复制
         Channel.this.terminate(e);

}

});

}

Channel(ChannelBuilder settings, CommandTransport transport)构造方法接受ChunkedCommandTransport对象,由于

ChunkedCommandTransport 继承——>> AbstractSynchronousByteArrayCommandTransport 继承——>> SynchronousCommandTransport 继承——>> CommandTransport

transport.setup调用SynchronousCommandTransport类的setup方法来启动一个ReaderThread线程

public void setup(Channel channel, CommandReceiver receiver) {

this.channel = channel;

new ReaderThread(receiver).start();

}

读取对象

通过上面的ReaderThread.start()方法启动一个线程,ReaderThread为SynchronousCommandTransport类的内部类,在run()方法中,通过cmd = read()读取对象

private final class ReaderThread extends Thread {

private int commandsReceived = 0;

private int commandsExecuted = 0;

private final CommandReceiver receiver;

public ReaderThread(CommandReceiver receiver) {

super("Channel reader thread: "+channel.getName());

this.receiver = receiver;

}

@Override

public void run() {

final String name =channel.getName();

try {

while(!channel.isInClosed()) {

代码语言:txt
复制
             Command cmd = **null**;

try {

代码语言:txt
复制
                 cmd = read();

} catch (EOFException e) {

代码语言:txt
复制
                 IOException ioe = **new** IOException("Unexpected termination of the channel");
代码语言:txt
复制
                 ioe.initCause(e);

throw ioe;

} catch (ClassNotFoundException e) {

代码语言:txt
复制
                 LOGGER.log(Level.SEVERE, "Unable to read a command (channel " + name + ")",e);

continue;

} finally {

代码语言:txt
复制
                 commandsReceived++;

}

代码语言:txt
复制
             receiver.handle(cmd);
代码语言:txt
复制
             commandsExecuted++;

}

代码语言:txt
复制
         closeRead();

}

...

}

}

创建command对象,即UserRequest

通过上面的read()方法,调用remoting项目的AbstractSynchronousByteArrayCommandTransport的read方法创建一个command对象,即UserRequest(发送远程请求(二)最终发送的UserRequest)

public Command read() throws IOException, ClassNotFoundException {

return Command.readFrom(channel,new ObjectInputStreamEx(

new ByteArrayInputStream(readBlock(channel)),

代码语言:txt
复制
         channel.baseClassLoader));

}

执行UserRequest

通过ReaderThread的run()方法里的receiver.handle(cmd)回调上面Channel类的构造方法里面的handle方法,而传入handle方法的cmd参数即通过上面read()得到的UserRequest

protected Channel(ChannelBuilder settings, CommandTransport transport) throws IOException {

...

public void handle(Command cmd) {

代码语言:txt
复制
 updateLastHeard();

if (logger.isLoggable(Level.FINE))

代码语言:txt
复制
     logger.fine("Received " + cmd);

try {

代码语言:txt
复制
     cmd.execute(Channel.this);

} catch (Throwable t) {

代码语言:txt
复制
     logger.log(Level.SEVERE, "Failed to execute command " + cmd + " (channel " + Channel.this.name + ")", t);
代码语言:txt
复制
     logger.log(Level.SEVERE, "This command is created here", cmd.createdAt);

}

}

...

}

通过cmd.execute(Channel.this)来执行UserRequest

先是通过UserRequest的父类Request(继承Command)来执行execute()方法

protected final void execute(final Channel channel) {

代码语言:txt
复制
 channel.executingCalls.put(id,**this**);
代码语言:txt
复制
 future = channel.executor.submit(**new** Runnable() {

private int startIoId;

private int calcLastIoId() {

int endIoId = channel.lastIoId();

if (startIoId==endIoId) return 0;

return endIoId;

}

public void run() {

代码语言:txt
复制
         String oldThreadName = Thread.currentThread().getName();
代码语言:txt
复制
         Thread.currentThread().setName(oldThreadName+" for "+channel.getName());

try {

代码语言:txt
复制
             Command rsp;
代码语言:txt
复制
             CURRENT.set(Request.this);
代码语言:txt
复制
             startIoId = channel.lastIoId();

try {

// make sure any I/O preceding this has completed

代码语言:txt
复制
                channel.pipeWriter.get(lastIoId).get();
代码语言:txt
复制
                 RSP r = Request.this.perform(channel);

// normal completion

代码语言:txt
复制
                rsp = **new** Response<RSP,EXC>(id,calcLastIoId(),r);

...

synchronized (channel) {// expand the synchronization block of the send() method to a check

if(!channel.isOutClosed())

代码语言:txt
复制
                         channel.send(rsp);

}

...

}

});

}

创建Runnable并提交到Executor

通过channel.executor.submit(new Runnable(){...})创建一个Runnable并通过submit提交到executor

然后通过Request.this.perform(channel)调用remoting项目UserRequest类的perform()方法

protected UserResponse<RSP,EXC> perform(Channel channel) throws EXC {

try {

代码语言:txt
复制
     ClassLoader cl = channel.importedClassLoaders.get(classLoaderProxy);
代码语言:txt
复制
     RSP r = **null**;
代码语言:txt
复制
     Channel oldc = Channel.setCurrent(channel);

try {

代码语言:txt
复制
         Object o;

try {

代码语言:txt
复制
             o = deserialize(channel,request,cl);

...

}

创建Response

通过new Response<RSP,EXC>(id,calcLastIoId(),r)创建Response对象

发送Response

最终通过channel.send(rsp)把Response对象发送给主节点

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-05-18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 启动ReaderThread
  • 读取对象
  • 创建command对象,即UserRequest
  • 执行UserRequest
  • 创建Runnable并提交到Executor
  • 创建Response
  • 发送Response
相关产品与服务
命令行工具
腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档