首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在执行luigi任务时修复"luigi.worker.TaskException: Can not schedule non-task <class '__main__.Task'>“?

如何在执行luigi任务时修复"luigi.worker.TaskException: Can not schedule non-task <class '__main__.Task'>“?
EN

Stack Overflow用户
提问于 2019-08-28 13:49:53
回答 1查看 511关注 0票数 1

我是Luigi的新手,我已经创建了一个管道,它从数据库中获取数据,转换数据,然后将其加载回数据库。我在其中创建了四个任务。但是,当我在cmd或Pycharm上执行任务时,它显示不能调度非任务。下面是我的流水线的伪代码。每个任务的参数不是输入,而是从其他文件中获取。

代码语言:javascript
运行
复制
    class Task1(luigi.Task): 
          # Some Parameters
         def get_target(): 
         def query():
         def run(): 
    class Task2(luigi.Task):
          # Some Parameters 
         def requires():
           return Task1()
         def func1():
         def func2():
         def run()
    class Task3(luigi.Task): 
         # Some Parameters 
         def requires():
             return Task2()
         def run():
    class Task4(luigi.Task):
         # Some Parameters 
          def requires(): 
              return Task3()
          def run(): 

在Pycharm上,我使用

代码语言:javascript
运行
复制
if __name__ == '__main__':
    luigi.build([Task1, Task2, Task3, Task4], workers=5, local_scheduler=True)

在cmd上,我使用

代码语言:javascript
运行
复制
 python .\folder\file.py Task1

但是它给了我这个错误

代码语言:javascript
运行
复制
INFO: Worker Worker was stopped. Shutting down Keep-Alive thread
Traceback (most recent call last):
  File "D:/folder/file.py", line 300, in <module>
    luigi.build([Task1, Task2, Task3, Task4], workers=5, local_scheduler=True)
  File "C:\Users\Anaconda3\lib\site-packages\luigi\interface.py", line 237, in build
    luigi_run_result = _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)
  File "C:\Users\Anaconda3\lib\site-packages\luigi\interface.py", line 171, in _schedule_and_run
    success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
  File "C:\Users\Anaconda3\lib\site-packages\luigi\worker.py", line 740, in add
    self._validate_task(task)
  File "C:\Users\Anaconda3\lib\site-packages\luigi\worker.py", line 638, in _validate_task
    raise TaskException('Can not schedule non-task %s' % task)
luigi.worker.TaskException: Can not schedule non-task <class '__main__.Task1'>
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-09-03 03:43:29

首先,您需要为所有任务指定输出。没有输出,luigi不知道任务何时完成。其次(这是你的实际问题所在),你还没有实例化你正在传递的任务。您只需要创建实例,请尝试:

代码语言:javascript
运行
复制
luigi.build([Task1(), Task2(), Task3(), Task4()], workers=5, local_scheduler=True)

然而,我觉得有必要指出其他几件事:

1)由于您的每个任务都指定了之前需要的任务,所以您只需要告诉luigi运行链中的最后一个任务,所以:

代码语言:javascript
运行
复制
luigi.build([Task4()], workers=5, local_scheduler=True)

这将告诉luigi它需要完成任务4。为了完成任务4,luigi将查看任务4所需的内容,并查看任务3。然后,它将查看任务3运行所需的内容,并查看任务2等。Luigi将自动为您构建图表,并按满足每个任务的依赖关系的顺序运行它们。

最后,在.build中给luigi分配任务的顺序总体上影响不大。这是因为luigi不会根据您给出的顺序来确定顺序,而是通过依赖图和task priority来确定顺序。

编辑:如果您需要对另一个任务执行多个任务,您可以简单地这样做:

代码语言:javascript
运行
复制
class Task4(luigi.Task):
  def requires(self):
    return [Task1(), Task2(), Task3()]
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57685815

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档