Airflow is a workflow scheduling and management system. It manages task pipelines as directed acyclic graphs (DAGs), handling task dependencies and time-based scheduling.
Airflow is independent of the tasks it runs: you only need to give Airflow a task's name and a way to run it, and it becomes a task in a DAG.
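As a taste of what handing a task to Airflow looks like, here is a minimal sketch of a DAG with a single task; the dag id, date and echoed string are made up, and the old-style airflow.operators import matches the examples later in this post:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime

# The DAG is the container; tasks are attached to it and wired by dependencies.
dag = DAG('hello_dag', start_date=datetime(2016, 5, 1),
          schedule_interval='@daily')

# A task is just an id plus a way to run it.
say_hello = BashOperator(task_id='say_hello',
                         bash_command='echo hello',
                         dag=dag)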
Run the following commands in a Linux terminal (Python 2.x and pip must already be installed):
pip install airflow
pip install "airflow[crypto, password]"
Once installation succeeds, the following steps get you up and running. By default the SequentialExecutor is used, which can only run tasks one after another.

airflow initdb [required step]
airflow webserver -p 8080 [convenient for managing DAGs visually]
airflow scheduler [once the scheduler starts, the DAGs under the DAG folder run automatically at their scheduled times]
airflow test ct1 print_date 2016-05-14 [test-run a single task]
The latest Airflow release can be downloaded from https://github.com/apache/incubator-airflow; unpack it and install it the way you would any Python package.
Configure MySQL to enable LocalExecutor and CeleryExecutor. Point the database connection setting in airflow.cfg (the file usually lives under ~/airflow) at the MySQL database, then run airflow initdb.
Note: for testing purposes this step can be skipped. The final production setup uses CeleryExecutor; if CeleryExecutor is inconvenient to configure, LocalExecutor will also do.
The database was already configured above, so to use LocalExecutor only the Airflow config file needs to change. Open airflow.cfg (usually under ~/airflow) and change the executor line to

executor = LocalExecutor

and the configuration is complete.
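For reference, the two relevant airflow.cfg lines might then read as follows; the MySQL user, password and database name are placeholders for whatever was created above:

executor = LocalExecutor
sql_alchemy_conn = mysql://airflow_user:airflow_pass@localhost:3306/airflow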
Copy a few of the DAG files from the TASK section at the end of this post into the ~/airflow/dags directory, run the following commands in order, and then open http://127.0.0.1:8080 to monitor task status in real time:

ct@server:~/airflow: airflow initdb (skip if already run above)
ct@server:~/airflow: airflow webserver --debug &
ct@server:~/airflow: airflow scheduler
If RabbitMQ is installed with yum or apt-get, everything is taken care of. Start the server detached and enable it at boot:

rabbitmq-server -detached
chkconfig rabbitmq-server on
Edit airflow.cfg (usually under ~/airflow) and set:

executor = CeleryExecutor
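The matching [celery] settings for a RabbitMQ broker might look like the following sketch; guest is RabbitMQ's default account, and the result backend reuses the MySQL database configured earlier (all credentials are placeholders):

broker_url = amqp://guest:guest@localhost:5672//
celery_result_backend = db+mysql://airflow_user:airflow_pass@localhost:3306/airflow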
Start Redis with redis-server.
Use ps -ef | grep 'redis' to check that the background process exists.
Check that port 6379 is listening: netstat -lntp | grep 6379.
Enable Redis at boot: chkconfig redis-server on.
As before, edit airflow.cfg (usually under ~/airflow) and set:

executor = CeleryExecutor
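With Redis as the Celery broker, the broker line in airflow.cfg would look something like this (database 0 on the local Redis is an assumption):

broker_url = redis://127.0.0.1:6379/0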
The Airflow backend database can be emptied with airflow resetdb. To restart the services, kill the old webserver master process and launch everything again:

ps -ef | grep -Ei '(airflow-webserver)' | grep master | awk '{print $2}' | xargs -i kill {}
airflow webserver --debug
airflow worker
airflow scheduler
#!/bin/bash
#set -x
#set -e
set -u

# Terminal color variables used in usage(); they must be defined,
# otherwise `set -u` aborts the script when the heredoc expands them.
txtrst=$(tput sgr0)
txtbld=$(tput bold)
txtcyn=$(tput setaf 6)
bldblu="${txtbld}$(tput setaf 4)"
bldred="${txtbld}$(tput setaf 1)"
usage()
{
cat <<EOF
${txtcyn}
Usage:
$0 options${txtrst}
${bldblu}Function${txtrst}:
This script is used to start or restart webserver service.
${txtbld}OPTIONS${txtrst}:
-S Start airflow system [${bldred}Default FALSE${txtrst}]
-s Restart airflow server only [${bldred}Default FALSE${txtrst}]
-a Restart all airflow programs including webserver, worker and
scheduler. [${bldred}Default FALSE${txtrst}]
EOF
}
start_all=
server_only=
all=
while getopts "hs:S:a:" OPTION
do
case $OPTION in
h)
usage
exit 1
;;
S)
start_all=$OPTARG
;;
s)
server_only=$OPTARG
;;
a)
all=$OPTARG
;;
?)
usage
exit 1
;;
esac
done
if [ -z "$server_only" ] && [ -z "$all" ] && [ -z "${start_all}" ]; then
usage
exit 1
fi
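# Restart only the webserver: kill the old gunicorn master process, then relaunch.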
if [ "$server_only" == "TRUE" ]; then
ps -ef | grep -Ei '(airflow-webserver)' | grep master | \
awk '{print $2}' | xargs -i kill {}
cd ~/airflow/
nohup airflow webserver >webserver.log 2>&1 &
fi
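# Full restart: kill every airflow process, then relaunch all three services.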
if [ "$all" == "TRUE" ]; then
ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}
cd ~/airflow/
nohup airflow webserver >>webserver.log 2>&1 &
nohup airflow worker >>worker.log 2>&1 &
nohup airflow scheduler >>scheduler.log 2>&1 &
fi
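# First start: nothing to kill yet, just launch all three services.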
if [ "${start_all}" == "TRUE" ]; then
cd ~/airflow/
nohup airflow webserver >>webserver.log 2>&1 &
nohup airflow worker >>worker.log 2>&1 &
nohup airflow scheduler >>scheduler.log 2>&1 &
fi
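Assuming the script is saved as airflow_restart.sh (the name is arbitrary), it is invoked like this; each option takes an explicit value because of the trailing colons in the getopts string:

bash airflow_restart.sh -S TRUE    # first start of the whole system
bash airflow_restart.sh -s TRUE    # restart the webserver only
bash airflow_restart.sh -a TRUE    # restart webserver, worker and scheduler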
The dags_folder directory supports subdirectories and symlinks, so different DAGs can be filed away by category. The following three lines in airflow.cfg enable password authentication for the web UI and filter the DAG list by owner:

authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
filter_by_owner = True
Set email_on_retry: True so that every retry sends a notification email. Use a long retry_delay so that, after receiving the email, there is time to deal with the problem; use a short retry_delay when you want tasks to restart quickly. About depends_on_past:
Airflow assumes idempotent tasks that operate on immutable data chunks. It also assumes that all task instances (each task for each schedule) need to run.
If your tasks need to be executed sequentially, you need to tell Airflow: use the depends_on_past=True flag on the tasks that require sequential execution.
If a task that should have run did not, or when the interval is set to @once, depends_on_past=False is recommended. When running DAGs I sometimes found that an upstream task had clearly finished while the downstream task never started, leaving the whole DAG stuck; setting depends_on_past=False resolves this kind of problem. Execution dates are given as a timestamp in a format like 2016-01-01T00:03:00.
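A sketch of turning the flag off for a single task; this fragment is meant to slot into a DAG file such as ct2.py below, so it assumes an existing dag object and the BashOperator import, and the task id and command are placeholders:

run_task = BashOperator(
    task_id='run_task',
    bash_command='bash run.sh ',
    depends_on_past=False,   # this task no longer waits on its previous run
    dag=dag)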
A failed task can be rerun manually by clicking run in the Graph view. To help a modified DAG run smoothly, a workable compromise is: after editing a DAG file, first execute python dag.py to confirm it is free of syntax errors. The ct2.py example:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                               datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': one_min_ago,
    'email': ['chentong_biology@163.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 5,
    'retry_delay': timedelta(hours=30),
    #'queue': 'bash_queue',
    #'pool': 'backfill',
    #'priority_weight': 10,
    #'end_date': datetime(2016, 5, 29, 11, 30),
}

dag = DAG('ct2', default_args=default_args,
          schedule_interval="@once")

t1 = BashOperator(
    task_id='run1',
    bash_command='(cd /home/ct/test; bash run1.sh -f ct_t1) ',
    dag=dag)

t2 = BashOperator(
    task_id='run2',
    bash_command='(cd /home/ct/test; bash run2.sh -f ct_t1) ',
    dag=dag)

t2.set_upstream(t1)
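With the file in place, one can check that it parses and that the DAG and its tasks are registered (these commands exist in the 1.x CLI):

python ct2.py
airflow list_dags
airflow list_tasks ct2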
There is a conf parameter that gets exposed in the “context” (templates, operators, …). That is the place where you would associate parameters with a specific run. For now this is only possible in the context of an externally triggered DAG run. The way the TriggerDagRunOperator works, you can fill in the conf param during the execution of the callable that you pass to the operator.
If you are looking to change the shape of your DAG through parameters, we recommend doing that using “singleton” DAGs (using a “@once” schedule_interval), meaning that you would write a Python program that generates multiple dag_ids, one for each run, probably based on metadata stored in a config file or elsewhere.
The idea is that if you use parameters to alter the shape of your DAG, you break some of the assumptions around continuity of the schedule. Things like visualizing the tree view or how to perform a backfill become unclear and mushy. So if the shape of your DAG changes radically based on parameters, we consider those to be different DAGs, and you generate each one in your pipeline file.
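A minimal sketch of how conf gets filled in through the operator's callable, using the airflow 1.x import path; the controller DAG, the triggered dag id ('ct2') and the payload contents are illustrative assumptions:

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

controller = DAG('ct2_controller', start_date=datetime(2016, 5, 1),
                 schedule_interval='@once')

def set_conf(context, dag_run_obj):
    # Whatever is placed in payload becomes the conf of the triggered run;
    # returning None instead would skip the trigger.
    dag_run_obj.payload = {'message': 'hello'}
    return dag_run_obj

trigger = TriggerDagRunOperator(
    task_id='trigger_ct2',
    trigger_dag_id='ct2',
    python_callable=set_conf,
    dag=controller)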
backfill: fill in task runs for a specific time range:

airflow backfill -s START -e END --mark_success DAG_ID
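For example, to mark the ct2 runs for the first week of May 2016 as successful (the dates here are illustrative):

airflow backfill -s 2016-05-01 -e 2016-05-07 --mark_success ct2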
To let the remote server reach RabbitMQ on the intranet machine, open a reverse SSH tunnel:

ssh -v -4 -Nf -R 5672:127.0.0.1:5672 aliyun

The general form is ssh -R <remote port>:<local host>:<local port> <SSH hostname>: connections to <remote port> on <SSH hostname> are forwarded to <local host>:<local port> as seen from the local machine. The -v output confirms the tunnel:

Remote connections from LOCALHOST:5672 forwarded to local address 127.0.0.1:5672

The remote server can now reach the local RabbitMQ on port 5672. However, with the webserver and scheduler running remotely and airflow worker started on the intranet server, task execution state was lost. I am continuing to study Celery to solve this problem.

When things go wrong, check the following:

Are start_date and end_date within the intended time range?
Do the outputs of airflow worker, airflow scheduler and airflow webserver --debug show any task failing abnormally?
Check the log output under the logs folder.
After modifying a DAG, give it a new dag_id.
To reset the backend, run airflow resetdb, or DROP DATABASE airflow and then airflow initdb.
If you get an error like “You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ‘(6) NULL’ at line 1” (SQL: u'ALTER TABLE dag MODIFY last_scheduler_run DATETIME(6) NULL'), the MySQL version is too old to support DATETIME(6); install MySQL 5.7.
airflow.operators.PigOperator is no longer supported; use from airflow.operators.pig_operator import PigOperator instead.
Change from airflow.operators import BashOperator to from airflow.operators.bash_operator import BashOperator.