前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >yarn中的事件分发与状态机框架

yarn中的事件分发与状态机框架

作者头像
陈猿解码
发布2023-02-28 15:09:05
7630
发布2023-02-28 15:09:05
举报
文章被收录于专栏:陈猿解码

【概述】

在早之前的文章《YARN——任务提交启动流程》中提到了,其处理逻辑是围绕applicaiton、container、attempt实例对象的创建,各自状态机的变化来实现的。

具体来说,是将处理逻辑抽象为事件与事件的处理,对事件进行异步分发以及对不同事件的回调处理,同时以有限状态机来表示事件处理后的不同状态。本文就来总结下yarn中的事件异步分发处理框架以及状态机框架的使用与实现原理。

【事件异步处理分发框架】

1. 使用

对于事件异步处理框架,其使用比较简单,可分为如下几个步骤。

代码语言:javascript
复制
// 1) 创建事件异步分发处理对象
Dispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher");
// 2) 注册事件以及对应的处理类对象
dispatcher.register(RMFatalEventType.class,new ResourceManager.RMFatalEventDispatcher());
// 3)初始化及启动事件分发器
dispatcher.init();
dispatcher.start();
// 4) 向事件分发器投递消息
dispatcher.getEventHandler().handle(new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));

这里的事件处理类必须实现EventHandler接口,在handle接口中完成事件的回调处理。

代码语言:javascript
复制
public interface EventHandler<T extends Event> {
    void handle(T event);
}

private class RMFatalEventDispatcher implements EventHandler<RMFatalEvent> {
    public void handle(RMFatalEvent event) {
        ...
    }
}

2. 原理

异步事件分发处理器内部是一个经典的队列单线程的处理模型。

在异步事件分发器内部包装了一个事件队列、一个事件分发处理类实例对象的MAP、以及一个独立的线程。

在注册事件的处理类时,以事件类型为Key,处理类实例对象为value,存储到map中(允许一个事件同时有多个处理对象)。

调用getEventHandler().handle()时,实际上是向队列中添加了一个事件对象。事件分发器内部的线程则不断从队列中取出消息,然后从map中找到事件的处理类对象实例,并调用该类对象的handle方法进行事件的处理。

小细节:

  • 队列是没有长度限制的,可以无限往里面添加事件
  • 如果一个事件类型,同时注册了多个处理类对象,则按照注册顺序依次调用进行回调处理。

【状态机框架】

有限状态机在编码中会经常用到,其本质上是在当前状态下,收到某个事件后,经过一定的处理,切换到下一个状态。

在hadoop中自带了状态机处理框架,并且在RM、NM内部大量运用了状态机来维护中application、attempt、container等有生命周期的信息的状态。

对于状态机框架的使用,包括如下几个步骤:

代码语言:javascript
复制
// 1. 构造状态机工厂, 并添加状态装换
StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent> stateMachineFactory =
    new StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent>(RMAppState.NEW).
    addTransition(
        RMAppState.NEW, // 当前状态
        RMAppState.NEW_SAVING, // 处理后的状态
        RMAppEventType.START, // 事件类型
        new RMAppNewlySavingTransition()). // 处理类对象
    addTransition(
        RMAppState.NEW, // 当前状态
        EnumSet.of( // 处理后可能的状态集
            RMAppState.SUBMITTED,
            RMAppState.ACCEPTED,
            RMAppState.FINISHED, 
            RMAppState.FAILED,
            RMAppState.KILLED,
            RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, // 事件类型
        new RMAppRecoveredTransition()). // 处理类对象
    addTransition(...).
    installTopology();
// 2. 构造状态机实例队列
StateMachine<RMAppState, RMAppEventType, RMAppEvent> stateMachine = stateMachineFactory.make(this);
// 3. 事件处理与状态转换
stateMachine.doTransition(event.getType(), event);

首先,状态机工厂(StateMachineFactory)是一个泛型类,包含4个类型变量

  • OPERAND:操作该状态机的对象类型
  • STATE:状态机的状态实例
  • EVENTTYPE:将要被处理的事件类型
  • EVENT:具体的事件对象

其次,需要通过`addTransition`接口加入状态转换,该函数包括4个入参,分别为转换之前的状态、转换后的状态、触发的事件、事件的处理类对象。

对于该操作,状态机工厂类内部实际上是将状态转换封装成一个类,并对添加的多个状态转换以链表的形式串起来。

最后,需要调用`installTopology`完成状态机内部的拓扑构造。

这一步,在内部就是对状态转换链表转换成一个map嵌套的状态机转换表,外层以状态为key,value是一个map。内层map以事件为key,转换的操作(封装)对象为value。

状态机工厂类的定义如下所示:

代码语言:javascript
复制
final public class StateMachineFactory<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
    private final TransitionsListNode transitionsListNode;
    private Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable;
    private STATE defaultInitialState;
}

private class TransitionsListNode {
    final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
    final TransitionsListNode next;

    TransitionsListNode(
        ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,
        TransitionsListNode next) {
        this.transition = transition;
        this.next = next;
    }
}

事件处理类对象需要实现`SingleArcTransition`或`MultipleArcTransition`接口,在transition接口中进行事件处理动作。

SingleArcTransition表示转换后明确只有一种状态,而对于MultipleArcTransition,经过事件处理后,可能会有不同的状态, 只需要是设定的状态集合中其中一个即可。

代码语言:javascript
复制
private static class RMAppTransition implements SingleArcTransition<RMAppImpl, RMAppEvent> {
    public void transition(RMAppImpl app, RMAppEvent event) {
    };
}

private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
        ...
    }
}

private static final class RMAppRecoveredTransition implements MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
    @Override
    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
        ...
        return RMAppState.ACCEPTED;
    }
}

状态机转换处理(doTransition)的处理逻辑,就是根据事件类型与当前的状态,从`stateMachineTable`中找到对应的处理对象,并调用其transition方法进行处理。

代码语言:javascript
复制
private class InternalStateMachine implements StateMachine<STATE, EVENTTYPE, EVENT> {
    @Override
    public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
        throws InvalidStateTransitionException {
        listener.preTransition(operand, currentState, event);
        STATE oldState = currentState;
        currentState = 
            StateMachineFactory.this.doTransition(operand, currentState, eventType, event);
        listener.postTransition(operand, oldState, currentState, event);
        return currentState;
    }
}

final public class StateMachineFactory<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
    private STATE doTransition(
        OPERAND operand, 
        STATE oldState, 
        EVENTTYPE eventType, 
        EVENT event)
        throws InvalidStateTransitionException {
        Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap = stateMachineTable.get(oldState);
        if (transitionMap != null) {
            Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition = transitionMap.get(eventType);
            if (transition != null) {
                return transition.doTransition(operand, oldState, event, eventType);
            }
        }
        throw new InvalidStateTransitionException(oldState, eventType);
    }
    
    private class SingleInternalArc implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
        public STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType) {
            if (hook != null) {
                hook.transition(operand, event);
            }
            return postState;
        }
    }
    
    private class MultipleInternalArc implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
        public STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType) throws InvalidStateTransitionException {
            STATE postState = hook.transition(operand, event);
            if (!validPostStates.contains(postState)) {
                throw new InvalidStateTransitionException(oldState, eventType);
            }
            return postState;
        }
    }
}

小技巧:

任务提交运行过程中涉及的application、container等的状态机还是非常复杂的,因此在yarn内部提供了可视化的机制,具体是通过调用`VisualizeStateMachine`对指定状态机类生成graphviz(.gv)文件,然后再用dot将其转换为png的图片,这样就可以比较直观的查看了。

【RM中的使用】

在RM中,事件分发与状态机通常是结合起来使用的,即向事件分发器注册一个事件的处理对象,在该处理对象的handle处理方法中,调用状态机进行相应的处理。其处理过程又可能是继续向事件分发器投递另外一个事件。这样在内部就形成了处理链路。yarn任务的运行调度逻辑主要就是这样进行的。对应的处理流程如下图所示:

对应的代码逻辑为:

代码语言:javascript
复制
rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(rmContext));

@Private
public static final class ApplicationEventDispatcher implements EventHandler<RMAppEvent> {

    private final RMContext rmContext;

    public ApplicationEventDispatcher(RMContext rmContext) {
        this.rmContext = rmContext;
    }

    @Override
    public void handle(RMAppEvent event) {
        ApplicationId appID = event.getApplicationId();
        RMApp rmApp = this.rmContext.getRMApps().get(appID);
        if (rmApp != null) {
            try {
                rmApp.handle(event);
            } catch (Throwable t) {
                LOG.error(
                    "Error in handling event type " + event.getType() +
                    " for application " + appID, t);
            }
        }
    }
}

// RMAppImpl.java
public void handle(RMAppEvent event) {
    ...
    this.stateMachine.doTransition(event.getType(), event);
    ...
}

【总结】

本文介绍了yarn中的异步事件处理框架,状态机框架的使用,以及内部实现原理,了解了这些后,对进一步分析yarn源码会有很大的帮助。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-10-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 陈猿解码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档