前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >YARN任务运行中的Token

YARN任务运行中的Token

作者头像
陈猿解码
发布2023-02-28 14:59:15
7940
发布2023-02-28 14:59:15
举报
文章被收录于专栏:陈猿解码

上一篇文章中,主要讲解了token的一些通用知识,以及hadoop中,token的实现和通用数据结构及流程。

本文主要讲述yarn任务提交运行过程中涉及的几个重要token:AMRMToken,NMToken,ContainerToken。

【AMRMToken】


用于保证ApplicationMaster(下面均简称AM)与RM之间的安全通信,即AM向RM注册,以及后续向RM申请资源的rpc请求,都会带上该token。

AMRMToken在客户端向RM提交任务后,由RM创建生成,然后通过rpc请求传递给NM;NM通过将token持久化到本地文件,让AM启动后从对应文件中加载到token,这样AM就可以使用正确的token向RM注册并完成rpc请求交互了。接下来就展开说明下。

1)token的生成

客户端提交任务请求后,RM在内部的处理中,为AM构造对应的container启动上下文时,创建了AMRMToken,相关代码如下所示:

代码语言:javascript
复制
// AMLauncher.java
private void launch() throws IOException, YarnException {
    ...
    // 构造 container 启动上下文
    ContainerLaunchContext launchContext = 
        createAMContainerLaunchContext(applicationContext, masterContainerID);
    ...
}

private ContainerLaunchContext createAMContainerLaunchContext(
    ApplicationSubmissionContext applicationMasterContext,
    ContainerId containerID) throws IOException {
    ...
    setupTokens(container, containerID);
    ...
}

protected void setupTokens(ContainerLaunchContext container, ContainerId, containerID) 
    throws IOException {
    ...
    // 构造 AMRMToken
    Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken();
    if (amrmToken != null) {
        credentials.addToken(amrmToken.getService(), amrmToken);
    }
    ...
}

protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
    Token<AMRMTokenIdentifier> amrmToken =
        this.rmContext.getAMRMTokenSecretManager()
            .createAndGetAMRMToken(application.getAppAttemptId());
    ((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
    return amrmToken;
}

2)AMRMToken的传递

a. RM --> NM

在构造完container启动上下文后,将启动上下文随container启动请求(StartContainerRequest)发送给NM。

b. NM --> AM

NM收到请求后,内部构造Container实例对象,并从请求中取出credential保存在实例对象中,在真正需要启动AM时,将token信息写到本地文件中。

代码语言:javascript
复制
// ContainerLauncher.java
public Integer call() {
    // token存储在 nmPrivate 中的路径
    Path nmPrivateTokensPath = 
        dirsHandler.getLocalPathForWrite(
            getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR +
            String.format(TOKEN_FILE_NAME_FMT, containerIdStr));
    // Set th token location too.
    // 为AM设置环境变量
    addToEnvMap(
        environment, nmEnvVars,
        ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
        new Path(
            containerWorkDir,
            FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
    // 将token写入文件中
    try (DataOutputStream tokensOutStream = 
        lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
        Credentials creds = container.getCredentials();
        creds.writeTokenStorageToStream(tokensOutStream);
    }
    ...
}

//DefaultContainerExecutor.java
public int launchContainer(ContainerStartContext ctx) {
    // copy container tokens to work dir
    Path tokenDst = 
        new Path(containerWorkDir, Containerlaunch.FINAL_CONTAINER_TOKENS_FILE);
    copyFile(nmPrivateTokensPath, tokenDst, user);
}

从上面的代码可以看到,实际上先将token写入nmPrivate目录中,以container的ID作为文件名,".tokens"作为文件后缀,然后将token文件拷贝到container的工作目录中,并重命名为container.tokens。

例如,存储在nmPrivate目录下的token:

代码语言:javascript
复制
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# pwd
/home/hncscwc/hadoop/yarn/nodemanager/local/nmPrivate/application_1652243949356_2011/container_e301_1652243949356_2011_01_000003
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# ll
total 52
-rw-r--r-- 1 hadoop hadoop 8 May 13 16:27 container_e301_1652243949356_2011_01_000003.pid
-rw-r--r-- 1 hadoop hadoop 387 May 13 16:27 container_e301_1652243949356_2011_01_000003.tokens
-rw-r--r-- 1 hadoop hadoop 43441 May 13 16:27 launch_container.sh

存储在container工作目录下的token:

代码语言:javascript
复制
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# pwd
/home/hncscwc/hadoop/yarn/nodemanager/local/usercache/dcp/appcache/application_1652243949356_2011/container_e301_1652243949356_2011_01_000003
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# ll
total 72
lrwxrwxrwx 1 hadoop hadoop 100 May 13 16:27 __app__.jar -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3906/spark-examples_2.11-2.4.4.jar
-rw-r--r-- 1 hadoop hadoop 387 May 13 16:27 container_tokens
-rwx------ 1 hadoop hadoop 750 May 13 16:27 default_container_executor_session.sh
-rwx------ 1 hadoop hadoop 805 May 13 16:27 default_container_executor.sh
-rwx------ 1 hadoop hadoop 43441 May 13 16:27 launch_container.sh
lrwxrwxrwx 1 hadoop hadoop 91 May 13 16:27 metrics-influxdb.jar -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3910/metrics-influxdb.jar
lrwxrwxrwx 1 hadoop hadoop 89 May 13 16:27 metrics.properties -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3909/metrics.properties
lrwxrwxrwx 1 hadoop hadoop 89 May 13 16:27 __spark_conf__ -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3908/__spark_conf__.zip
lrwxrwxrwx 1 hadoop hadoop 94 May 13 16:27 spark-influxdb-sink.jar -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3907/spark-influxdb-sink.jar
drwxr-xr-x 2 hadoop hadoop 12288 May 13 16:27 __spark_libs__
drwx--x--- 2 hadoop hadoop 55 May 13 16:27 tmp

3)AM启动后的注册校验

AM进程启动后,构造UGI(UserGroupInformation)对象时,会根据环境变量HADOOP_TOKEN_FILE_LOCATION的值,从指定文件中加载token信息,然后附在rpc请求中向RM进行注册。RM收到请求后由对应的SecretManager(这里对应于AMRMTokenSecretManager)完成认证逻辑。认证的逻辑在上一篇文章有详细介绍。

需要注意的是:CONTAINER_TOKEN_FLIE_ENV_NAME的值与HADOOP_TOKEN_FILE_LOCATION的值是相同的,这样就可以保证正确读取到对应的token。

【NMToken】


NMToken则是用于与NM的安全通信。

从任务提交运行的流程中可以知道,RM和AM都会和NM通信请求启动container,其中RM向NM请求启动AM;而AM则是向NM请求启动任务container。因此,在RM与NM的通信、AM与NM的通信中都会用到NMToken。

1) NM向RM注册获取NMToken的MasterKey

由于NMToken是由RM生成的,但最终在NM中进行校验,因此NM需要和RM使用一样的密钥,这个密钥是在NM向RM注册时获取的,并在心跳请求中更新密钥信息。

代码语言:javascript
复制
// ResourceTrackerService.java
public RegisterNodeManagerResponse registerNodeManager(
    egisterNodeManagerRequest request) throws YarnException, IOException {
    ...
    RegisterNodeManagerResponse response = 
    recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
    ...
    // 返回 containerToken 和 NMToken 的密钥信息
    response.setContainerTokenMasterKey(
        containerTokenSecretManager.getCurrentKey());
    response.setNMTokenMasterKey(
        nmTokenSecretManager.getCurrentKey());
}

// NodeStatusUpdaterImpl.java
protected void registerWithRM() 
    throws YarnException, IOException {
    ...
    regNMResponse =
        resourceTracker.registerNodeManager(request);
    MasterKey masterKey = 
        regNMResponse.getContainerTokenMasterKey();
    // do this now so that its set before we start heartbeating to RM
    // It is expected that status updater is started by this point and
    // RM gives the shared secret in registration during
    // StatusUpdater#start().
    if (masterKey != null) {
       this.context.getContainerTokenSecretManager()
           .setMasterKey(masterKey);
    }

    masterKey = regNMResponse.getNMTokenMasterKey();
    if (masterKey != null) {
        this.context.getNMTokenSecretManager()
            .setMasterKey(masterKey);
    }
}

2)RM向NM请求启动AM

在请求中会携带NMToken:

代码语言:javascript
复制
// AMLauncher.java 
private void launch() throws IOException, YarnException {
    connect();
    ...
}

private void connect() throws IOException {
    ContainerId masterContainerID = masterContainer.getId();
    
    containerMgrProxy = getContainerMgrProxy(masterContainerID);
}

protected ContainerManagementProtocol getContainerMgrProxy(
    final ContainerId containerId) {
    final NodeId node = masterContainer.getNodeId();
    final InetSocketAddress containerManagerConnectAddress =
        NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());

    final YarnRPC rpc = getYarnRPC();

    UserGroupInformation currentUser =
        UserGroupInformation.createRemoteUser(containerId
            .getApplicationAttemptId().toString());

    String user =
        rmContext.getRMApps()
            .get(containerId.getApplicationAttemptId().getApplicationId())
            .getUser();
    org.apache.hadoop.yarn.api.records.Token token =
        rmContext.getNMTokenSecretManager().createNMToken(
            containerId.getApplicationAttemptId(), node, user);
    currentUser.addToken(ConverterUtils.convertFromYarn(token,
        containerManagerConnectAddress));

    return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
        currentUser, rpc, containerManagerConnectAddress);
}

NM在请求处理中校验:

代码语言:javascript
复制
// ContainerManagerImpl.java
public StartContainersResponse startContainers(
    StartContainersRequest requests) throws YarnException, IOException {
    UserGroupInformation remoteUgi = getRemoteUgi();
    NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
    authorizeUser(remoteUgi, nmTokenIdentifier);
    ...
}

protected NMTokenIdentifier selectNMTokenIdentifier(
    UserGroupInformation remoteUgi) {
    Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();
    NMTokenIdentifier resultId = null;
    for (TokenIdentifier id : tokenIdentifiers) {
        if (id instanceof NMTokenIdentifier) {
            resultId = (NMTokenIdentifier) id;
            break;
        }
    }
    return resultId;
}

protected void authorizeUser(UserGroupInformation remoteUgi,
    NMTokenIdentifier nmTokenIdentifier) throws YarnException {
    if (nmTokenIdentifier == null) {
      throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
    }
    if (!remoteUgi.getUserName().equals(
      nmTokenIdentifier.getApplicationAttemptId().toString())) {
      throw RPCUtil.getRemoteException("Expected applicationAttemptId: "
          + remoteUgi.getUserName() + "Found: "
          + nmTokenIdentifier.getApplicationAttemptId());
    }
}

3)AM启动向RM注册后,从注册的响应中获取NMToken

代码语言:javascript
复制
// AMRMClientImpl.java
private RegisterApplicationMasterResponse registerApplicationMaster()
      throws YarnException, IOException {
    RegisterApplicationMasterRequest request =
        RegisterApplicationMasterRequest.newInstance(this.appHostName,
            this.appHostPort, this.appTrackingUrl);
    RegisterApplicationMasterResponse response =
        rmClient.registerApplicationMaster(request);
    synchronized (this) {
      lastResponseId = 0;
      if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
        populateNMTokens(response.getNMTokensFromPreviousAttempts());
      }
    }
    return response;
}

// 将Token放到缓存中
protected void populateNMTokens(List<NMToken> nmTokens) {
    for (NMToken token : nmTokens) {
      String nodeId = token.getNodeId().toString();
      if (LOG.isDebugEnabled()) {
        if (getNMTokenCache().containsToken(nodeId)) {
          LOG.debug("Replacing token for : " + nodeId);
        } else {
          LOG.debug("Received new token for : " + nodeId);
        }
      }
      getNMTokenCache().setToken(nodeId, token.getToken());
    }
}

4)AM向NM请求启动任务container时,将token放到ugi中

从缓存中取出对应NM节点的的token,然后放到ugi中,随请求一并发送给NM。

代码语言:javascript
复制
// NMClientImpl.java
public Map<String, ByteBuffer> startContainer(
    Container container, ContainerLaunchContext containerLaunchContext)
        throws YarnException, IOException {
    ...
    proxy =
      cmProxy.getProxy(container.getNodeId().toString(),
        container.getId());
    // 注意containerToken
    StartContainerRequest scRequest =
      StartContainerRequest.newInstance(containerLaunchContext,
        container.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(scRequest);
    StartContainersRequest allRequests =
      StartContainersRequest.newInstance(list);
    StartContainersResponse response =
      proxy.getContainerManagementProtocol().startContainers(allRequests);
    ...
}

// ContainerManagementProtocolProxy.java
public synchronized ContainerManagementProtocolProxyData getProxy(
    String containerManagerBindAddr, ContainerId containerId)
    throws InvalidToken {
    ...
    if (proxy == null) {
      proxy =
          new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
              containerId, nmTokenCache.getToken(containerManagerBindAddr));
      if (maxConnectedNMs > 0) {
        addProxyToCache(containerManagerBindAddr, proxy);
      }
    }
    ...
}

public ContainerManagementProtocolProxyData(YarnRPC rpc,
    String containerManagerBindAddr,
    ContainerId containerId, Token token) throws InvalidToken {
    this.containerManagerBindAddr = containerManagerBindAddr;
    this.activeCallers = 0;
    this.scheduledForClose = false;
    this.token = token;
    this.proxy = newProxy(rpc, containerManagerBindAddr, containerId, token);
}

protected ContainerManagementProtocol newProxy(final YarnRPC rpc,
    String containerManagerBindAddr, ContainerId containerId, Token token)
    throws InvalidToken {
    UserGroupInformation user =
      UserGroupInformation.createRemoteUser(containerId
        .getApplicationAttemptId().toString());

    org.apache.hadoop.security.token.Token<NMTokenIdentifier> nmToken =
      ConverterUtils.convertFromYarn(token, cmAddr);
    user.addToken(nmToken);

    return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
        user, rpc, cmAddr);
}

【ContainerToken】


在向NM请求启动container时,除了需要NMToken之外,还需要ContainerToken,以验证container的合法性。

ContainerToken和NMToken采用相同的方式,因此密钥的获取方式与流程以及更新,和前面NMToken中讲到的几乎是同一个流程。

首先,同样是在NM的注册与定时心跳请求中,RM向NM同步并更新密钥。RM向NM请求container时,直接创建并带上ContainerToken;而AM则是通过向RM申请资源时,RM创建了ContainerToken,并随请求的应答传递给了AM。此后AM再向NM请求启动container时,则带上了对应的Token信息,有兴趣的朋友可以对照流程走读相关源码。

另外,该token大的类型虽然都是containerToken,但实际上又细分为ApplicaitonMaster和Task两类,分别用于RM与NM通信、AM与NM通信中。

【LocalizerToken】


LocalizerToken主要用于NM的资源本地化服务与NM之间的通信。由于NM资源本地化服务是以一个独立进程的方式运行的,并且会通过rpc协议不断向NM汇报资源下载情况,因此使用Token来保证通信安全。

【总结】


小结一下,本文主要讲解了Yarn运行中涉及的几个token,具体包括token的作用,如何创建,具体使用的流程。

另外,除了上面介绍的几个token之外,各个任务(mr/spark/flink)在运行时,也还存在一些其他的token,例如mr中会用到的ClientToAMToken等,有兴趣的可以自行摸索下~

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-05-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 陈猿解码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档