前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DistributtedShell的container在所有节点上仅执行一次

DistributtedShell的container在所有节点上仅执行一次

作者头像
jiewuyou
发布2022-09-29 15:40:16
4270
发布2022-09-29 15:40:16
举报
文章被收录于专栏:数据人生数据人生

问题

在上Hadoop2培训课的时候,老师出了这么一道题

修改Distributedshell的源代码,使得用户提供的命令(由“–shell_command”参数指定)可以在所有节点上仅执行一次。(目前的实现是,如果该命令由N个task同时执行,则这N个task可能位于任意节点上,比如都在node1上。)

修改代码

该问题需要在两个地方对源码进行修改:

  1. 修改参数,指定实现的feature是否生效
  2. 让每一个container运行在不同的节点上

博客将主要介绍过程2的实现过程,主要思路是首先获取节点列表,再在申请container时,指定节点。具体过程如下:

  • 打开源码。编译好Hadoop-2.3.0之后,用Eclipse打开工程,DistributedShell的源码的位置在/hadoop-2.3.0-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java
  • 获取计算节点列表。定义nodeList用于保存计算节点列表,在ApplicationMaster的init()函数中添加初始化nodeList的代码。初始化完成后,nodeList中保存有计算节点的列表(不包括RM 节点)。
代码语言:javascript
复制
public class ApplicationMaster {
  // 所有计算节点
  private static List nodeList = new ArrayList();

  public boolean init(String[] args) throws ParseException, IOException {
    //该函数的末尾添加如下代码,用于获取计算节点列表
    try {
      YarnClient yarnClient = YarnClient.createYarnClient();
      yarnClient.init(conf);
      yarnClient.start();
      List<NodeReport> clusterNodeReports;
      clusterNodeReports = yarnClient.getNodeReports(
          NodeState.RUNNING);
      for (NodeReport node : clusterNodeReports) {
        this.nodeList.add(node.getNodeId().getHost());
      }
    } catch (YarnException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    return true;
  }
}
  • 让container运行在不同的节点上。申请资源的时候,会调用函数setupContainerAskForRM,修改该函数即可,函数如下:
代码语言:javascript
复制
  private ContainerRequest setupContainerAskForRM() {
    // setup requirements for hosts
    // using * as any host will do for the distributed shell app
    // set the priority for the request
    Priority pri = Records.newRecord(Priority.class);
    // TODO - what is the range for priority? how to decide?
    pri.setPriority(requestPriority);
    // Set up resource type requirements
    // For now, memory and CPU are supported so we set memory and cpu
    // requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    capability.setVirtualCores(containerVirtualCores);
    String[] nodes = null;
    if (!nodeList.isEmpty()) {
      nodes = new String[1];
      nodes[0] = (String) nodeList.get(0);
      nodeList.remove(0);
    }
    ContainerRequest request = new ContainerRequest(capability, nodes, null,
        pri);//默认的nodes为null
    LOG.info("Requested container ask: " + request.toString());
    return request;
  }
  • 改好之后,打成jar包,覆盖${HADOOP_HOME}/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0.jar即可生效
  • 验证,书写如下脚本并运行。发现3个container运行在不同的节点上,表示改写成功
代码语言:javascript
复制
bin/hadoop jar \
share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0.jar \
org.apache.hadoop.yarn.applications.distributedshell.Client \
--jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0.jar \
--shell_command "ls" \
--num_containers 3 \
--container_memory 512 \
--container_vcores 1 \
--master_memory 350 \
--priority 10

问题与解决

在获取计算节点列表时,被卡住了,最后在和别人交流的时候,知道ApplicationMaster通过yarnClient可以从RM中获取计算节点列表。最后将问题解决了。感谢所有提供帮助的人

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2015-04-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题
  • 修改代码
  • 问题与解决
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档