centos7
Airflow 1.10.6
Python 3.6.8
Mysql 5.6
redis 3.3
略(自行百度)
vim ~/.bashrc
# 添加一行环境变量
export AIRFLOW_HOME=/opt/airflow
source ~/.bashrc
export SLUGIFY_USES_TEXT_UNIDECODE=yes
# 生成配置文件,可能会报一些错请忽略,保证AIRFLOW_HOME目录下生成了.cfg及相关文件即证明本次执行成功
# 如果配置了pytho的环境变量直接执行
# 没配置在${PYTHON_HOME}/lib/python3.6/sit-packages/airflow/bin目录下执行`./airflow`
pip install apache-airflow
pip install 'apache-airflow[mysql]'
pip install 'apache-airflow[celery]'
pip install 'apache-airflow[redis]'
pip install pymysql
# sqlalchemy链接
sql_alchemy_conn = mysql+pymysql://root:root@10.1.49.71:3306/airflow?charset=utf8
# 配置执行器
executor=CeleryExecutor
# 配置celery的broker_url
broker_url = redis://lochost:5379/0
# 配置元数据信息管理
result_backend = db+mysql://username:password@localhost:3306/airflow
# 创建用户组和用户
groupadd airflow
useradd airflow -g airflow
# 将 {AIRFLOW_HOME}目录修用户组
cd /opt/
chgrp -R airflow airflow
初始化前请先创建airflow
数据库以免报错
airflow db init
# 前台启动web服务
airflow webserver
# 后台启动web服务
airflow webserver -D
# 前台启动scheduler
airflow schedule
# 后台启动scheduler
airflow scheduler -D
# worker主机只需用普通用户打开airflow worker
# 创建用户airflow
useradd airflow
# 对用户test设置密码
passwd airflow
# 在root用户下,改变airflow文件夹的权限,设为全开放
chmod -R 777 /opt/airflow
# 切换为普通用户,执行airflow worker命令就行
# 启动时发现普通用户读取的~/.bashrc文件 不一致 重新加入AIRFLOW_HOME 就可以了
# 如果在新建普通用户前配置好环境变量可能没有这个问题了 本人是在创建用户后修改了环境变量
airflow worker
# 执行worker之前运行临时变量(临时的不能永久使用)
export C_FORCE_ROOT="true"
# 不需要切换用户
cd /usr/local/python3/bin/
# 前台启动worker服务
airflow worker
# 后台启动work服务
airflow worker -D
default_timezone = Asia/Shanghai
参考如下:
cd /usr/local/lib/python3.6/site-packages/airflow
# 在 utc = pendulum.timezone(‘UTC’) 这行(第27行)代码下添加
from airflow.configuration import conf
try:
tz = conf.get("core", "default_timezone")
if tz == "system":
utc = pendulum.local_timezone()
else:
utc = pendulum.timezone(tz)
except Exception:
pass
# 修改utcnow()函数 (在第69行)
原代码 d = dt.datetime.utcnow()
修改为 d = dt.datetime.now()
# 在utc = pendulum.timezone(‘UTC’) 这行(第37行)代码下添加
from airflow.configuration import conf
try:
tz = conf.get("core", "default_timezone")
if tz == "system":
utc = pendulum.local_timezone()
else:
utc = pendulum.timezone(tz)
except Exception:
pass
把代码 var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);
改为 var UTCseconds = x.getTime();
把代码 "timeFormat":"H:i:s %UTC%",
改为 "timeFormat":"H:i:s",
default_args = {
# 接受邮箱
'email': ['demo@qq.com''],
# task失败是否发送邮件
'email_on_failure': True,
# task重试是否发送邮件
'email_on_retry': False,
}
——————————————————————————————————————————————
airflow的全局变量中设置
在DAG中加入参数用于控制整个dag
dag = DAG(f"dag_name",
default_args=default_args,
schedule_interval="0 12 * * *",
max_active_runs = 1
)
在每个task中的Operator中设置参数
t3 = PythonOperator(
task_id='demo_task',
provide_context=True,
python_callable=demo_task,
task_concurrency=1,
dag=dag)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。