前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >LiteFlow异步编排执行的具体过程逻辑

LiteFlow异步编排执行的具体过程逻辑

作者头像
路行的亚洲
发布2023-10-25 09:39:01
5020
发布2023-10-25 09:39:01
举报
文章被收录于专栏:后端技术学习后端技术学习

一、概论

根据前面一篇串行并行编排(https://mp.weixin.qq.com/s/R-TS5bQnEnROMaUjTZgIKA)的文章。我们知道无论串行还是并行编排,都需要基于chain来实现condition的调用。那么在并行编排condition的过程又是如何实现这个过程的呢?下面我们详细来了解并行编排从condition到node的过程,因为串行编排相对来说要简单一些,但是总体的思路是类似的,只不过执行的condition不一样。

二、并行编排执行器基于策略模式实现逻辑

找到并行编排的condition入口:

代码语言:javascript
复制
    private void executeAsyncCondition(Integer slotIndex) throws Exception {

        // 获取并发执行策略
        ParallelStrategyExecutor parallelStrategyExecutor = ParallelStrategyHelper.loadInstance().buildParallelExecutor(this.getParallelStrategy());

        // 执行并发逻辑
        parallelStrategyExecutor.execute(this, slotIndex);

    }

执行的过程首先获取对应的执行器,然后执行具体的执行器,然后执行业务逻辑。

从代码中,执行器分为三类:完成任一任务、完成全部任务、完成指定ID任务。

代码语言:javascript
复制
    ANY("anyOf", "完成任一任务", AnyOfParallelExecutor.class),
    ALL("allOf", "完成全部任务", AllOfParallelExecutor.class),
    SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class);

从代码可以看到作者使用了策略模式来实现获取对应的执行。获取的思路是首先基于当前的并行策略帮助类获取其实例对象,可以看到作者基于单例模式实现对实例对象的获取。然后从策略执行器strategyExecutorMap中获取,如果获取到了,说明之前有存储过这种执行器,此时直接返回。如果当前不存在,则根据枚举的类型获取具体的执行器,然后将其放入到策略执行器map中,同时进行返回。

可以看到作者还是非常的细心的,将共性方法提取到了抽象类ParallelStrategyExecutor中,调用execute(WhenCondition whenCondition, Integer slotIndex)来实现对执行器对应的逻辑处理:

代码语言:javascript
复制
AllOfParallelExecutor 全部执行并行编排执行器
AnyOfParallelExecutor 任一任务并行编排执行器
SpecifyParallelExecutor 完成指定任务执行器,使用 ID 进行比较

这里的Allof和Anyof与CompletableFuture执行的方式相对应。这里的枚举其实也是为后面CompletableFuture来执行并行编排逻辑处理做的铺垫。

三、AllOfParallelExecutor执行器执行逻辑

看到具体的执行器,是不是很疑问,哪里执行了我们的业务逻辑了呢?

代码语言:javascript
复制
 @Override
    public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {

        // 获取所有 CompletableFuture 任务,拿到结果
        List<CompletableFuture<WhenFutureObj>> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex);

        // 把这些 CompletableFuture 通过 allOf 合成一个 CompletableFuture,表明完成所有任务
        CompletableFuture<?> specifyTask = CompletableFuture.allOf(whenAllTaskList.toArray(new CompletableFuture[] {}));

        // 结果处理
        this.handleTaskResult(whenCondition, slotIndex, whenAllTaskList, specifyTask);

    } 

而我们知道CompletableFuture处理任务无非两种结果,一种是有返回值的,一种是没有返回值的。可以看到这里是有返回值的,为什么呢?从返回值可以看到CompletableFuture。也即WhenFutureObj这个返回值是我们需要关注的对象。

这里可以看到很多事情是在this.getWhenAllTaskList(whenCondition, slotIndex);中完成的,也即这一步对应并行编排来说是非常重要的,因为后续的两个操作,只是做结果的合并和处理task的结果。因此我们把目光注意到WhenFutureObj,也即我们返回的结果是基于WhenFutureObj这个对象来组装的。

可以看到这个对象中保存的信息:

代码语言:javascript
复制
public class WhenFutureObj {

    //判断执行任务是否成功
    private boolean success;

    // 是否超时
    private boolean timeout;

    // 执行器的名称
    private String executorName;

    // 出现的异常信息
    private Exception ex;
}    

因为我们知道CompletableFuture.supplyAsync这个Api是有返回值的,也即我们可以顺着WhenFutureObj这个对象找到对应的get对象。因为线程方法的入口都是run方法,因此我们debug,顺着这个CompletableFuture找到对应的run方法的f.get():

代码语言:javascript
复制
 public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }

f.get()这个就有文章了,因为这个方法会调用我们的get方法:

这个方法可能不是很显眼,但是我们的确是可以找到它的。

因此我们可以看到其实它执行业务方法是在

这里执行的,因为这里执行的过程会返回对应的结果。而真正的实现则是在get中。

代码语言:javascript
复制
public WhenFutureObj get() {
        try {
            executableItem.setCurrChainId(currChainId);
            executableItem.execute(slotIndex);
            return WhenFutureObj.success(executableItem.getId());
        }
        catch (Exception e) {
            return WhenFutureObj.fail(executableItem.getId(), e);
        }
    }

因为这里会调用业务具体的nodeComponent,执行具体的业务逻辑。而这里的返回结果就是我们上面看到的对象 WhenFutureObj。是不是很激动。对的,这就是我们想要找到执行任务是否成功的结果。

四、调用Node组件来执行具体的NodeComponent

调用Node来实现具体业务逻辑的调用,可以看到作者是非常的贴心的:

代码语言:javascript
复制
public void execute() throws Exception {
        Slot slot = this.getSlot();

        // 在元数据里加入step信息
        CmpStep cmpStep = new CmpStep(nodeId, name, CmpStepTypeEnum.SINGLE);
        cmpStep.setTag(this.getTag());
        cmpStep.setInstance(this);
        cmpStep.setRefNode(this.getRefNode());
        slot.addStep(cmpStep);

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        try {
            // 前置处理
            self.beforeProcess();

            // 主要的处理逻辑
            self.process();

            // 成功后回调方法
            self.onSuccess();

            // 步骤状态设为true
            cmpStep.setSuccess(true);
        }
        catch (Exception e) {
            // 步骤状态设为false,并加入异常
            cmpStep.setSuccess(false);
            cmpStep.setException(e);

            // 执行失败后回调方法
            // 这里要注意,失败方法本身抛出错误,只打出堆栈,往外抛出的还是主要的异常
            try {
                self.onError(e);
            }
            catch (Exception ex) {
                String errMsg = StrUtil.format("component[{}] onError method happens exception", this.getDisplayName());
                LOG.error(errMsg, ex);
            }
            throw e;
        }
        finally {
            // 后置处理
            self.afterProcess();

            stopWatch.stop();
            final long timeSpent = stopWatch.getTotalTimeMillis();
            LOG.info("component[{}] finished in {} milliseconds", this.getDisplayName(), timeSpent);

            // 往CmpStep中放入时间消耗信息
            cmpStep.setTimeSpent(timeSpent);

            // 性能统计
            if (ObjectUtil.isNotNull(monitorBus)) {
                CompStatistics statistics = new CompStatistics(this.getClass().getSimpleName(), timeSpent);
                monitorBus.addStatistics(statistics);
            }
        }
    }

可以看到作者非常的贴心,进行了前置和后置方法的处理。而真正处理业务逻辑桥梁的self.process()这个方法。因为这个方法处理我们业务系统对应nodeComponent逻辑。

五、业务之外的扩展

从上面我们可以看到作者对nodeComponent处理的逻辑写了很多处理的方法。除了执行业务逻辑处理的桥梁的self.process()方法主要包括:

代码语言:javascript
复制
beforeProcess 前置处理
onSuccess 成功回调
onError  执行失败后回调方法
afterProcess 后置处理
monitorBus.addStatistics 性能统计

可以看到一个优秀的框架不仅需要处理正确的逻辑,还需要处理异常的逻辑,同时给业务留下口子,方便业务进行扩展。

除此之外,还有降级组件FallbackCmp,在业务逻辑中,可以基于此组件实现降级处理。

当然,除了whenCondition和ThenCondition之外,还有很多其它的Condition。更多的condition可以去官网了解。

六、相关参考地址

LiteFlow开源的gitee地址: https://gitee.com/dromara/liteFlow

LiteFlow开源的github地址: https://github.com/dromara/liteflow

LiteFlow开源的官网地址: https://liteflow.cc/

串行并行编排:https://mp.weixin.qq.com/s/R-TS5bQnEnROMaUjTZgIKA

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-10-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概论
  • 二、并行编排执行器基于策略模式实现逻辑
  • 三、AllOfParallelExecutor执行器执行逻辑
  • 四、调用Node组件来执行具体的NodeComponent
  • 五、业务之外的扩展
  • 六、相关参考地址
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档