Ariflow 用 Python 编写的工作流调度器,你可以在上面定义管理执行任务流。简单来说,它可以用来调度你写的 Python 脚本,能实现对你脚本执行过程的监控以及日志的输出,一个脚本可以包括多个任务步骤,组成业务上需要的工作流水线。
概念
- DAG: 无环有向图,简单可以粗暴的理解为一个流水线。
- TASK:流水线中的所需要调度的步骤,这是一个静态概念。
- TASK Instance:当真正进行调度的过程中,一个TASK真的被执行的实体。
下图是展示一些 dags 历史执行情况,绿色表示成功,红色表示失败,任务执行可以在Web UI 上点击运行dag,也可以通过调用 Airflow 的 API 接口运行指定的 dag 。还可以设置定时任务,让任务根据设置的时间周期自动触发运行。
在页面上还能看到某个 dag 的任务步骤依赖关系,下图是用的最简单的串行
下面展示的是每个步骤的历史执行情况
在代码中按照规定好的语法就能设置每个 dag 的子任务以及每个子任务之间的依赖关系(绿框)
对于开发人员来说,使用 Airflow 就是编写 dags 文件
编写 DAG 的流程:
先用装饰器@dag 定义一个 DAG,dag_id就是网页上DAG的名称,这个必须是唯一的,不允许和其他的dag重复。
然后定义一个函数,函数里面再定义你的任务函数,并用@task对任务函数装饰,表名这个函数是某个任务步骤。get_current_context() 是 Airflow 自带的函数,获取上下文信息,包含给DAG传递的参数,通过 parmas 这个 key 获取。如果下一个任务需要上一个任务的输出结果,可以把上一个任务作为下个任务的输入参数, 使用 》这个符号将每个任务关系串联起来
还可以给任务装饰器传入参数,可以设置该任务失败后执行的操作或者等待所有父任务执行完再操作等。
编写完 dags 下面的 py 脚本之后,需要给脚本增加允许执行的权限。
本文分享自 pythonista的日常 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!