前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何编码检查依赖关系是否有循环依赖

如何编码检查依赖关系是否有循环依赖

作者头像
somenzz
发布2020-11-25 14:41:14
2.7K0
发布2020-11-25 14:41:14
举报
文章被收录于专栏:Python七号Python七号

什么是永不过时的技能:算法思维。

之前做数据仓库的运维,上线部署时需要处理很多任务的依赖关系,所谓任务,就是一个一个 shell 脚本或者存储过程等批处理任务,他们之间是有依赖关系的,由于数据仓库的任务超级多,约 3000 多个任务,这么多的任务是无法使用一张有向无环图来表示,因此依赖关系除了使用直观的有向连线来配置,还使用了隐藏式的配置,就是依赖关系无法使用有向线条来直观的看到。

既然看不到,就有可能出现循环依赖而不自知,只要有可能,就一定会有人犯错,不是你就是他,不是今天就是未来某一天,这就是墨菲定律。这不,我就经历过。

调度平台用的是先进数通的 MoiaControl V5,这是我用过的最好的调度平台了,之前用过 ETlplus,Airflow。但 MoiaControl 中出现循环依赖并不提示,会导致第二天的任务不会跑批,影响数据的时效性。假如你准备面试先进数通这家公司,说你可以为该产品增加一项检查否有循环依赖的功能,我想这一定是个加分项。

那问题来了,如何编码检查任务依赖关系是否有循环依赖?

答案很简单,就是构造一个有向图,进行拓扑排序,如果拓扑排序后没有未访问的点,那就没有环,否则就有环。

下面,我用 Python 来演示这一解决过程,带你彻底掌握拓扑排序。

首先,我们需要借助一种数据结构来表示有向图,使用方便即可,这里,我使用字典来表示,比如表达 a->b, a->c, c->d 这样的依赖关系,我们可以构造字典 edges = { 'a':{'b','c'},'c':{'d'} } 来表示。字典的键表示前驱任务,字典的值是一个集合,表示依赖前驱的任务集合。这样的字典可以借助于标准库的 collections 来快速初始化:

代码语言:javascript
复制
edges = collections.defaultdict(set)

仅保存边是不够的,我们还需要保存顶点,这可以借助一个集合,它可以自动去重,后面看是否所有的任务节点都参与了拓扑排序,就靠它了。

代码语言:javascript
复制
self.edges = collections.defaultdict(set)
vertex = set()

接下来就是拓扑排序的代码实现了。

拓扑排序一般来说有两种思路,一种是广度优先遍历,借助于先进先出的队列,一种是深度优先遍历,借助于后进先出的栈。无论哪一种思路,都与入度和出度有关。下面分别进行分析。

广度优先遍历比较符合人的习惯思维,从前到后逐层推进。它首先找出不被任何任务依赖的任务进入队列,哪一种任务不被任何任务依赖呢?比如 a->b->c ,a 就是不被任何任务依赖的任务,这样的任务有个特点,就是入度为 0,没有箭头指向的任务的入度就是 0。

首先,我们计算所有节点的入度,把所有入度为 0 的任务依次放入队列,然后开始循环遍历队列,取出第一个任务,记为 a,标记为已访问,同时将依赖于 a 的任务的入度都减少 1,如果减少 1 后入度为 0 的任务放入队列。继续循环,直到所有的节点都被访问。如果循环结束,仍有节点未被遍历,说明存在循环依赖,无论如何他们的入度也不可能为 0。

以上的思路,翻译成代码,如下所示:

代码语言:javascript
复制
import collections

class CheckCycle(object):
    def __init__(self):
        self.vertex = set()  # 顶点集合
        self.edges = collections.defaultdict(
            set
        )  # 使用字典表示有向边 如 a -> {b,c,e} 表示 b,c,e 均依赖 a
        self.indegree = collections.defaultdict(int)  # 计算每个顶点的入度

    def add_edge(self, from_job: str, to_job: str) -> bool:
        """
        添加一条边
        """
        if from_job == to_job:
            return False

        if from_job:
            self.vertex.add(from_job)
            if not from_job in self.indegree:
                self.indegree[from_job] = 0  # 初始化入度为 0
        if to_job:
            self.vertex.add(to_job)
            if not to_job in self.indegree:  # 初始化入度为0
                self.indegree[to_job] = 0
        if from_job and to_job:
            if to_job not in self.edges[from_job]:  # 防止充分添加相同的边
                self.indegree[to_job] += 1  # 入度加 1
                self.edges[from_job].add(to_job)  # 防止充分添加相同的边

        return True

    def can_finish(self) -> bool:
        """
        Returns:
            True: 表示没有环,任务可以完成
            False: 表示有环,任务不可以完成
        """

        q = collections.deque([u for u in self.indegree if self.indegree[u] == 0])
        visited = 0

        possible_sequence = []

        while q:
            visited += 1
            u = q.popleft()
            possible_sequence.append(u)
            for v in self.edges[u]:
                self.indegree[v] -= 1
                if self.indegree[v] == 0:
                    q.append(v)
        print(f'possible sequence: {"->".join(possible_sequence)}')
        return visited == len(self.vertex)


if __name__ == "__main__":
    """
    a->b->c
       b->d
       True
    a->b->c
       b->d->a
       False
    """
    check_cycle = CheckCycle()
    check_cycle.add_edge(from_job="a", to_job="b")
    check_cycle.add_edge(from_job="b", to_job="c")
    check_cycle.add_edge(from_job="b", to_job="d")
    print(check_cycle.can_finish())

    check_cycle = CheckCycle()
    check_cycle.add_edge(from_job="a", to_job="b")
    check_cycle.add_edge(from_job="b", to_job="c")
    check_cycle.add_edge(from_job="b", to_job="d")
    check_cycle.add_edge(from_job="d", to_job="a")
    print(check_cycle.can_finish())

时间复杂度和空间复杂度的分析同广度优先遍历算法,都是 O(m+n), m 是顶点数,n 是边数,不在赘述。

另一种方法就是深度优先遍历, 深度优先遍历则是一种逆向思维,为了简单的理解,先考虑没有环的情况,如 a->b->c-d :从任意任务节点出发,假如是 b,一条路走到黑,遍历到最后一个节点 d ,将其入栈,同时将已经访问过的节点标记为已访问(b,c 已访问),将已入栈的节点标记为已完成(d 已完成),还没有访问过的节点标记为未访问 (a 未访问)。也就是说任何一个节点,只会有以下三种状态:

  • 「未访问」:我们还没有访问到这个节点,使用 0 来表示。
  • 「已访问」:我们访问过这个节点,但还没有回溯到该节点,即该节点还没有入栈,还有相邻的节点没有完成,使用 1 来表示。
  • 「已完成」:我们访问过且回溯过这个节点,即该节点已经入栈,并且所有该节点的后续节点都出现在栈的更底部的位置,满足拓扑排序的要求,使用 2 来表示。

现在回溯到 c,发现 c 已访问,且 c 的后续节点 d 已经完成,因此将 c 入栈,标记为已完成,依次类推,现在,栈底到栈顶依次为 d,c,b。然后从剩余节点 a 出发,执行同样的逻辑,a 也入栈,标记为完成,最终从栈底到栈顶为 d,c,b,a,将这些节点依次出栈,即为拓扑排序。

现在考虑有环的情况 a->b->c->d->b,访问到 d 时,继续访问 b 发现 b 已经被访问,说明有环,退出即可。根据上面的分析,不难写出以下深度遍历的代码:

代码语言:javascript
复制
def can_finish2(self) -> bool:
    """
    深度优先遍历
    Returns:
        True: 表示没有环,任务可以完成
        False: 表示有环,任务不可以完成
    """

    visited = collections.defaultdict(int)  # 保存每个顶点是否被访问过
    for job in self.vertex:
        visited[job] = 0  # 初始化,均未被访问过

    result = list()  # 模拟栈
    valid = True

    def dfs(from_job: str):
        nonlocal valid
        visited[from_job] = 1
        for to_job in self.edges[from_job]:
            if visited[to_job] == 0:
                dfs(to_job)
                if not valid:
                    return
            elif visited[to_job] == 1:
                valid = False
                return
        visited[from_job] = 2
        result.append(from_job)

    for job in self.vertex:
        if valid and not visited[job]:
            dfs(job)

    # print(result)
    return valid

时间复杂度即为深度优先遍历或广度优先遍历的时间复杂度,都为 O(m+n) ,其中 m 是顶点数,n 是边数,对应着任务数和任务的依赖数。

其实即使写不出深度优先或广度优先的代码关系也不大,只有会灵活使用就行,网上都是现成的代码,最重要的是要理解这些代码,为我所用。

想使用代码时不必辛苦的复制,回复「拓扑排序」获取可执行代码。

感谢你的点赞支持。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-08-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python七号 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档