目标:了解AirFlow的架构组件
路径
实施
架构
组件
A scheduler, which handles both triggering scheduled workflows, and submitting Tasks to the executor to run.
An executor, which handles running tasks. In the default Airflow installation, this runs everything inside the scheduler, but most production-suitable executors actually push task execution out to workers.
A webserver, which presents a handy user interface to inspect, trigger and debug the behaviour of DAGs and tasks.
A folder of DAG files, read by the scheduler and executor (and any workers the executor has)
A metadata database, used by the scheduler, executor and webserver to store state.
小结
目标:掌握AirFlow的开发规则
路径
实施
官方文档
示例:http://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
开发Python调度程序
开发一个Python程序,程序文件中需要包含以下几个部分
注意:该文件的运行不支持utf8编码,不能写中文
step1:导包
# 必选:导入airflow的DAG工作流
from airflow import DAG
# 必选:导入具体的TaskOperator类型
from airflow.operators.bash import BashOperator
# 可选:导入定时工具的包
from airflow.utils.dates import days_ago
step2:定义DAG及配置
# 当前工作流的基础配置
default_args = {
# 当前工作流的所有者
'owner': 'airflow',
# 当前工作流的邮件接受者邮箱
'email': ['airflow@example.com'],
# 工作流失败是否发送邮件告警
'email_on_failure': True,
# 工作流重试是否发送邮件告警
'email_on_retry': True,
# 重试次数
'retries': 2,
# 重试间隔时间
'retry_delay': timedelta(minutes=1),
}
# 定义当前工作流的DAG对象
dagName = DAG(
# 当前工作流的名称,唯一id
'airflow_name',
# 使用的参数配置
default_args=default_args,
# 当前工作流的描述
description='first airflow task DAG',
# 当前工作流的调度周期:定时调度【可选】
schedule_interval=timedelta(days=1),
# 工作流开始调度的时间
start_date=days_ago(1),
# 当前工作流属于哪个组
tags=['itcast_bash'],
)
step3:定义Tasks
Task类型:http://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html
常用
BashOperator
- executes a bash command PythonOperator
- calls an arbitrary Python function EmailOperator
- sends an email 其他
MySqlOperator
PostgresOperator
MsSqlOperator
OracleOperator
JdbcOperator
DockerOperator
HiveOperator
PrestoToMySqlOperator
BashOperator:定义一个Shell命令的Task
# 导入BashOperator
from airflow.operators.bash import BashOperator
# 定义一个Task的对象
t1 = BashOperator(
# 指定唯一的Task的名称
task_id='first_bashoperator_task',
# 指定具体要执行的Linux命令
bash_command='echo "hello airflow"',
# 指定属于哪个DAG对象
dag=dagName
)
PythonOperator:定义一个Python代码的Task
# 导入PythonOperator
from airflow.operators.python import PythonOperator
# 定义需要执行的代码逻辑
def sayHello():
print("this is a programe")
#定义一个Task对象
t2 = PythonOperator(
# 指定唯一的Task的名称
task_id='first_pyoperator_task',
# 指定调用哪个Python函数
python_callable=sayHello,
# 指定属于哪个DAG对象
dag=dagName
)
step4:运行Task并指定依赖关系
定义Task
Task1:runme_0
Task2:runme_1
Task3:runme_2
Task4:run_after_loop
Task5:also_run_this
Task6:this_will_skip
Task7:run_this_last
需求
代码
task1 >> task4
task2 >> task4
task3 >> task4
task4 >> task7
task5 >> task7
task6 >> task7
如果只有一个Task,只要直接写上Task对象名称即可
task1
提交Python调度程序
哪种提交都需要等待一段时间
自动提交:需要等待自动检测
手动提交:手动运行文件让airflow监听加载
python xxxx.py
调度状态
小结