前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一个 DAG 工作流引擎的设计实现源代码实例

一个 DAG 工作流引擎的设计实现源代码实例

作者头像
一个会写诗的程序员
发布2021-06-29 11:32:29
3K0
发布2021-06-29 11:32:29
举报

任务Task

package com.bytedance.ecop.kunlun.engine.scheduler;

import java.util.UUID;

import static java.lang.Thread.sleep;

public class Task implements Executor {
    private Long id;
    private String name;
    private int state;
    private long timeout;

    public Task() {
    }

    public Task(Long id, String name, int state, long timeout) {
        this.id = id;
        this.name = name;
        this.state = state;
        this.timeout = timeout;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    public long getTimeout() {
        return timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public boolean execute(TaskCallBack callBack) {
        System.out.println("Task id: [" + id + "], " + "task name: [" + name + "] is running");
        state = 1;
        try {
            sleep(3000L);
        } catch (InterruptedException e) {
        }

        TaskInstance taskInstance = new TaskInstance();
        taskInstance.setTaskId(id);
        String taskInstanceId = UUID.randomUUID().toString();
        taskInstance.setTaskInstanceId(taskInstanceId);

        TaskInstanceResult taskInstanceResult = new TaskInstanceResult("Task[" + id + "], taskInstanceId=" + taskInstanceId + " TaskInstanceResult = " + UUID.randomUUID());
        callBack.invoke(taskInstanceResult);
        return true;
    }

    public boolean hasExecuted() {
        return state == 1;
    }
}



package com.bytedance.ecop.kunlun.engine.scheduler;

public interface Executor {
    boolean execute(TaskCallBack callBack);
}


package com.bytedance.ecop.kunlun.engine.scheduler;

public interface TaskCallBack {
     Object invoke(TaskInstanceResult result);
}

任务运行实例TaskInstance

package com.bytedance.ecop.kunlun.engine.scheduler;

public class TaskInstance {
    private String taskInstanceId;
    private Long taskId;
    private String name;
    private int state;

    public TaskInstance() {
    }

    public TaskInstance(String taskInstanceId, Long taskId, String name, int state) {
        this.taskInstanceId = taskInstanceId;
        this.taskId = taskId;
        this.name = name;
        this.state = state;
    }

    public String getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(String taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public Long getTaskId() {
        return taskId;
    }

    public void setTaskId(Long taskId) {
        this.taskId = taskId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }
}


package com.bytedance.ecop.kunlun.engine.scheduler;

public class TaskInstanceResult {
    String resultJson;

    @Override
    public String toString() {
        return "TaskInstanceResult{" +
                "resultJson='" + resultJson + '\'' +
                '}';
    }

    public TaskInstanceResult(String resultJson) {
        this.resultJson = resultJson;
    }

    public String getResultJson() {
        return resultJson;
    }

    public void setResultJson(String resultJson) {
        this.resultJson = resultJson;
    }
}

DAG流程定义 Process

package com.bytedance.ecop.kunlun.engine.scheduler;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * DAG工作流
 */
public class Process {
    private Long graphId;
    private Set<Task> tasks;
    private Map<Task, Set<Task>> map;

    public Process() {
        this.tasks = new HashSet<Task>();
        this.map = new HashMap<Task, Set<Task>>();
    }

    public void addEdge(Task task, Task prev) {
        if (!tasks.contains(task) || !tasks.contains(prev)) {
            throw new IllegalArgumentException();
        }
        Set<Task> prevs = map.get(task);
        if (prevs == null) {
            prevs = new HashSet<Task>();
            map.put(task, prevs);
        }
        if (prevs.contains(prev)) {
            throw new IllegalArgumentException();
        }
        prevs.add(prev);
    }

    public void addTask(Task task) {
        if (tasks.contains(task)) {
            throw new IllegalArgumentException();
        }
        tasks.add(task);
    }

    public void remove(Task task) {
        if (!tasks.contains(task)) {
            return;
        }
        if (map.containsKey(task)) {
            map.remove(task);
        }
        for (Set<Task> set : map.values()) {
            if (set.contains(task)) {
                set.remove(task);
            }
        }
    }

    public Set<Task> getTasks() {
        return tasks;
    }

    public void setTasks(Set<Task> tasks) {
        this.tasks = tasks;
    }

    public Map<Task, Set<Task>> getMap() {
        return map;
    }

    public void setMap(Map<Task, Set<Task>> map) {
        this.map = map;
    }
}

DAG流程实例 ProcessInstance

package com.bytedance.ecop.kunlun.engine.scheduler;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * DAG工作流
 */
public class ProcessInstance {
    Long processId;
    String processInstanceId;

    public ProcessInstance(Long processId, String processInstanceId) {
        this.processId = processId;
        this.processInstanceId = processInstanceId;
    }

    public Long getProcessId() {
        return processId;
    }

    public void setProcessId(Long processId) {
        this.processId = processId;
    }

    public String getProcessInstanceId() {
        return processInstanceId;
    }

    public void setProcessInstanceId(String processInstanceId) {
        this.processInstanceId = processInstanceId;
    }
}

DAG工作流程调度器

package com.bytedance.ecop.kunlun.engine.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class Scheduler {

    public void schedule(Process process) {
        while (true) {
            // 1、构建 todoTaskList
            List<Task> todoTaskList = new ArrayList<Task>();

            for (Task task : process.getTasks()) {
                if (!task.hasExecuted()) {
                    Set<Task> prevs = process.getMap().get(task);
                    if (prevs != null && !prevs.isEmpty()) {
                        boolean toAdd = true;
                        for (Task tsk : prevs) {
                            if (!tsk.hasExecuted()) {
                                toAdd = false;
                                break;
                            }
                        }
                        if (toAdd) {
                            todoTaskList.add(task);
                        }
                    } else {
                        todoTaskList.add(task);
                    }
                }
            }

            // 2.执行 todoTaskList
            if (!todoTaskList.isEmpty()) {
                for (Task task : todoTaskList) {

                    task.execute(new TaskCallBack() {
                        @Override
                        public Object invoke(TaskInstanceResult taskInstanceResult) {
                            System.out.println(taskInstanceResult);
                            return null;
                        }
                    });

                }
            } else {
                break;
            }

        }
    }

}

测试运行效果

package com.bytedance.ecop.kunlun.engine.scheduler;

public class MainApplication {

    public static void main(String[] args) {
        // 创建工作流
        Process process = new Process();
        // 注册任务
        Task task1 = new Task(1L, "task1", 0, -1);
        Task task2 = new Task(2L, "task2", 0, -1);
        Task task3 = new Task(3L, "task3", 0, -1);
        Task task4 = new Task(4L, "task4", 0, -1);
        Task task5 = new Task(5L, "task5", 0, -1);
        Task task6 = new Task(6L, "task6", 0, -1);

        process.addTask(task1);
        process.addTask(task2);
        process.addTask(task3);
        process.addTask(task4);
        process.addTask(task5);
        process.addTask(task6);

        process.addEdge(task1, task2);
        process.addEdge(task1, task5);
        process.addEdge(task6, task2);
        process.addEdge(task2, task3);
        process.addEdge(task2, task4);

        // 创建调度器,执行DAG调度
        Scheduler scheduler = new Scheduler();
        scheduler.schedule(process);
    }

}

Task id: [4], task name: [task4] is running TaskInstanceResult{resultJson='Task[4], taskInstanceId=b9fc6d44-d564-4131-8995-debf9a90f954 TaskInstanceResult = 37f41979-65b8-4c2d-baf1-b13176c19d02'} Task id: [5], task name: [task5] is running TaskInstanceResult{resultJson='Task[5], taskInstanceId=3da4db7f-4a63-482d-900c-64db48030627 TaskInstanceResult = 4f71de6d-8655-4d0f-8a64-c3c8ce8bdd23'} Task id: [3], task name: [task3] is running TaskInstanceResult{resultJson='Task[3], taskInstanceId=145a130a-5e39-41ea-98f3-87f62a8dfff8 TaskInstanceResult = d8746617-1df2-46f8-a129-d75070478228'} Task id: [2], task name: [task2] is running TaskInstanceResult{resultJson='Task[2], taskInstanceId=f7640b21-58b1-45bd-a886-aac425644d8e TaskInstanceResult = 7182f6c6-6395-4046-b50c-0df19f6de4ae'} Task id: [1], task name: [task1] is running TaskInstanceResult{resultJson='Task[1], taskInstanceId=be9ac46f-df32-4da8-b54c-74698c39d309 TaskInstanceResult = ac78327d-8825-4651-b309-c1bb5da6416c'} Task id: [6], task name: [task6] is running TaskInstanceResult{resultJson='Task[6], taskInstanceId=cb138e08-6ebe-4b4b-87ab-a1b230a06740 TaskInstanceResult = c0be1628-3a0c-4bfd-b863-500ca512ce80'}

Process finished with exit code 0

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 任务Task
  • 任务运行实例TaskInstance
  • DAG流程定义 Process
  • DAG流程实例 ProcessInstance
  • DAG工作流程调度器
  • 测试运行效果
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档