我尝试在名为test.py的命令提示符和名为Test的类中运行1Luigi任务,如下所示
luigi -m test Test --local-scheduler
如何传递参数并在ubuntu中运行
代码示例:
class Test:
def requires(self):
return class1(param1='path', param2='', param3=123)
def run(self):
# some logic with using those 3 params
def out
考虑以下任务:
import luigi
class YieldFailTaskInBatches(luigi.Task):
def run(self):
for i in range(5):
yield [
FailTask(i, j)
for j in range(2)
]
class YieldAllFailTasksAtOnce(luigi.Task):
def run(self):
yield [
我很难理解如何在Luigi中实现可重用的任务,然后在具体的情况下使用它们。
例如。我有两个一般任务,对一个文件做一些事情,然后输出结果:
class GffFilter(luigi.Task):
"Filters a GFF file to only one feature"
feature = luigi.Parameter()
out_file = luigi.Parameter()
in_file = luigi.Parameter()
...
class BgZip(luigi.Task):
"bgZips a
我有一个简单的luigi任务,它在运行时会产生一个不同的参数,如下所示。 import luigi
class ComputeJob(luigi.Task):
id_parameter = luigi.parameter.IntParameter()
#run defination
def run(self):
print ("\nrunning task {}".format(self.id_parameter))
#Do some work here
if self.id_parameter
我试图理解luigi是如何工作的,我理解了这个想法,但是实际的实现要困难一些;)这就是我所拥有的:
class MyTask(luigi.Task):
x = luigi.IntParameter()
def requires(self):
return OtherTask(self.x)
def run(self):
print(self.x)
class OtherTask(luigi.Task):
x = luigi.IntParameter()
def run(self):
y = se
我正在运行一个很大的Luigi工作流,它总共运行了100多个任务。工作流在相当长的一段时间内运行良好,但在某个阶段,它达到了一个点,即有15个挂起的任务,所有其他任务都成功完成,没有失败的任务。但是,它似乎不再接收要执行的那些挂起的任务。我已经彻底查看了日志,没有任何错误。从那时起,它只会定期打印以下日志:
There are no more tasks to run at this time
There are 15 pending tasks possibly being run by other workers
There are 15 pending tasks unique to t
我正在尝试理解一个使用luigi来构建管道的程序。我已经理解了基本的知识,但还有以下几点
class Task5(luigi.Task):
task_namespace = "examples"
# something else
当我用task_namespace对行进行注释时,程序运行正常,但该行失败的原因如下:
luigi.task_register.TaskClassNotFoundException: No task Task5. Did you mean:
Task
目前,我有许多luigi任务一起排队,其中有一个简单的依赖链( a -> b -> c -> d)。首先执行d,最后执行a。a是被触发的任务。
除a之外,所有目标都返回一个luigi.LocalTarget()对象,并有一个泛型luigi.Parameter(),它是一个字符串(包含日期和时间)。在luigi中央服务器上运行(该服务器已启用历史记录)。
问题是,当我重新运行该任务a时,luigi检查历史记录,查看该特定任务以前是否运行过,如果它的状态为“已完成”,它就不会运行任务(在本例中为d),而且我不能这样做,更改字符串无助于此(添加了一个随机的微秒)。如何强制运行任务?
我意识到我可能需要使用动态需求来完成以下任务,但是我还不能理解这在实践中会是什么样子。 其目标是使用Luigi生成数据并将其添加到数据库中,而无需提前知道将生成什么数据。 以mongodb为例: import luigi
from uuid import uuid4
from luigi.contrib import mongodb
import pymongo
# Make up IDs, though in practice the IDs may be generated from an API
class MakeID(luigi.Task):
def run(self):
我是Luigi的新手,我想设置luigi来执行我的API调用。 我正在使用MockFiles,因为我通过API检索的json对象是轻量级的,并且我想避免使用外部数据库。 这是我的代码: import luigi
from luigi import Task, run as runLuigi, mock as LuigiMock
import yaml
class getAllCountries(Task):
task_complete = False
def requires(self):
return LuigiMock.MockFile("a
考虑一个任务通过动态依赖关系依赖于另一个任务的情况: import luigi
from luigi import Task, TaskParameter, IntParameter
class TaskA(Task):
parent = TaskParameter()
arg = IntParameter(default=0)
def requires(self):
return self.parent()
def run(self):
print(f"task A arg = {self.arg}")
c
我正在使用httparty进行一系列API调用。前两个API调用成功,但第三个API调用失败。它暂停大约60秒(默认超时时间),然后返回以下错误:
/Users/luigi/.rvm/rubies/ruby-2.0.0-p247/lib/ruby/2.0.0/net/protocol.rb:158:in `rescue in rbuf_fill': Net::ReadTimeout (Net::ReadTimeout)
from /Users/luigi/.rvm/rubies/ruby-2.0.0-p247/lib/ruby/2.0.0/net/protocol.rb:152
我是Luigi的新手,我已经创建了一个管道,它从数据库中获取数据,转换数据,然后将其加载回数据库。我在其中创建了四个任务。但是,当我在cmd或Pycharm上执行任务时,它显示不能调度非任务。下面是我的流水线的伪代码。每个任务的参数不是输入,而是从其他文件中获取。 class Task1(luigi.Task):
# Some Parameters
def get_target():
def query():
def run():
class Task2(luigi.Task):
我是Luigi的新手,正在尝试在我的流程中有一个条件分支。分支任务将评估一个条件,并根据结果跳过它的一些子项。
为了测试这一点,我只有一个虚拟任务,它检查当前小时,如果流在早上执行,则返回True,否则返回False。该任务有两个子任务,一个在控制台中打印'Morning‘,另一个在控制台中打印’下午‘。根据分支任务的结果,一个被激活,另一个被跳过。下面是它在Prefect中的样子:
您可以在这里看到,流是在上午执行的,因此跳过了下午的任务。
在做了一些研究之后,我不知道Luigi是否有能力做这样的事情。到目前为止,我尝试的是:
# Third Task
class Branch
我想使用Luigi工作流并行加载spark data frame中的多个文件,并将它们存储在字典中。一旦所有的文件都被加载,我希望能够从main中的字典中访问这些数据帧,然后进一步执行processing.This进程。当我运行Luigi时,一个运行Luigi的worker.if与多个工作人员一起运行Luigi时,此变量在main方法中为空。
任何建议都会有帮助。
import Luigi
from Luigi import LocalTarget
from pyspark import SQLContext
from src.etl.SparkA
我正在使用luigi.build方法尝试luigi的多处理能力。但是我在执行的时候遇到了一些库错误。
for next in self._add(item,is_complete):文件请求行604,_add self._validate_dependency(d)文件异常行622,_validate_dependency raise Exception('requires() in is_complete objects')
这是我试图实现目标的一段代码。
import luigi
class TaskOne(luigi.Task):
custid= luigi
我有一个很好的直线工作管道,我在命令行上通过luigi运行的任务触发了所有所需的上游数据的获取和处理,并按正确的顺序进行,直到它滴入我的数据库。
class IMAP_Fetch(luigi.Task):
"""fetch a bunch of email messages with data in them"""
date = luigi.DateParameter()
uid = luigi.Parameter()
…
def output(self):
loc = os.path.join(self.data_dro
我有一个luigi预处理任务,它将我的原始数据分割成更小的文件。然后,这些文件将由实际管道处理。
因此,关于参数,我想要求每个管道都有一个预处理文件id作为参数。但是,此文件id仅在预处理步骤中生成,因此仅在运行时才知道。为了说明我的想法,我提供了以下不起作用的代码:
import luigi
import subprocess
import random
class GenPipelineFiles(luigi.Task):
input_file = luigi.Parameter()
def requires(self):
pass
de