我是新来的省长,主要工作是气流。我已经建立了一个工作流,它可以很好地执行,但是任务没有按照我所期望的顺序执行。在这里流动:
with Flow(name='4chan_extract') as flow:
board_param = Parameter(name='board_name', required = True, default='pol')
getData(board= board_param)
checkDB(url= 'postgresql://postgres:user@localhost:5434/postgres')
upload_raw(url="postgresql://postgres:user@localhost:5434/postgres",
board=board_param)
remove_dupes(board=board_param)
但是,当我使用flow.visualise()
这个流时,DAG看起来确实是很奇怪。。
我的理解是上下文运算符with
设置顺序?在每个任务中使用up_stream
都没有帮助。
任何帮助都是非常感谢的。
发布于 2021-12-21 11:49:59
如果希望依次调用任务,可以将upstream_tasks
添加到每个任务中。此外,为了轻松传递状态依赖项,您可以在调用任务(data = get_data(board=board_param)
)时为它分配一个名称,这允许将这个命名的引用传递给下游依赖项。
我只能猜测您希望这个流看起来如何,但假设您希望它按顺序运行,下面是一个完整的示例和一个DAG可视化:
from prefect import task, Flow, Parameter
@task
def get_data(board):
pass
@task
def check_db(url):
pass
@task
def upload_raw(url, board):
pass
@task
def remove_duplicates(board):
pass
with Flow(name="4chan_extract") as flow:
board_param = Parameter(name="board_name", required=True, default="pol")
data = get_data(board=board_param)
check = check_db(
url="postgresql://postgres:user@localhost:5434/postgres", upstream_tasks=[data]
)
upload = upload_raw(
url="postgresql://postgres:user@localhost:5434/postgres",
board=board_param,
upstream_tasks=[check],
)
remove_duplicates(board=board_param, upstream_tasks=[upload])
if __name__ == "__main__":
flow.visualize()
https://stackoverflow.com/questions/70433140
复制相似问题