Yarn 的全称是 Yet Anther Resource Negotiator(另一种资源协商者)。它作为 Hadoop 的一个组件,官方对它的定义是一个工作调度和集群资源管理的框架。关于 Yarn 的发展历史我们在之前的文章曾介绍过,在这里就不赘述了。
如上图所示是 Yarn 的系统调度框架,Yarn 主要由四个核心组件进行协调工作,其中 ResourceManager 负责资源调度;NodeManager 负责资源管理,可启动 Container;ApplicationMaster 管理具体的应用程序,负责启动具体的任务;Container 设计比较精巧,将机器资源封装后用于计算任务,它是具体执行任务最小的单元,若任务较大可并行多个 Container 共同执行,这也是分布式任务的优势。
Yarn VS Mesos
说到 Yarn 自然要聊聊 Mesos,Mesos 是以与 Linux 内核同样的原则而创建的分布式操作系统内核,Mesos 内核运行在每一个机器上同时通过 API 为各种应用提供跨数据中心和云的资源管理调度能力。这些应用包括 Hadoop、Spark、Kafka、Elastic Search。还可配合框架 Marathon 来管理大规模的 Docker 等容器化应用,如下图所示是它的架构图:
与 Yarn 相同 Mesos 也主要由四个组件构成,它们之间根据功能可一一对应。
自然它们之间也存在一些差异,具体可总结为以下 3 点:
1.框架担任的角色不同,在 Mesos 中各种计算框架是完全融入Mesos中的,若想在 Mesos 中添加一个新的计算框架,首先需要在 Mesos 中部署一套该框架;而在 Yarn 中,各种框架作为 client 端的 library 使用,不需要事先部署一套该框架,过程更简易。 2.在资源调度方面,Mesos 只是简单的将资源推送给各个应用程序,由应用程序选择是否接受资源,Mesos 本身并不知道各个应用程序资源需求。而应用程序的 ApplicationMaster 会把各个任务的资源要求汇报给 Yarn,Yarn 则根据需要为应用程序分配资源; 3.Yarn 是 MapReduce 进化的产物,它是为 Hadoop jobs 管理资源而诞生的,只为 Hadoop jobs提供 static partitioning,而 Mesos 的设计目标是为各个框架提供dynamical partitioning,让各个集群框架共用数据中心机器。因此 Mesos 定位在数据中心级别,Yarn 则更适合运行 Apache 生态圈的应用。
底层实现技术
Yarn 的底层实现技术主要分为 3 个部分:序列化、状态机和通信模型。
序列化是在网络传输中把一个对象按照一定的编码格式通过 bit 数组序列化传输到另外一边,再通过反序列化将其组装成一个对象,它的目标是传输更少的东西却包含更大的数据量。
Java 的 Serializable 虽然很简便但传输量太大,不适合 Hadoop 的大数据量沟通。因此 Yarn 实现了一套 Writable 框架,在 MapReduce 1.0 被提出。这个框架更轻量级,但由于结构简单因此性能和兼容性都较差,所以在 MapReduce 2.0 版本更新了 Protocol Buffers,这是谷歌开源的一款序列化框架,支持 Java、C++、Python 三种语言,性能有大幅度提升。后期又推出的 Avro 是 Hadoop 生态圈的序列化组件,它同时也是一个 rpc 框架,说不定未来会代替 Protocol Buffers~目前 Avro 仅支持 Java,值得一提的是它可以通过 Json 格式表达数据结构。
由于 Yarn 有很多任务状态,如 finish、running 等,都是通过状态机触发的。Yarn 的状态机通过一个前置状态到后置状态是通过什么事件触发的,触发之后调用回调函数,这四个组件组成一个最基本的状态模型,这种设计十分契合状态转移的业务。
由于 Hadoop 组件是有心跳的,如果频繁沟通使用 Java 的 rpc 组件 RMI 容易出现性能问题,因此 Yarn 需要通过更轻量级的框架来实现,它通过 Java 延伸了三个组件组合成了自有的 rpc 框架,分别是 Reflect 反射、Proxy 动态代理和 Socket 网络编程。
接下来通过代码实例更形象地感受下该组件的应用。YarnRPC 的核心代码是两个类:Invocation 和 Method,用来代表需要传输并实例化的类,其中 Invocation 的属性如下:
private Class interfaces; // 实例类实现的接口
private Method method; // 实例类需要调用的方法
private Object[] params; // 方法的参数值
private Object result; // 返回结果
Method 的属性如下:
private String methodName; // 方法的名称
private Class[] params; // 方法的参数类型
从两个类的属性可以看出,YarnRPC 需要远程调用的有用属性都在这两个类里定义好了。
Server 端的 RPC 类需要通过反射实例化对应的类,过程如下:
public void call(Invocation invo) {
System.out.println(invo.getClass().getName());
Object obj = serviceEngine.get(invo.getInterfaces().getName());
if(obj!=null) {
try {
Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
Object result = m.invoke(obj, invo.getParams());
invo.setResult(result);
} catch (Throwable th) {
th.printStackTrace();
}
} else {
throw new IllegalArgumentException("has no these class");
}
}
Client 端的 RPC 类需要通过动态代理调用对应的类,过程如下:
public static <T> T getProxy(final Class<T> clazz,String host,int port) {
final Client client = new Client(host,port);
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invo = new Invocation();
invo.setInterfaces(clazz);
invo.setMethod(new com.protocal.Method(method.getName(),method.getParameterTypes()));
invo.setParams(args);
client.invoke(invo);
return invo.getResult();
}
};
T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);
return t;
}
客户端和服务端就通过 socket 来完成数据的传输,具体代码可在公众号后台回复「yarn代码」获取下载包。
结构模型
如上图所示是 Yarn 的结构模型,各个组件协调将任务从客户端提交到 Yarn 上面运行大致分为 6 个步骤。
1. 作业提交
Client 调用 job.waitForCompletion 方法,向整个集群提交 MapReduce program。 新的 Appcation ID 由 ResourceManager 分配,MapReduce program 的 client 核实作业的输出,计算输入的 split,将作业的资源(如 Jar 包、 配置文件等)同步至HDFS。最后通过调用 ResourceManager 的 submitApplication() 提交作业。
2. 作业初始化
当 ResourceManager 收到 submitApplciation() 请求时就将该请求发给调度器,调度器分配 container。然后 ResourceManager 在该 container 内启动 ApplicationMaster 进程,由 NodeManager 监控。
MapReduce program 的 ApplicationMaster 是一个主类为 MRAppMaster 的 Java 应用,它通过创造一些 bookkeeping 对象来监控作业的进度,得到任务的进度和完成报告。然后通过分布式文件系统得到由客户端计算好的输入 split,接着为每个输入 split 创建一个 map 任务,根据 mapreduce.job.reduces 创建 reduce 任务对象。
3. 任务分配
若作业很小 ApplicationMaster 会选择在其自己的 JVM 中运行任务,如果不是 ApplicationMaster 则会向 ResourceManager 请求 container 来运行所有的 map 和 reduce 任务。这些请求通过心跳来传输的,包括每个 map 任务的数据位置(比如存放输入 split 的主机名和机架)。 调度器利用这些信息来调度任务,尽量将任务分配给存储数据的节点,或者分配给和存放输入 split 的节点相同机架的节点。
4. 任务运行
当一个任务由 ResourceManager 的调度器分配给一个 container后,ApplicationMaster 通过联系 NodeManager 来启动 container。任务由一个主类为 YarnChild 的 Java 应用执行,在运行任务之前首先本地化任务需要的资源(如作业配置、JAR 文件以及分布式缓存的所有文件),最后运行 map 或 reduce 任务。
5. 进度和状态更新
Yarn 中的任务将其进度和状态返回给 ApplicationMaster,客户端每秒(通过 mapreduce.client.progressmonitor.pollinterval 设置)向 ApplicationMaster 请求进度更新并展示给用户。
6. 作业完成
除了向 ApplicationMaster 请求作业进度外,客户端每 5 分钟都会通过调用 waitForCompletion() 来检查作业是否完成,时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后 ApplicationMaster 和 container 会清理工作状态,OutputCommiter 的作业清理方法也会被调用,作业的信息会被作业历史服务器存储以备之后用户核查。
资源调度器
Yarn 是通过将资源分配给 queue 来进行资源分配的,每个 queue 可以设置它的资源分配方式,接着展开介绍 Yarn 的三种资源分配方式。
FIFO Scheduler
如果没有配置策略的话,所有的任务都提交到一个 default 队列,根据它们的提交顺序执行。富裕资源就执行任务,若资源不富裕就等待前面的任务执行完毕后释放资源,这就是 FIFO Scheduler 先入先出的分配方式。
via https://blog.csdn.net/suifeng3051/article/details/49508261
如上图所示,在 Job1 提交时占用了所有的资源,不久后 Job2提交了,但是此时系统中已经没有资源可以分配给它了。加入 Job1 是一个大任务,那么 Job2 就只能等待一段很长的时间才能获得执行的资源。所以先入先出的分配方式存在一个问题就是大任务会占用很多资源,造成后面的小任务等待时间太长而饿死,因此一般不使用这个默认配置。
Capacity Scheduler
Capacity Scheduler 是一种多租户、弹性的分配方式。每个租户一个队列,每个队列可以配置能使用的资源上限与下限(譬如 50%,达到这个上限后即使其他的资源空置着,也不可使用),通过配置可以令队列至少有资源下限配置的资源可使用。
via https://blog.csdn.net/suifeng3051/article/details/49508261
上图队列 A 和队列 B 分配了相互独立的资源。Job1 提交给队列 A 执行,它只能使用队列 A 的资源。接着 Job2 提交给了队列B 就不必等待 Job1 释放资源了。这样就可以将大任务和小任务分配在两个队列中,这两个队列的资源相互独立,就不会造成小任务饿死的情况了。
Fair Scheduler
Fair Scheduler 是一种公平的分配方式,所谓的公平就是集群会尽可能地按配置的比例分配资源给队列。
via https://blog.csdn.net/suifeng3051/article/details/49508261
如上图所示,Job1 提交给队列 A,它占用了集群的所有资源。接着 Job2 提交给了队列 B,这时 Job1 就需要释放它的一半的资源给队列 A 中的 Job2 使用。接着 Job3 也提交给了队列 B,这个时候 Job2 如果还未执行完毕的话也必须释放一半的资源给 Job3。这就是公平的分配方式,在队列范围内所有任务享用到的资源都是均分的。
目前这三种调度方式用得最广泛的就是 Capacity Scheduler 。
资源隔离原理
既然在调度中存在队列的概念,自然免不了资源隔离。
Yarn 采用线程监控的方法判断任务是否超量使用内存,一旦发现超量则直接将其 kill 掉。这种方式比较暴力,容易误伤一些启动时内存比较大的任务。默认情况下 NodeManager 不会对进行 CPU 隔离,我们可以通过启用 Cgroup 支持,它会生成指定的配置文件设置资源占有 CPU 的百分比,运行过程中会严格限制其占用的百分比,但 Cgroup 只能隔离百分比而不能进行分核隔离。
通信接口
在分布式内组件之间的沟通是十分重要的,因此接着介绍几个常用的通信接口。如下图所示是 Yarn 的基础通信模型图。
其中 JobClient 与 ResourceManager的通信协议为 ApplicationClientProtocol,客户端通过该协议提交应用程序,查询应用状态;ApplicationMaster 与 ResourceManager 通信协议为 ApplicationMasterProtocol,ApplicationMaster 向 ResourceManager 注册自己,并为各个任务申请资源;ApplicationMaster 与 NodeManager 的通信协议为 ContainerManagementProtocol,ApplicationMaster 要求 NodeManager 启动或停止 Container,并获取 Container 的状态信息;NodeManager 与 ResourceManager 的通信协议为 ResourceTracker,NodeManager 向 ResourceManager 注册自己,并定时发送心跳信息汇报当前节点的资源使用情况。
编程概述
编写一个可以在 Yarn 上运行的任务,需要写两个类:用于启动 AppMaster 的 Client 类和用于启动 Container 的 AppMaster 类。
Client 类的具体过程描述如下:
Configuration conf = new Configuration();
// 1、创建并启动一个yarnClient
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
// 2、创建一个application
YarnClientApplication app = yarnClient.createApplication();
// 3、获取application运行时的context
ApplicationSubmissionContext context = app.getApplicationSubmissionContext();
ApplicationId appId = context.getApplicationId();
System.out.println("create new applicationId ==> " + appId);
// 4、设置container运行完成后自动销毁
context.setKeepContainersAcrossApplicationAttempts(false);
// 设置application的名称
context.setApplicationName("ApplicationMaster");
// 5、定义该application所使用的内存和cpu,10mb,1cpu
Resource capability = Resource.newInstance(10, 1);
context.setResource(capability);
// 6、构建appmaster运行的container, 需要定义本地资源,运行时环境,和运行命令
ContainerLaunchContext amContainer = createAMContainerLanunchContext(conf, context.getApplicationId());
context.setAMContainerSpec(amContainer);
// 7、设置优先级,默认是0
context.setPriority(Priority.newInstance(0));
// 8、设置运行队列
context.setQueue("default");
// 9、提交任务
yarnClient.submitApplication(context);
AppMaster 类的具体过程描述如下:
Configuration conf = new Configuration();
// 1、创建 AM 到 RM 的客户端
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, new RMCallbackHandler());
amRMClient.init(conf);
amRMClient.start();
// 2、创建 AM 到 NM 的客户端
amNMClient = new NMClientAsyncImpl(new NMCallbackHandler());
amNMClient.init(conf);
amNMClient.start();
// 3、将AM注册到RM上,注册成功后 会启动AM和RM之间的心跳线程
amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1, "");
for (int i = 0; i < numTotalContainers.get(); i++) {
// 4、申请container资源
ContainerRequest containerAsk = new ContainerRequest(Resource.newInstance(100, 1), null, null, Priority.newInstance(0));
amRMClient.addContainerRequest(containerAsk);
}
*具体代码可在公众号后台回复「yarn代码」获取下载包。
尽管 Yarn 自带的编程 API 已经得到了极大的简化,但从头开发一个 Yarn 应用程序仍是一件非常困难的事。
Apache Twill 这个项目则是为简化 Yarn 上应用程序开发而成立的项目,该项目把与 Yarn 相关的重复性的工作封装成库,使得用户可以专注于自己的应用程序逻辑,最后通过一个简单的 helloworld 实例感受一下:
public class HelloWorld {
static Logger LOG = LoggerFactory.getLogger(HelloWorld.class);
static class HelloWorldRunnable extends AbstractTwillRunnable {
@Override
public void run() {
LOG.info("Hello World");
}
}
public static void main(String[] args) throws Exception {
YarnConfiguration conf = new YarnConfiguration();
TwillRunnerService runner = new YarnTwillRunnerService(conf, "localhost:2181");
runner.startAndWait();
TwillController controller = runner.prepare(new HelloWorldRunnable()).start();
Services.getCompletionFuture(controller).get();
}