目标:掌握定时调度的使用方式
实施
http://airflow.apache.org/docs/apache-airflow/stable/dag-run.html
方式一:内置
with DAG(
dag_id='example_branch_operator',
default_args=args,
start_date=days_ago(2),
schedule_interval="@daily",
tags=['example', 'example2'],
) as dag:
方式二:datetime.timedelta对象
timedelta(minutes=1)
timedelta(hours=3)
timedelta(days=1)
with DAG(
dag_id='latest_only',
schedule_interval=dt.timedelta(hours=4),
start_date=days_ago(2),
tags=['example2', 'example3'],
) as dag:
方式三:Crontab表达式
与Linux Crontab用法一致
with DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
start_date=days_ago(2),
default_args=args,
tags=['example'],
) as dag:
分钟 小时 日 月 周
00 00 * * *
05 12 1 * *
30 8 * * 4
小结
目标:了解AirFlow的常用命令
实施
列举当前所有的dag
airflow dags list
暂停某个DAG
airflow dags pause dag_name
启动某个DAG
airflow dags unpause dag_name
删除某个DAG
airflow dags delete dag_name
执行某个DAG
airflow dags trigger dag_name
查看某个DAG的状态
airflow dags state dag_name
列举某个DAG的所有Task
airflow tasks list dag_name
小结
目标:了解AirFlow中如何实现邮件告警
路径
实施
原理:自动发送邮件的原理:邮件第三方服务
发送方账号:配置文件中配置
smtp_user = 12345678910@163.com
# 秘钥id:需要自己在第三方后台生成
smtp_password = 自己生成的秘钥
# 端口
smtp_port = 25
# 发送邮件的邮箱
smtp_mail_from = 12345678910@163.com
接收方账号:程序中配置
default_args = {
'owner': 'airflow',
'email': ['jiangzonghai@itcast.cn'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
AirFlow配置:airflow.cfg
# 发送邮件的代理服务器地址及认证:每个公司都不一样
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# 发送邮件的账号
smtp_user = 12345678910@163.com
# 秘钥id:需要自己在第三方后台生成
smtp_password = 自己生成的秘钥
# 端口
smtp_port = 25
# 发送邮件的邮箱
smtp_mail_from = 12345678910@163.com
# 超时时间
smtp_timeout = 30
# 重试次数
smtp_retry_limit = 5
关闭Airflow
# 统一杀掉airflow的相关服务进程命令
ps -ef|egrep 'scheduler|flower|worker|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -9
# 下一次启动之前
rm -f /root/airflow/airflow-*
程序配置
default_args = {
'email': ['jiangzonghai@itcast.cn'],
'email_on_failure': True,
'email_on_retry': True
}
启动Airflow
airflow webserver -D
airflow scheduler -D
airflow celery flower -D
airflow celery worker -D
模拟错误
小结
什么是分布式计算?
Spark程序的组成结构?
什么是Standalone?
为什么要用Spark on YARN?
Master和Worker是什么?
Driver和Executer是什么?
step1:启动了分布式资源平台
step2:开发一个分布式计算程序
sc = SparkContext(conf)
# step1:读取数据
inputRdd = sc.textFile(hdfs_path)
#step2:转换数据
wcRdd = inputRdd.filter.map.flatMap.reduceByKey
#step3:保存结果
wcRdd.foreach
sc.stop
step3:提交分布式程序到分布式资源集群运行
spark-submit xxx.py
executor个数和资源
driver资源配置
先启动Driver进程
再启动Executor进程:根据资源配置运行在Worker节点上
Job是怎么产生的?
DAG是怎么生成的?
Task的个数怎么决定?
Spark的算子分为几类?