前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Airflow 实践笔记-从入门到精通二

Airflow 实践笔记-从入门到精通二

作者头像
大数据技术架构
发布2022-06-14 15:22:46
2.5K0
发布2022-06-14 15:22:46
举报

数据处理逻辑多,脚本相互依赖强,运维管理监测难,怎么办?!为了解决这些问题,最近比较深入研究Airflow的使用方法,重点参考了官方文档和Data Pipelines with Apache Airflow,特此笔记,跟大家分享共勉。

前面文章我们已经讲到了Airflow的搭建这里主要讲一下Airflow的其他特性。

DAG

配置表中的变量DAG_FOLDER是DAG文件存储的地址,DAG文件是定义任务流的python代码,airflow会定期去查看这些代码,自动加载到系统里面。

DAG是多个脚本处理任务组成的工作流pipeline,概念上包含以下元素

1) 各个脚本任务内容是什么

2) 什么时候开始执行工作流

3) 脚本执行的前后顺序是什么

针对1),通过operator来实现对任务的定义。Operator,翻译成“操作单元”,有很多种形式,可以是一个bash命令,也可以是一个python函数,或者是一个数据库连接任务。Airflow封装了很多operator,开发者基于需要来做二次开发。实际上各种形式的operator都是python语言写的对象。

针对2),在DAG的配置函数中有一个参数schedule_interval,约定被调度的频次,是按照每天、每周或者固定的时间来执行。这个参数,跟start_date开始时间和end_date结束时间(需要某个时间段后不需要执行该任务)配合着用,来约定什么时候跑这个DAG。logical date指的是这个DAG后续预计执行发生的时间。

下图是参数设置为@daily的执行节奏

airflow有事先定义好的参数,例如@daily,@hourly,@weekly等,一般场景下足够使用,如果需要更精细化的定义,可以使用cron-based配置方法、基于次数的方法。

Schedule本质上是一个while true循环,不断检查每个任务的状态,如果其上游任务都跑完,并且当前系统资源足够task slots,就会把该任务变成queued状态,等待executor去具体执行

针对3),使用>>或者<<来定义任务之间的依赖关系,例如start >> [fetch_weather, fetch_sales]意思是,start执行完以后,同时执行fetch_weather和fetch_sales。进一步定义[clean_weather, clean_sales] >> join_datasets,就会形成下面的DAG图。

注意:在图里面的分支,有的时候是都需要执行,有的时候可能两个分支会根据条件选择一个分支执行。这种分支判断(branch)的逻辑,可以在函数里面写,也可以通过brach operator实现。用后者的好处是,可以在DAG里面直观的看到具体执行的是哪个分支。

一般来讲,只有当上游任务“执行成功”时,才会开始执行下游任务。但是除了“执行成功all_success”这个条件以外,还有其他的trigger rule,例如one_success, one_failed(至少一个上游失败),none_failed ,none_skipped

DAG在配置的时候,可以配置同时运行的任务数concurrency,默认是16个。这个16,就是task slot,可以理解为资源,如果资源满了,具备运行条件的task就需要等待。

定义DAG的方式有两种:可以使用with语法,也可以使用修饰函数@dag。

代码语言:javascript
复制
with DAG(
    dag_id='example_bash_operator',
    schedule_interval='0 0 * * *',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
    tags=['example', 'example2'],
    params={"example_key": "example_value"},
) as dag:

配置DAG的参数:

'depends_on_past': False,

前置任务成功后或者skip,才能运行

'email': ['airflow@example.com'],

警告邮件发件地址

'email_on_failure': False,

失败的时候发邮件

'email_on_retry': False,

任务重新尝试的时候发邮件

'retries': 1,

尝试次数

'retry_delay': timedelta(minutes=5),

尝试之间的间隔

'queue': 'bash_queue',

指定一个 队列 运行该任务,CeleryExecutor用到

'pool': 'backfill',

'priority_weight': 10,

任务优先级

'end_date': datetime(2016, 1, 1),

任务计划的截止时间

'wait_for_downstream': False,

如果前一个任务实例的下游任务没有跑完,该任务是否可以跑

'sla': timedelta(hours=2),

如果在规定的时间间隔内任务没有跑完,会发警告

'execution_timeout': timedelta(seconds=300),

如果执行超出所设置的时间,任务被当做失败

'on_failure_callback': some_function,

当任务失败时,调用的函数

'on_success_callback': some_other_function,

当任务成功时,调用的函数

'on_retry_callback': another_function,

当任务重新尝试的时候,调用的函数

'sla_miss_callback': yet_another_function,

'trigger_rule': 'all_success'

前置任务的执行状态符合什么条件时,该任务会被启动

tags:[‘example’]

相当于是对DAG的一个分类,方便在前台UI根据tag来进行查询

DAG Run是DAG运行一次的对象(记录),记录所包含任务的状态信息。如果所有的任务状态是success或者skipped,就是success;如果任务有failed或者upstream_failed,就是falied。

其中的run_id的前缀会有如下几个

  • scheduled__ 表明是不是定时的
  • backfill__ 表明是不是回填的
  • manual__ 表明是不是手动或者trigger的

启动DAG,除了根据定时方法,也可以通过CLI命令或者Rest api的方式。在调用的时候可以通过指定dag_run.conf,作为参数让DAG根据不同的参数处理不同的数据。

在定义DAG的时候,有时会使用Edge Labels,可以理解成是虚拟的节点,目的是为了在前端UI更方便看到任务之间的依赖关系(类似注释的方法)。为了提高相同DAG操作的复用性,可以使用subDAG或者Taskgroup。

Operator

在任务流中的具体任务执行中,需要依据一些外部条件,例如之前任务的执行时间、开始时间等。这些“公有变量参数”,我们称为模板参数。airflow利用Jinja templates,实现“公有变量”调用的机制。在bashoprator中引用,例如 {{ execution_date}}就代表一个参数。在前端UI中,点击graph中的具体任务,在点击弹出菜单中rendered tempalate可以看到该参数在具体任务中代表的值。

除了公有变量,如果operator之间要互相传递参数或者中间过程数据,例如一个operator要依赖另一个operator的输出结果进行执行,有以下几个方式

  • 使用XCom,有点像dict对象,存储在airflow的database里,但是不适合数据量大的场景。另外,XCom如果设置过多后,也无形中也增加了operator的约束条件且不容易直观发现。在前端UI的adimin-》Xcoms里可以看到各个DAG用到的值。Airflow2中允许自定义XCom,以数据库的形式存储,从而支持较大的数据。
代码语言:javascript
复制
# 从该实例中的xcom里面取 前面任务train_model设置的键值为model_id的值。
model_id = context["task_instance"].xcom_pull(
 task_ids="train_model", key="model_id")
  • 在operator中使用op_kwargs,里面配置模板参数
  • 存储在数据库,例如一个operator存储数据在外部数据库中,另一个operator查询该数据库获得数据
  • 使用Taskflow API,其实就是@task这样的修饰函数,被称为TaskFlow function。在python函数上使用修饰函数@task,就是pythonOperator,也可以用PythonOperator来定义任务逻辑。不同的task之间直接用函数嵌套的方式来交换信息,例如email_info = compose_email(get_ip())。这种方式跟传统的函数编程方式比较接近,同时也完成了依赖关系的定义,不需要使用>>来定义任务之间的依赖关系。这种@修饰函数的方式,目前只限于python类型的operator。task可以用原来1.0的方式来定义,也可以用@task的方式来定义,相互之间如果需要传递参数,可以使用.output的方法。task可以通过在函数参数中定义**kwargs,或者使用get_current_context,获得该任务执行期间的上下文信息。
  • Operator的类型有以下几种:
  • 1) DummyOperator

  • 作为一个虚拟的任务节点,使得DAG有一个起点,但实际不执行任务;或者是在上游几个分支任务的合并节点,为了清楚的现实数据逻辑。
  • 2)BashOperator
  • 当一个任务是执行一个shell命令,就可以用BashOperator。可以是一个命令,也可以指向一个具体的脚本文件。
  • 3) 条件分支判断

BranchDateTimeOperator

  • 在一个时间段内执行一种任务,否则执行另一个任务。Target_lower可以设置为None
代码语言:javascript
复制
cond1 = BranchDateTimeOperator(
 task_id='datetime_branch',
 follow_task_ids_if_true=['date_in_range'],
 follow_task_ids_if_false=['date_outside_range'],
 target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
 target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
 dag=dag,)

BranchDayOfWeekOperator

根据是哪一天来选择跑哪个任务

BranchPythonOperator

根据业务逻辑条件,选择下游的一个task运行

代码语言:javascript
复制
dummy_task_1 = DummyOperator(task_id='branch_true', dag=dag)
dummy_task_2 = DummyOperator(task_id='branch_false', dag=dag)

branch = BranchDayOfWeekOperator(
 task_id="make_choice",
 follow_task_ids_if_true="branch_true",
 follow_task_ids_if_false="branch_false",
 week_day="Monday",
)

# Run dummy_task_1 if branch executes on Monday
branch >> [dummy_task_1, dummy_task_2]

4)PythonOperator

用的最广泛的Operator,在airflow1.0的时候,定义pythonOperator会有两部分,一个是operator的申明,一个是python函数。这时候函数传参是需要用到op_args 或者op_kwargs或者templates_dict

代码语言:javascript
复制
def _get_data(output_path, **context):
year, month, day, hour, *_ = context["execution_date"].timetuple() url = ( 
 "https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz" ) 
 request.urlretrieve(url, output_path)
 
get_data = PythonOperator( 
task_id="get_data", 
python_callable=_get_data, 
op_args=["/tmp/wikipageviews.gz"], dag=dag, 
) 
 
def _calculate_stats(input_path, output_path):
 """Calculates event statistics."""
Path(output_path).parent.mkdir(exist_ok=True)
events = pd.read_json(input_path)
stats = events.groupby(["date", "user"]).size().reset_index() stats.to_csv(output_path, index=False) 
 
calculate_stats = PythonOperator(
 task_id="calculate_stats",
 python_callable=_calculate_stats,
 op_kwargs={
 "input_path": "/data/events.json",
 "output_path": "/data/stats.csv",
 },
dag=dag, )

在airflow2.0以后,用TaskFlow API以后,传参简单很多,就是当函数参数用即可。但是需要注意的是,这种传参本质上还是通过xcom来实现传递的,必须是可序列号的对象,所以参数必须是python最基本的数据类型,像dataframe就不能作为参数来传递。

代码语言:javascript
复制
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
 """Print the Airflow context and ds variable from the context."""
 pprint(kwargs)
 print(ds)
 return 'Whatever you return gets printed in the logs'

5)图之间依赖关系的operator

如果两个任务流之间,存在一些依赖关系。

使用ExternalTaskSensor,根据另一个DAG中的某一个任务的执行情况,例如当负责下载数据的DAG完成以后,这个负责计算指标的DAG才能启动。

代码语言:javascript
复制
child_task1 = ExternalTaskSensor(
 task_id="child_task1",
 external_dag_id=parent_dag.dag_id,
 external_task_id=parent_task.task_id,
 timeout=600,
 allowed_states=['success'],
 failed_states=['failed', 'skipped'],
 mode="reschedule",
)

当 任务图的某一个任务的执行状态被清理(clear),其相应影响的另一个图的任务状态也要随着连带被清理,就要用上ExternalTaskMarker。使用TriggerDagRunOperator ,可以让DAG的某一个任务 启动另一个DAG

6)LatestOnlyOperator

LatestOnlyOperator,是为了标识该DAG是不是最新的执行时间,只有在最新的时候才有必要执行下游任务,例如部署模型的任务,只需要在最近一次的时间进行部署即可。

代码语言:javascript
复制
from airflow.operators.latest_only import LatestOnlyOperator 
latest_only = LatestOnlyOperator(
 task_id="latest_only",
 dag=dag,
)
train_model >> latest_only >> deploy_model

7)Sensor

Sensor 是用来判断外部条件是否成熟的感应器,例如判断输入文件是否到位(可以设置一个时间窗口内,例如到某个时间点之前检查文件是否到位),但是sensor很耗费计算资源(设置mode为reschedule可以减少开销,默认是poke),DAG会设置concurrency约定同时最多有多少个任务可以运行,称为task slot。所以一种办法是使用Deferrable Operators。FileSensor,判断是否文件存在了;自定义sensor,继承BaseSensorOperator,通过实现poke函数来实现检查逻辑

8)自定义Operator

  • Hook是一种自定义的operator,可以理解为与外部系统的接口函数,类似数据库连接对象,负责权限认证、连接和关闭的动作。根据需要我们也可以自己开发hook,继承自Baseoperator或者Basehook。例如PostgresHook会自动加载conn的连接字符串,连接目的数据库。具体连接数据库的字符串,可以在前台界面的Admin > Connections进行管理,然后在自己定义的hook里面有get_connection获得具体的连接字符串
  • 数据库operator,可以直接执行包含sql语句的文件。不同的数据库,需要安装对应的provider包,主要的作用是hook连接外部的数据库,管理连接池。
  • 自定义的operator,继承自Baseoperator,在方法execute里定义主要的操作逻辑。自定义Operator的初始函数中,如果参数的赋值会需要用到模板变量,可以在类定义中通过template_fields来指定是哪个参数会需要用到模板变量。
  • 在UI界面中展示自定义Operatior的样式,也可以在类中通过ui_color等属性进行定义。
  • 其他provider包提供的operator,例如连接AWS云服务器的operator,亚马逊云提供的模型训练的接口等,当然也可以自己来开发这些operator,继承baseoperator。
  • SparkSubmitOperator 可以调用另外一个spark实例,从而把复杂的处理工作交给spark处理
  • 自定义的operator,可以通过设置setup.py,形成package,方便其他人安装使用 https://link.zhihu.com/?target=https%3A//github.com/audreyr/cookiecutter-pypackage
代码语言:javascript
复制
#自定义一个从PostgreSQL取数,转移数据到S3的operator
def execute(self, context):
postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id) s3_hook = S3Hook(aws_conn_id=self._s3_conn_id) 
results = postgres_hook.get_records(self._query) s3_hook.load_string( 
Fetch records from the PostgreSQL database. 
 string_data=str(results),
 bucket_name=self._s3_bucket,
 key=self._s3_key,
)

关于dag和operator的相关特性介绍到此,后续会讲述Airflow的集群搭建(从入门到精通三),Dolphinscheduler , Dataworks(阿里云)的调度工具后续也会介绍,敬请期待。。。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档