我想使用Luigi工作流并行加载spark data frame中的多个文件,并将它们存储在字典中。当我运行Luigi时,一个运行Luigi的worker.if与多个工作人员一起运行Luigi时,此变量在main方法中为空。 from Luigi import LocalTarget from pyspark import SQLContext
也就是说,我们为不同的x= A, B, C ...运行x=。job_2运行一组参数,这些参数依赖于job_1(x)的结果,job_2还加载job_A(x)存储的数据。诸若此类。因此,如果job_A for x=B失败了,那么树的分支将完全失败,不应该运行。不过,所有其他分支都应该运行。
所有作业都是用Python编写的,并使用并行性(基于生成SLURM作业)。无论树中较高的作业是否失败,所有作业都会运行。如果不深入了解依赖关系,就很难看出问题在哪里。如果更高的作业(例如,job_A)没有
目前,我有许多luigi任务一起排队,其中有一个简单的依赖链( a -> b -> c -> d)。首先执行d,最后执行a。a是被触发的任务。除a之外,所有目标都返回一个luigi.LocalTarget()对象,并有一个泛型luigi.Parameter(),它是一个字符串(包含日期和时间)。在luigi中央服务器上运行(该服务器已启用历史记录)。问题是,当我重新运行该任务a时,luigi检查历史