前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache AirFlow 入门

Apache AirFlow 入门

原创
作者头像
HLee
修改2021-06-07 15:12:08
2.6K0
修改2021-06-07 15:12:08
举报
文章被收录于专栏:房东的猫

简介

Airflow 于 2014 年 10 月由 Airbnb 的 Maxime Beauchemin 开始。它是第一次提交的开源,并在 2015 年 6 月宣布正式加入 Airbnb Github。

Airflow是一个可编程,调度和监控的工作流平台,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行。airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。

官方网站-AirFlow

AirFlow-中文文档

定义 Pipeline

导入模块

一个 Airflow 的 pipeline 就是一个 Python 脚本,这个脚本的作用是为了定义 Airflow 的 DAG 对象。让我们首先导入我们需要的库。

代码语言:javascript
复制
# DAG 对象; 我们将需要它来实例化一个 DAG
from airflow import DAG

# Operators 我们需要利用这个对象去执行流程
from airflow.operators.bash import BashOperator

默认参数

我们即将创建一个 DAG 和一些任务,我们可以选择显式地将一组参数传递给每个任务的构造函数,或者我们可以定义一个默认参数的字典,这样我们可以在创建任务时使用它。

代码语言:javascript
复制
from datetime import datetime, timedelta

default_args = {
    'owner': 'lihuan', # 拥有者名称
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),  # 第一次开始执行的时间,为 UTC 时间
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5),  # 失败重试间隔
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

实例化一个 DAG

我们需要一个 DAG 对象来嵌入我们的任务。这里我们传递一个定义为dag_id的字符串,把它用作 DAG 的唯一标识符。我们还传递我们刚刚定义的默认参数字典,同时也为 DAG 定义schedule_interval,设置调度间隔为每天一次。

代码语言:javascript
复制
dag = DAG(
    dag_id = 'tutorial_airflow', 
    default_args = default_args, 
    schedule_interval = timedelta(days=1)
    
)

任务(Task)

在实例化 operator(执行器)时会生成任务。从一个 operator(执行器)实例化出来的对象的过程,被称为一个构造方法。第一个参数task_id充当任务的唯一标识符。

代码语言:javascript
复制
# operator 支持多种类型, 这里使用 BashOperator
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag
)

注意到我们传递了一个 BashOperator 特有的参数(bash_command)和所有的 operator 构造函数中都会有的一个参数(retries)。这比为每个构造函数传递所有的参数要简单很多。另请注意,在第二个任务中,我们使用3覆盖了默认的retries参数值。

任务参数的优先规则如下:

  1. 明确传递参数
  2. default_args字典中存在的值
  3. operator 的默认值(如果存在)

任务必须包含或继承参数task_idowner,否则 Airflow 将出现异常。

使用 Jinja 作为模版

Airflow 充分利用了Jinja Templating的强大功能,并为 pipline(管道)的作者提供了一组内置参数和 macros(宏)。Airflow 还为 pipline(管道)作者提供了自定义参数,macros(宏)和 templates(模板)的能力。

设置依赖关系

我们有三个不相互依赖任务,分别是t1t2t3。以下是一些可以定义它们之间依赖关系的方法:

代码语言:javascript
复制
t1.set_downstream(t2)

# 这意味着 t2 会在 t1 成功执行之后才会执行
# 与下面这种写法相等
t2.set_upstream(t1)

# 位移运算符也可用于链式运算
# 用于链式关系 和上面达到一样的效果
t1 >> t2

# 位移运算符用于上游关系中
t2 << t1


# 使用位移运算符能够链接
# 多个依赖关系变得简洁
t1 >> t2 >> t3

# 任务列表也可以设置为依赖项。
# 下面的这些操作都具有相同的效果:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

请注意,在执行脚本时,在 DAG 中如果存在循环或多次引用依赖项时,Airflow 会引发异常。

回顾

到此,我们有了一个非常基本的 DAG。此时,您的代码应如下所示:

代码语言:javascript
复制
 """
Airflow 教程代码位于:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    { % f or i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
    { % e ndfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 定义 Pipeline
    • 导入模块
      • 默认参数
        • 实例化一个 DAG
          • 任务(Task)
            • 使用 Jinja 作为模版
              • 设置依赖关系
              • 回顾
              相关产品与服务
              命令行工具
              腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档