快速、安全、可靠!Yarn!| MTdata小讲堂

Yarn 的全称是 Yet Anther Resource Negotiator(另一种资源协商者)。它作为 Hadoop 的一个组件,官方对它的定义是一个工作调度和集群资源管理的框架。关于 Yarn 的发展历史我们在之前的文章曾介绍过,在这里就不赘述了。

如上图所示是 Yarn 的系统调度框架,Yarn 主要由四个核心组件进行协调工作,其中 ResourceManager 负责资源调度;NodeManager 负责资源管理,可启动 Container;ApplicationMaster 管理具体的应用程序,负责启动具体的任务;Container 设计比较精巧,将机器资源封装后用于计算任务,它是具体执行任务最小的单元,若任务较大可并行多个 Container 共同执行,这也是分布式任务的优势。

Yarn VS Mesos

说到 Yarn 自然要聊聊 Mesos,Mesos 是以与 Linux 内核同样的原则而创建的分布式操作系统内核,Mesos 内核运行在每一个机器上同时通过 API 为各种应用提供跨数据中心和云的资源管理调度能力。这些应用包括 Hadoop、Spark、Kafka、Elastic Search。还可配合框架 Marathon 来管理大规模的 Docker 等容器化应用,如下图所示是它的架构图:

与 Yarn 相同 Mesos 也主要由四个组件构成,它们之间根据功能可一一对应。

自然它们之间也存在一些差异,具体可总结为以下 3 点:

1.框架担任的角色不同,在 Mesos 中各种计算框架是完全融入Mesos中的,若想在 Mesos 中添加一个新的计算框架,首先需要在 Mesos 中部署一套该框架;而在 Yarn 中,各种框架作为 client 端的 library 使用,不需要事先部署一套该框架,过程更简易。 2.在资源调度方面,Mesos 只是简单的将资源推送给各个应用程序,由应用程序选择是否接受资源,Mesos 本身并不知道各个应用程序资源需求。而应用程序的 ApplicationMaster 会把各个任务的资源要求汇报给 Yarn,Yarn 则根据需要为应用程序分配资源; 3.Yarn 是 MapReduce 进化的产物,它是为 Hadoop jobs 管理资源而诞生的,只为 Hadoop jobs提供 static partitioning,而 Mesos 的设计目标是为各个框架提供dynamical partitioning,让各个集群框架共用数据中心机器。因此 Mesos 定位在数据中心级别,Yarn 则更适合运行 Apache 生态圈的应用。

底层实现技术

Yarn 的底层实现技术主要分为 3 个部分:序列化、状态机和通信模型。

序列化是在网络传输中把一个对象按照一定的编码格式通过 bit 数组序列化传输到另外一边,再通过反序列化将其组装成一个对象,它的目标是传输更少的东西却包含更大的数据量。

Java 的 Serializable 虽然很简便但传输量太大,不适合 Hadoop 的大数据量沟通。因此 Yarn 实现了一套 Writable 框架,在 MapReduce 1.0 被提出。这个框架更轻量级,但由于结构简单因此性能和兼容性都较差,所以在 MapReduce 2.0 版本更新了 Protocol Buffers,这是谷歌开源的一款序列化框架,支持 Java、C++、Python 三种语言,性能有大幅度提升。后期又推出的 Avro 是 Hadoop 生态圈的序列化组件,它同时也是一个 rpc 框架,说不定未来会代替 Protocol Buffers~目前 Avro 仅支持 Java,值得一提的是它可以通过 Json 格式表达数据结构。

由于 Yarn 有很多任务状态,如 finish、running 等,都是通过状态机触发的。Yarn 的状态机通过一个前置状态后置状态是通过什么事件触发的,触发之后调用回调函数,这四个组件组成一个最基本的状态模型,这种设计十分契合状态转移的业务。

由于 Hadoop 组件是有心跳的,如果频繁沟通使用 Java 的 rpc 组件 RMI 容易出现性能问题,因此 Yarn 需要通过更轻量级的框架来实现,它通过 Java 延伸了三个组件组合成了自有的 rpc 框架,分别是 Reflect 反射、Proxy 动态代理和 Socket 网络编程。

接下来通过代码实例更形象地感受下该组件的应用。YarnRPC 的核心代码是两个类:Invocation 和 Method,用来代表需要传输并实例化的类,其中 Invocation 的属性如下:

private Class interfaces;   // 实例类实现的接口
private Method method;   // 实例类需要调用的方法
private Object[] params;   // 方法的参数值
private Object result;     // 返回结果

Method 的属性如下:

private String methodName;  // 方法的名称
private Class[] params;       // 方法的参数类型

从两个类的属性可以看出,YarnRPC 需要远程调用的有用属性都在这两个类里定义好了。

Server 端的 RPC 类需要通过反射实例化对应的类,过程如下:

public void call(Invocation invo) {
      System.out.println(invo.getClass().getName());
      Object obj = serviceEngine.get(invo.getInterfaces().getName());
      if(obj!=null) {
        try {
          Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
          Object result = m.invoke(obj, invo.getParams());
          invo.setResult(result);
        } catch (Throwable th) {
          th.printStackTrace();
        }
      } else {
        throw new IllegalArgumentException("has no these class");
      }
    }

Client 端的 RPC 类需要通过动态代理调用对应的类,过程如下:

public static <T> T getProxy(final Class<T> clazz,String host,int port) {
    
    final Client client = new Client(host,port);
    InvocationHandler handler = new InvocationHandler() {
      
      @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Invocation invo = new Invocation();
        invo.setInterfaces(clazz);
        invo.setMethod(new com.protocal.Method(method.getName(),method.getParameterTypes()));
        invo.setParams(args);
        client.invoke(invo);
        return invo.getResult();
      }
    };
    T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);
    return t;
  }

客户端和服务端就通过 socket 来完成数据的传输,具体代码可在公众号后台回复「yarn代码」获取下载包。

结构模型

如上图所示是 Yarn 的结构模型,各个组件协调将任务从客户端提交到 Yarn 上面运行大致分为 6 个步骤。

1. 作业提交

Client 调用 job.waitForCompletion 方法,向整个集群提交 MapReduce program。 新的 Appcation ID 由 ResourceManager 分配,MapReduce program 的 client 核实作业的输出,计算输入的 split,将作业的资源(如 Jar 包、 配置文件等)同步至HDFS。最后通过调用 ResourceManager 的 submitApplication() 提交作业。

2. 作业初始化

当 ResourceManager 收到 submitApplciation() 请求时就将该请求发给调度器,调度器分配 container。然后 ResourceManager 在该 container 内启动 ApplicationMaster 进程,由 NodeManager 监控。

MapReduce program 的 ApplicationMaster 是一个主类为 MRAppMaster 的 Java 应用,它通过创造一些 bookkeeping 对象来监控作业的进度,得到任务的进度和完成报告。然后通过分布式文件系统得到由客户端计算好的输入 split,接着为每个输入 split 创建一个 map 任务,根据 mapreduce.job.reduces 创建 reduce 任务对象。

3. 任务分配

若作业很小 ApplicationMaster 会选择在其自己的 JVM 中运行任务,如果不是 ApplicationMaster 则会向 ResourceManager 请求 container 来运行所有的 map 和 reduce 任务。这些请求通过心跳来传输的,包括每个 map 任务的数据位置(比如存放输入 split 的主机名和机架)。 调度器利用这些信息来调度任务,尽量将任务分配给存储数据的节点,或者分配给和存放输入 split 的节点相同机架的节点。

4. 任务运行

当一个任务由 ResourceManager 的调度器分配给一个 container后,ApplicationMaster 通过联系 NodeManager 来启动 container。任务由一个主类为 YarnChild 的 Java 应用执行,在运行任务之前首先本地化任务需要的资源(如作业配置、JAR 文件以及分布式缓存的所有文件),最后运行 map 或 reduce 任务。

5. 进度和状态更新

Yarn 中的任务将其进度和状态返回给 ApplicationMaster,客户端每秒(通过 mapreduce.client.progressmonitor.pollinterval 设置)向 ApplicationMaster 请求进度更新并展示给用户。

6. 作业完成

除了向 ApplicationMaster 请求作业进度外,客户端每 5 分钟都会通过调用 waitForCompletion() 来检查作业是否完成,时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后 ApplicationMaster 和 container 会清理工作状态,OutputCommiter 的作业清理方法也会被调用,作业的信息会被作业历史服务器存储以备之后用户核查。

资源调度器

Yarn 是通过将资源分配给 queue 来进行资源分配的,每个 queue 可以设置它的资源分配方式,接着展开介绍 Yarn 的三种资源分配方式。

FIFO Scheduler

如果没有配置策略的话,所有的任务都提交到一个 default 队列,根据它们的提交顺序执行。富裕资源就执行任务,若资源不富裕就等待前面的任务执行完毕后释放资源,这就是 FIFO Scheduler 先入先出的分配方式。

via https://blog.csdn.net/suifeng3051/article/details/49508261

如上图所示,在 Job1 提交时占用了所有的资源,不久后 Job2提交了,但是此时系统中已经没有资源可以分配给它了。加入 Job1 是一个大任务,那么 Job2 就只能等待一段很长的时间才能获得执行的资源。所以先入先出的分配方式存在一个问题就是大任务会占用很多资源,造成后面的小任务等待时间太长而饿死,因此一般不使用这个默认配置。

Capacity Scheduler

Capacity Scheduler 是一种多租户、弹性的分配方式。每个租户一个队列,每个队列可以配置能使用的资源上限与下限(譬如 50%,达到这个上限后即使其他的资源空置着,也不可使用),通过配置可以令队列至少有资源下限配置的资源可使用。

via https://blog.csdn.net/suifeng3051/article/details/49508261

上图队列 A 和队列 B 分配了相互独立的资源。Job1 提交给队列 A 执行,它只能使用队列 A 的资源。接着 Job2 提交给了队列B 就不必等待 Job1 释放资源了。这样就可以将大任务和小任务分配在两个队列中,这两个队列的资源相互独立,就不会造成小任务饿死的情况了。

Fair Scheduler

Fair Scheduler 是一种公平的分配方式,所谓的公平就是集群会尽可能地按配置的比例分配资源给队列。

via https://blog.csdn.net/suifeng3051/article/details/49508261

如上图所示,Job1 提交给队列 A,它占用了集群的所有资源。接着 Job2 提交给了队列 B,这时 Job1 就需要释放它的一半的资源给队列 A 中的 Job2 使用。接着 Job3 也提交给了队列 B,这个时候 Job2 如果还未执行完毕的话也必须释放一半的资源给 Job3。这就是公平的分配方式,在队列范围内所有任务享用到的资源都是均分的。

目前这三种调度方式用得最广泛的就是 Capacity Scheduler

资源隔离原理

既然在调度中存在队列的概念,自然免不了资源隔离。

Yarn 采用线程监控的方法判断任务是否超量使用内存,一旦发现超量则直接将其 kill 掉。这种方式比较暴力,容易误伤一些启动时内存比较大的任务。默认情况下 NodeManager 不会对进行 CPU 隔离,我们可以通过启用 Cgroup 支持,它会生成指定的配置文件设置资源占有 CPU 的百分比,运行过程中会严格限制其占用的百分比,但 Cgroup 只能隔离百分比而不能进行分核隔离。

通信接口

在分布式内组件之间的沟通是十分重要的,因此接着介绍几个常用的通信接口。如下图所示是 Yarn 的基础通信模型图。

其中 JobClient 与 ResourceManager的通信协议为 ApplicationClientProtocol,客户端通过该协议提交应用程序,查询应用状态;ApplicationMaster 与 ResourceManager 通信协议为 ApplicationMasterProtocol,ApplicationMaster 向 ResourceManager 注册自己,并为各个任务申请资源;ApplicationMaster 与 NodeManager 的通信协议为 ContainerManagementProtocol,ApplicationMaster 要求 NodeManager 启动或停止 Container,并获取 Container 的状态信息;NodeManager 与 ResourceManager 的通信协议为 ResourceTracker,NodeManager 向 ResourceManager 注册自己,并定时发送心跳信息汇报当前节点的资源使用情况。

编程概述

编写一个可以在 Yarn 上运行的任务,需要写两个类:用于启动 AppMaster 的 Client 类和用于启动 Container 的 AppMaster 类。

Client 类的具体过程描述如下:

Configuration conf = new Configuration();
    
// 1、创建并启动一个yarnClient
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();

// 2、创建一个application
YarnClientApplication app = yarnClient.createApplication();
// 3、获取application运行时的context
ApplicationSubmissionContext context = app.getApplicationSubmissionContext();
ApplicationId appId = context.getApplicationId();
System.out.println("create new applicationId ==> " + appId);
    
// 4、设置container运行完成后自动销毁
context.setKeepContainersAcrossApplicationAttempts(false);
// 设置application的名称
context.setApplicationName("ApplicationMaster");

// 5、定义该application所使用的内存和cpu,10mb,1cpu
Resource capability = Resource.newInstance(10, 1);
context.setResource(capability);

// 6、构建appmaster运行的container, 需要定义本地资源,运行时环境,和运行命令
ContainerLaunchContext amContainer = createAMContainerLanunchContext(conf, context.getApplicationId());
context.setAMContainerSpec(amContainer);

// 7、设置优先级,默认是0
context.setPriority(Priority.newInstance(0));
// 8、设置运行队列
context.setQueue("default");
// 9、提交任务
yarnClient.submitApplication(context);

AppMaster 类的具体过程描述如下:

Configuration conf = new Configuration();

// 1、创建 AM 到 RM 的客户端
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, new RMCallbackHandler());
amRMClient.init(conf);
amRMClient.start();

// 2、创建 AM 到 NM 的客户端
amNMClient = new NMClientAsyncImpl(new NMCallbackHandler());
amNMClient.init(conf);
amNMClient.start();

// 3、将AM注册到RM上,注册成功后 会启动AM和RM之间的心跳线程
amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1, "");

for (int i = 0; i < numTotalContainers.get(); i++) {
  // 4、申请container资源
  ContainerRequest containerAsk = new ContainerRequest(Resource.newInstance(100, 1), null, null, Priority.newInstance(0));
  amRMClient.addContainerRequest(containerAsk);
}

*具体代码可在公众号后台回复「yarn代码」获取下载包。

尽管 Yarn 自带的编程 API 已经得到了极大的简化,但从头开发一个 Yarn 应用程序仍是一件非常困难的事。

Apache Twill 这个项目则是为简化 Yarn 上应用程序开发而成立的项目,该项目把与 Yarn 相关的重复性的工作封装成库,使得用户可以专注于自己的应用程序逻辑,最后通过一个简单的 helloworld 实例感受一下:

public class HelloWorld {
 static Logger LOG = LoggerFactory.getLogger(HelloWorld.class);
 static class HelloWorldRunnable extends AbstractTwillRunnable {
 @Override
 public void run() {
 LOG.info("Hello World");
 }
}
 
public static void main(String[] args) throws Exception {
 YarnConfiguration conf = new YarnConfiguration();
 TwillRunnerService runner = new YarnTwillRunnerService(conf, "localhost:2181");
 runner.startAndWait();
 TwillController controller = runner.prepare(new HelloWorldRunnable()).start();
 Services.getCompletionFuture(controller).get();
}

原文发布于微信公众号 - 美图数据技术团队(gh_feb1d206d92b)

原文发表时间:2018-09-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏LhWorld哥陪你聊算法

【Spark篇】---Spark调优之代码调优,数据本地化调优,内存调优,SparkShuffle调优,Executor的堆外内存调优

Spark中调优大致分为以下几种 ,代码调优,数据本地化,内存调优,SparkShuffle调优,调节Executor的堆外内存。

13430
来自专栏深度学习之tensorflow实战篇

hive数据:名词解释

问题导读 1.hive数据分为那两种类型? 2.什么表数据? 3.什么是元数据? 4.Hive表里面导入数据的本质什么? 5.表、分区、桶之间之间的关系是什么?...

45070
来自专栏Spark学习技巧

必会:关于SparkStreaming checkpoint那些事儿

spark Streaming的checkpoint是一个利器,帮助在driver端非代码逻辑错误导致的driver应用失败重启,比如网络,jvm等,当然也仅限...

16320
来自专栏LuckQI

了解Spark中的RDD

14550
来自专栏Spark学习技巧

hadoop系列之基础系列

一、Hadoop基础 1、分布式概念 通过爬虫-->爬到网页存储-->查找关键字 一台机器存储是有限的 Google采用多台机器,...

42670
来自专栏我是攻城师

Spark历险记之编译和远程任务提交

57690
来自专栏Spark学习技巧

Spark调优系列之序列化方式调优

由于大多数的spark计算是基于内存的的天性,spark应用的瓶颈一般受制于集群的CPU,网络带宽,内存。大部分情况下,如果内存适合当前数据量的计算,那么瓶颈往...

35090
来自专栏我是攻城师

理解Spark的运行机制

49290
来自专栏大数据-Hadoop、Spark

2018-08-08

1、spark程序停-启,实时数据量一下子太多,如何处理 2、spark程序数据丢失,如何处理?duration是多少?

9420
来自专栏肖力涛的专栏

Spark踩坑记:初试

Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。本系列文将通过初试、Hbase+Mysql 、Spark Streaming+ka...

1.4K20

扫码关注云+社区

领取腾讯云代金券