A Look at PowerJob's BroadcastProcessor

code4it
Published 2024-01-27 21:45:43
Included in the column: 码匠的流水账

This article takes a look at PowerJob's BroadcastProcessor.

BroadcastProcessor

tech/powerjob/worker/core/processor/sdk/BroadcastProcessor.java

public interface BroadcastProcessor extends BasicProcessor {

    /**
     * Runs before the broadcast is executed on all nodes; runs exactly once, on a single machine
     */
    default ProcessResult preProcess(TaskContext context) throws Exception {
        return new ProcessResult(true);
    }
    /**
     * Runs after the broadcast has finished on all nodes; runs exactly once, on a single machine
     */
    default ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
        return defaultResult(taskResults);
    }

    static ProcessResult defaultResult(List<TaskResult> taskResults) {
        long succeed = 0, failed = 0;
        for (TaskResult ts : taskResults) {
            if (ts.isSuccess()) {
                succeed ++ ;
            }else {
                failed ++;
            }
        }
        return new ProcessResult(failed == 0, String.format("succeed:%d, failed:%d", succeed, failed));
    }
}

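The BroadcastProcessor interface extends BasicProcessor and adds three methods: preProcess, postProcess, and the static helper defaultResult. By default, preProcess returns a successful ProcessResult, while postProcess delegates to defaultResult, which counts the successes and failures in taskResults and reports overall success only when no subtask failed.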
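
The counting rule in defaultResult can be tried out in isolation. The following is a minimal sketch, not PowerJob code: TaskResult and ProcessResult here are simplified stand-in records (hypothetical, with only the fields the rule needs), and the task IDs and messages are made up.

```java
import java.util.List;

final class DefaultResultSketch {

    // Hypothetical stand-ins for PowerJob's TaskResult and ProcessResult.
    record TaskResult(String taskId, boolean success, String result) {}
    record ProcessResult(boolean success, String msg) {}

    // Same rule as BroadcastProcessor.defaultResult: success only if nothing failed.
    static ProcessResult defaultResult(List<TaskResult> taskResults) {
        long succeed = 0;
        long failed = 0;
        for (TaskResult tr : taskResults) {
            if (tr.success()) {
                succeed++;
            } else {
                failed++;
            }
        }
        return new ProcessResult(failed == 0, String.format("succeed:%d, failed:%d", succeed, failed));
    }

    public static void main(String[] args) {
        List<TaskResult> results = List.of(
                new TaskResult("0.0", true, "ok"),
                new TaskResult("0.1", true, "ok"),
                new TaskResult("0.2", false, "timeout"));
        ProcessResult summary = defaultResult(results);
        // prints: false succeed:2, failed:1
        System.out.println(summary.success() + " " + summary.msg());
    }
}
```

Note that under this rule an empty result list also counts as a success, because no failure was recorded.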

HeavyProcessorRunnable

tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java

public class HeavyProcessorRunnable implements Runnable {


    private final InstanceInfo instanceInfo;
    private final String taskTrackerAddress;
    private final TaskDO task;
    private final ProcessorBean processorBean;
    private final OmsLogger omsLogger;
    /**
     * Retry queue; the ProcessorTracker periodically re-reports the processing results
     */
    private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
    private final WorkerRuntime workerRuntime;

    @Override
    @SuppressWarnings("squid:S2142")
    public void run() {
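        // Switch the thread context class loader (otherwise the Worker class loader is used, which does not contain the container classes, so serialization/deserialization would throw ClassNotFoundException)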
        Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
        try {
            innerRun();
        } catch (InterruptedException ignore) {
            // ignore
        } catch (Throwable e) {
            reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null, null);
            log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e);
        } finally {
            ThreadLocalStore.clear();
        }
    }    

    public void innerRun() throws InterruptedException {

        final BasicProcessor processor = processorBean.getProcessor();

        String taskId = task.getTaskId();
        Long instanceId = task.getInstanceId();

        log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
        ThreadLocalStore.setTask(task);
        ThreadLocalStore.setRuntimeMeta(workerRuntime);

        // 0. Build the task context
        WorkflowContext workflowContext = constructWorkflowContext();
        TaskContext taskContext = constructTaskContext();
        taskContext.setWorkflowContext(workflowContext);
        // 1. Report the execution status
        reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);

        ProcessResult processResult;
        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

        // 2. Special handling for the root task of a broadcast execution
        if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
            // Broadcast execution: run preProcess on this machine first; once it completes, the TaskTracker generates subtasks for all Workers
            handleBroadcastRootTask(instanceId, taskContext);
            return;
        }

        // 3. Special handling for the last task (it always runs on the same machine as the TaskTracker)
        if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) {
            handleLastTask(taskId, instanceId, taskContext, executeType);
            return;
        }

        // 4. Submit for actual execution
        try {
            processResult = processor.process(taskContext);
            if (processResult == null) {
                processResult = new ProcessResult(false, "ProcessResult can't be null");
            }
        } catch (Throwable e) {
            log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
            processResult = new ProcessResult(false, e.toString());
        }
        reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());
    }

    //......
}    

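HeavyProcessorRunnable implements Runnable. Its run method first sets the thread context class loader to processorBean.getClassLoader() and then calls innerRun. innerRun calls handleBroadcastRootTask for the root task of a BROADCAST execution and handleLastTask for the last task, returning right after either one; every other task goes through processor.process(taskContext), and the outcome is then reported via reportStatus.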
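
The order of those checks matters, so here is a minimal sketch of the routing decision alone. It is an illustration under assumptions: the two task-name constants are placeholders (the real values live in TaskConstant and are not shown in this article), and the ExecuteType and Branch enums are simplified stand-ins.

```java
final class InnerRunRoutingSketch {

    // Stand-in for PowerJob's ExecuteType; only the values needed here.
    enum ExecuteType { STANDALONE, BROADCAST, MAP_REDUCE }

    // Hypothetical labels for the three paths through innerRun.
    enum Branch { BROADCAST_ROOT, LAST_TASK, NORMAL_PROCESS }

    // Placeholder values, not the real TaskConstant strings.
    static final String ROOT_TASK_NAME = "ROOT_TASK_PLACEHOLDER";
    static final String LAST_TASK_NAME = "LAST_TASK_PLACEHOLDER";

    static Branch route(String taskName, ExecuteType executeType) {
        // The root task is special only when the execution type is BROADCAST.
        if (ROOT_TASK_NAME.equals(taskName) && executeType == ExecuteType.BROADCAST) {
            return Branch.BROADCAST_ROOT;
        }
        // The last task is special for every execution type.
        if (LAST_TASK_NAME.equals(taskName)) {
            return Branch.LAST_TASK;
        }
        return Branch.NORMAL_PROCESS;
    }

    public static void main(String[] args) {
        System.out.println(route(ROOT_TASK_NAME, ExecuteType.BROADCAST));
        System.out.println(route(ROOT_TASK_NAME, ExecuteType.MAP_REDUCE));
        System.out.println(route(LAST_TASK_NAME, ExecuteType.BROADCAST));
        System.out.println(route("some-subtask", ExecuteType.BROADCAST));
    }
}
```

A root task of a non-broadcast execution falls through to the normal process path, which matches the condition in innerRun.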

handleBroadcastRootTask

    private void handleBroadcastRootTask(Long instanceId, TaskContext taskContext) {
        BasicProcessor processor = processorBean.getProcessor();
        ProcessResult processResult;
        // The first task of a broadcast execution only runs the preProcess part
        if (processor instanceof BroadcastProcessor) {

            BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
            try {
                processResult = broadcastProcessor.preProcess(taskContext);
            } catch (Throwable e) {
                log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
                processResult = new ProcessResult(false, e.toString());
            }

        } else {
            processResult = new ProcessResult(true, "NO_PREPOST_TASK");
        }
        // Notify the TaskTracker to create the broadcast subtasks
        reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getWorkflowContext().getAppendedContextData());

    }

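handleBroadcastRootTask calls broadcastProcessor.preProcess(taskContext) when the processor is a BroadcastProcessor, turning any exception into a failed ProcessResult; otherwise it falls back to a successful result with the message NO_PREPOST_TASK. It then calls reportStatus with cmd set to ProcessorReportTaskStatusReq.BROADCAST.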
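
The three possible outcomes of the root task can be sketched as follows. This is a simplified illustration, not the PowerJob classes: the interfaces below are hypothetical stand-ins that drop the TaskContext parameter, and runRootTask is a made-up name for the decision logic.

```java
final class PreProcessSketch {

    // Hypothetical stand-ins for the PowerJob types, without TaskContext.
    record ProcessResult(boolean success, String msg) {}

    interface BasicProcessor {
        ProcessResult process() throws Exception;
    }

    interface BroadcastProcessor extends BasicProcessor {
        default ProcessResult preProcess() throws Exception {
            return new ProcessResult(true, null);
        }
    }

    // Mirrors the decision in handleBroadcastRootTask.
    static ProcessResult runRootTask(BasicProcessor processor) {
        if (processor instanceof BroadcastProcessor) {
            try {
                return ((BroadcastProcessor) processor).preProcess();
            } catch (Throwable e) {
                // A throwing preProcess becomes a failed result instead of propagating.
                return new ProcessResult(false, e.toString());
            }
        }
        // A plain BasicProcessor has no pre/post hooks, so the root task trivially succeeds.
        return new ProcessResult(true, "NO_PREPOST_TASK");
    }

    public static void main(String[] args) {
        BasicProcessor plain = () -> new ProcessResult(true, "done");
        BroadcastProcessor withDefaults = () -> new ProcessResult(true, "done");
        BroadcastProcessor failing = new BroadcastProcessor() {
            @Override
            public ProcessResult process() {
                return new ProcessResult(true, "done");
            }

            @Override
            public ProcessResult preProcess() {
                throw new IllegalStateException("db down");
            }
        };
        System.out.println(runRootTask(plain));
        System.out.println(runRootTask(withDefaults));
        System.out.println(runRootTask(failing));
    }
}
```

Only the first case reports NO_PREPOST_TASK; in all three cases the real method goes on to report the result with the BROADCAST command.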

handleLastTask

    private void handleLastTask(String taskId, Long instanceId, TaskContext taskContext, ExecuteType executeType) {
        final BasicProcessor processor = processorBean.getProcessor();
        ProcessResult processResult;
        Stopwatch stopwatch = Stopwatch.createStarted();
        log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);

        List<TaskResult> taskResults = workerRuntime.getTaskPersistenceService().getAllTaskResult(instanceId, task.getSubInstanceId());
        try {
            switch (executeType) {
                case BROADCAST:

                    if (processor instanceof BroadcastProcessor) {
                        BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
                        processResult = broadcastProcessor.postProcess(taskContext, taskResults);
                    } else {
                        processResult = BroadcastProcessor.defaultResult(taskResults);
                    }
                    break;
                case MAP_REDUCE:

                    if (processor instanceof MapReduceProcessor) {
                        MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
                        processResult = mapReduceProcessor.reduce(taskContext, taskResults);
                    } else {
                        processResult = new ProcessResult(false, "not implement the MapReduceProcessor");
                    }
                    break;
                default:
                    processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
            }
        } catch (Throwable e) {
            processResult = new ProcessResult(false, e.toString());
            log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
        }

        TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
        reportStatus(status, suit(processResult.getMsg()), null, taskContext.getWorkflowContext().getAppendedContextData());

        log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
    }

For BROADCAST, handleLastTask calls broadcastProcessor.postProcess if the processor implements BroadcastProcessor, and otherwise falls back to BroadcastProcessor.defaultResult.
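
Because postProcess is a default method, a processor can replace the all-or-nothing rule with its own. The sketch below is purely illustrative: the majority policy and the MajorityProcessor class are hypothetical, not something PowerJob ships, and the types are simplified stand-ins without TaskContext.

```java
import java.util.List;

final class PostProcessSketch {

    // Hypothetical stand-ins for the PowerJob types.
    record TaskResult(String taskId, boolean success, String result) {}
    record ProcessResult(boolean success, String msg) {}

    interface BroadcastProcessor {
        default ProcessResult postProcess(List<TaskResult> taskResults) {
            return defaultResult(taskResults);
        }

        static ProcessResult defaultResult(List<TaskResult> taskResults) {
            long failed = taskResults.stream().filter(tr -> !tr.success()).count();
            long succeed = taskResults.size() - failed;
            return new ProcessResult(failed == 0, String.format("succeed:%d, failed:%d", succeed, failed));
        }
    }

    // Illustrative policy (an assumption): succeed when more than half of the nodes succeeded.
    static final class MajorityProcessor implements BroadcastProcessor {
        @Override
        public ProcessResult postProcess(List<TaskResult> taskResults) {
            long ok = taskResults.stream().filter(TaskResult::success).count();
            return new ProcessResult(ok * 2 > taskResults.size(), "succeed:" + ok + ", total:" + taskResults.size());
        }
    }

    public static void main(String[] args) {
        List<TaskResult> results = List.of(
                new TaskResult("0.0", true, "ok"),
                new TaskResult("0.1", true, "ok"),
                new TaskResult("0.2", false, "timeout"));
        System.out.println(new BroadcastProcessor() {}.postProcess(results));
        System.out.println(new MajorityProcessor().postProcess(results));
    }
}
```

With two successes out of three, the default rule reports failure while the majority policy reports success.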

reportStatus

    private void reportStatus(TaskStatus status, String result, Integer cmd, Map<String, String> appendedWfContext) {
        ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();

        req.setInstanceId(task.getInstanceId());
        req.setSubInstanceId(task.getSubInstanceId());
        req.setTaskId(task.getTaskId());
        req.setStatus(status.getValue());
        req.setResult(result);
        req.setReportTime(System.currentTimeMillis());
        req.setCmd(cmd);
        // Check whether the appended context exceeds the size limit
        if (instanceInfo.getWfInstanceId() !=null && WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) {
            log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength());
            // ignore appended workflow context data
            appendedWfContext = Collections.emptyMap();
        }
        req.setAppendedWfContext(appendedWfContext);

        // Final (finished) statuses must be delivered reliably
        if (TaskStatus.FINISHED_STATUS.contains(status.getValue())) {
            boolean success = TransportUtils.reliablePtReportTask(req, taskTrackerAddress, workerRuntime);
            if (!success) {
                // Put into the retry queue and wait for a retry
                statusReportRetryQueue.add(req);
                log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed, will retry later", task.getInstanceId(), task.getTaskId(), status, result);
            }
        } else {
            TransportUtils.ptReportTask(req, taskTrackerAddress, workerRuntime);
        }
    }

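reportStatus reports the status to the TaskTracker. It builds a ProcessorReportTaskStatusReq; for a status in FINISHED_STATUS it sends the request with TransportUtils.reliablePtReportTask and puts it into the retry queue if that fails, while any other status is sent with TransportUtils.ptReportTask.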
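
The "send reliably, otherwise queue for retry" pattern can be sketched on its own. This is a minimal illustration under assumptions: StatusReport is a hypothetical stand-in for ProcessorReportTaskStatusReq, the transport is reduced to a Predicate that returns true on acknowledgement, and retryPending is a guess at what a periodic retry pass might look like (the article only shows the enqueue side).

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

final class ReportRetrySketch {

    // Hypothetical stand-in for ProcessorReportTaskStatusReq.
    record StatusReport(String taskId, String status) {}

    private final Queue<StatusReport> retryQueue = new ConcurrentLinkedQueue<>();
    // Returns true when the receiver acknowledged the report.
    private final Predicate<StatusReport> reliableSend;

    ReportRetrySketch(Predicate<StatusReport> reliableSend) {
        this.reliableSend = reliableSend;
    }

    // Finished statuses must not be lost: queue them when the send is not acknowledged.
    void reportFinished(StatusReport req) {
        if (!reliableSend.test(req)) {
            retryQueue.add(req);
        }
    }

    // One retry pass over the currently queued reports; returns how many were delivered.
    int retryPending() {
        int toTry = retryQueue.size();
        int delivered = 0;
        for (int i = 0; i < toTry; i++) {
            StatusReport req = retryQueue.poll();
            if (req == null) {
                break;
            }
            if (reliableSend.test(req)) {
                delivered++;
            } else {
                retryQueue.add(req);
            }
        }
        return delivered;
    }

    int pending() {
        return retryQueue.size();
    }

    public static void main(String[] args) {
        AtomicBoolean trackerReachable = new AtomicBoolean(false);
        ReportRetrySketch reporter = new ReportRetrySketch(req -> trackerReachable.get());

        reporter.reportFinished(new StatusReport("0.1", "WORKER_PROCESS_SUCCESS"));
        System.out.println("pending after failed send: " + reporter.pending());

        trackerReachable.set(true);
        System.out.println("delivered on retry: " + reporter.retryPending());
        System.out.println("pending after retry: " + reporter.pending());
    }
}
```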

TransportUtils

tech/powerjob/worker/common/utils/TransportUtils.java

    public static boolean reliablePtReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) {
        try {
            return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address, req, workerRuntime.getTransporter()).isSuccess();
        } catch (Exception e) {
            log.warn("[PowerJobTransport] reliablePtReportTask failed: {}", req, e);
            return false;
        }
    }

    private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception {
        final URL url = easyBuildUrl(t, rootPath, handlerPath, address);
        final CompletionStage<AskResponse> completionStage = transporter.ask(url, req, AskResponse.class);
        return completionStage
                .toCompletableFuture()
                .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }    

    public static void ptReportTask(ProcessorReportTaskStatusReq req, String address, WorkerRuntime workerRuntime) {
        final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_REPORT_TASK_STATUS, address);
        workerRuntime.getTransporter().tell(url, req);
    }    

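reliablePtReportTask uses the transporter's ask method and waits up to RemoteConstant.DEFAULT_TIMEOUT_MS for the response, whereas ptReportTask uses the fire-and-forget tell method.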
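
The difference between the two calls comes down to whether the caller waits for a reply. A minimal sketch with a hypothetical Transporter interface (simplified to string URLs and a Boolean reply, unlike the real PowerJob transporter) shows the ask side with a timeout:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

final class AskTellSketch {

    // Hypothetical, simplified transporter: tell is fire-and-forget, ask yields a reply.
    interface Transporter {
        void tell(String url, Object req);

        CompletionStage<Boolean> ask(String url, Object req);
    }

    // Waits for the acknowledgement; timeouts and other errors are mapped to false.
    static boolean reliableReport(Transporter transporter, String url, Object req, long timeoutMs) {
        try {
            return transporter.ask(url, req).toCompletableFuture().get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt flag for the caller.
                Thread.currentThread().interrupt();
            }
            return false;
        }
    }

    // Test helper: a transporter whose ask always returns the given stage.
    static Transporter fixedReply(CompletionStage<Boolean> reply) {
        return new Transporter() {
            @Override
            public void tell(String url, Object req) {
                // Nothing to wait for.
            }

            @Override
            public CompletionStage<Boolean> ask(String url, Object req) {
                return reply;
            }
        };
    }

    public static void main(String[] args) {
        Transporter acked = fixedReply(CompletableFuture.completedFuture(true));
        Transporter silent = fixedReply(new CompletableFuture<>());
        System.out.println(reliableReport(acked, "worker-a", "FINISHED", 100));
        System.out.println(reliableReport(silent, "worker-a", "FINISHED", 100));
    }
}
```

A reply that never arrives makes the call return false after the timeout, which is the case in which reportStatus falls back to the retry queue.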

onReceiveProcessorReportTaskStatusReq

tech/powerjob/worker/actors/TaskTrackerActor.java

    @Handler(path = WTT_HANDLER_REPORT_TASK_STATUS)
    public AskResponse onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) {

        int taskStatus = req.getStatus();
        // Only heavyweight tasks have the two-level task status reporting mechanism
        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());

        // This can happen when the TaskTracker has been stopped manually
        if (taskTracker == null) {
            log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
            return null;
        }

        if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) {
            taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult());
        }

        taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());

        // Update the workflow context
        taskTracker.updateAppendedWfContext(req.getAppendedWfContext());

        // Finished statuses require an acknowledgement reply
        if (TaskStatus.FINISHED_STATUS.contains(taskStatus)) {
            return AskResponse.succeed(null);
        }

        return null;
    }

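TaskTrackerActor's onReceiveProcessorReportTaskStatusReq handles ProcessorReportTaskStatusReq. When cmd is BROADCAST it calls taskTracker.broadcast; in every case it then updates the task status and the appended workflow context, and it replies with AskResponse.succeed for finished statuses.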

broadcast

tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

    public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) {

        if (finished.get()) {
            return;
        }

        log.info("[TaskTracker-{}-{}] finished broadcast's preProcess, preExecuteSuccess:{},preTaskId:{},result:{}", instanceId, subInstanceId, preExecuteSuccess, preTaskId, result);

        // Generate the cluster subtasks
        if (preExecuteSuccess) {
            List<String> allWorkerAddress = ptStatusHolder.getAllProcessorTrackers();
            List<TaskDO> subTaskList = Lists.newLinkedList();
            for (int i = 0; i < allWorkerAddress.size(); i++) {
                TaskDO subTask = new TaskDO();
                subTask.setSubInstanceId(subInstanceId);
                subTask.setTaskName(TaskConstant.BROADCAST_TASK_NAME);
                subTask.setTaskId(preTaskId + "." + i);
                // Broadcast tasks have the dispatch address written directly
                subTask.setAddress(allWorkerAddress.get(i));
                subTaskList.add(subTask);
            }
            submitTask(subTaskList);
        } else {
            log.warn("[TaskTracker-{}-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, subInstanceId, result);
        }
    }

    public boolean submitTask(List<TaskDO> newTaskList) {
        if (finished.get()) {
            return true;
        }
        if (CollectionUtils.isEmpty(newTaskList)) {
            return true;
        }
        // Basic preparation (the extra loop is slightly wasteful, but in distributed execution this cost is never the dominant part, so it is ignored)
        newTaskList.forEach(task -> {
            task.setInstanceId(instanceId);
            task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
            task.setFailedCnt(0);
            task.setLastModifiedTime(System.currentTimeMillis());
            task.setCreatedTime(System.currentTimeMillis());
            task.setLastReportTime(-1L);
        });

        log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList);
        return taskPersistenceService.batchSave(newTaskList);
    }    

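HeavyTaskTracker's broadcast method returns immediately if the tracker has already finished. If preProcess succeeded, it obtains allWorkerAddress from ptStatusHolder.getAllProcessorTrackers(), creates one subTask per address (with task ID preTaskId + "." + i and the worker address written directly into the task), and submits the whole subTaskList; if preProcess failed, it only logs a warning. submitTask fills in the common fields and persists the tasks via taskPersistenceService.batchSave.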
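
The fan-out step is easy to reproduce in isolation. In this minimal sketch SubTask is a hypothetical stand-in for TaskDO with only two fields, and the root task ID and worker addresses in main are made-up sample values.

```java
import java.util.ArrayList;
import java.util.List;

final class BroadcastSubTaskSketch {

    // Hypothetical stand-in for TaskDO: only the ID and the dispatch address.
    record SubTask(String taskId, String address) {}

    // One subtask per worker; IDs follow the preTaskId + "." + index scheme from broadcast().
    static List<SubTask> buildSubTasks(String preTaskId, List<String> workerAddresses) {
        List<SubTask> subTasks = new ArrayList<>();
        for (int i = 0; i < workerAddresses.size(); i++) {
            subTasks.add(new SubTask(preTaskId + "." + i, workerAddresses.get(i)));
        }
        return subTasks;
    }

    public static void main(String[] args) {
        // Sample values only; real IDs and addresses come from the TaskTracker.
        List<String> workers = List.of("10.0.0.1:27777", "10.0.0.2:27777", "10.0.0.3:27777");
        buildSubTasks("0", workers).forEach(System.out::println);
    }
}
```

Since each subtask carries a fixed address, every worker known to the tracker gets exactly one subtask, which is what distinguishes broadcast from ordinary dispatch.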

Summary

  • The BroadcastProcessor interface extends BasicProcessor and adds preProcess, postProcess, and the static helper defaultResult. By default, preProcess returns a successful ProcessResult, and postProcess delegates to defaultResult, which reports success only when no entry in taskResults failed.
  • HeavyProcessorRunnable implements Runnable. Its run method sets the thread context class loader to processorBean.getClassLoader() and then calls innerRun, which calls handleBroadcastRootTask for the root task of a BROADCAST execution, handleLastTask for the last task, and processor.process(taskContext) followed by reportStatus for everything else.
  • handleBroadcastRootTask calls broadcastProcessor.preProcess(taskContext) and then reportStatus with cmd set to ProcessorReportTaskStatusReq.BROADCAST.
  • For BROADCAST, handleLastTask calls broadcastProcessor.postProcess, or BroadcastProcessor.defaultResult if the processor is not a BroadcastProcessor.
  • HeavyTaskTracker's broadcast method obtains allWorkerAddress from ptStatusHolder.getAllProcessorTrackers(), creates one subTask per address, and submits the resulting subTaskList; submitTask persists it via taskPersistenceService.batchSave.

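Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.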

