首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >完善任务调度

完善任务调度
EN

Stack Overflow用户
提问于 2021-12-21 09:24:16
回答 1查看 368关注 0票数 1

我是新来的省长,主要工作是气流。我已经建立了一个工作流,它可以很好地执行,但是任务没有按照我所期望的顺序执行。在这里流动:

代码语言:javascript
运行
复制
with Flow(name='4chan_extract') as flow:
    board_param = Parameter(name='board_name', required = True, default='pol')
    getData(board= board_param)
    checkDB(url= 'postgresql://postgres:user@localhost:5434/postgres')
    upload_raw(url="postgresql://postgres:user@localhost:5434/postgres", 
    board=board_param)
    remove_dupes(board=board_param)

但是,当我使用flow.visualise()这个流时,DAG看起来确实是很奇怪。

我的理解是上下文运算符with设置顺序?在每个任务中使用up_stream都没有帮助。

任何帮助都是非常感谢的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-12-21 11:49:59

如果希望依次调用任务,可以将upstream_tasks添加到每个任务中。此外,为了轻松传递状态依赖项,您可以在调用任务(data = get_data(board=board_param))时为它分配一个名称,这允许将这个命名的引用传递给下游依赖项。

我只能猜测您希望这个流看起来如何,但假设您希望它按顺序运行,下面是一个完整的示例和一个DAG可视化:

代码语言:javascript
运行
复制
from prefect import task, Flow, Parameter


@task
def get_data(board):
    pass


@task
def check_db(url):
    pass


@task
def upload_raw(url, board):
    pass


@task
def remove_duplicates(board):
    pass


with Flow(name="4chan_extract") as flow:
    board_param = Parameter(name="board_name", required=True, default="pol")
    data = get_data(board=board_param)
    check = check_db(
        url="postgresql://postgres:user@localhost:5434/postgres", upstream_tasks=[data]
    )
    upload = upload_raw(
        url="postgresql://postgres:user@localhost:5434/postgres",
        board=board_param,
        upstream_tasks=[check],
    )
    remove_duplicates(board=board_param, upstream_tasks=[upload])

if __name__ == "__main__":
    flow.visualize()

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70433140

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档