前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >助力工业物联网,工业大数据之服务域:定时调度使用【三十四】

助力工业物联网,工业大数据之服务域:定时调度使用【三十四】

作者头像
Maynor
发布2023-08-18 08:52:59
2150
发布2023-08-18 08:52:59
举报
文章被收录于专栏:最新最全的大数据技术体系

12:定时调度使用

目标掌握定时调度的使用方式

实施

http://airflow.apache.org/docs/apache-airflow/stable/dag-run.html

image-20211005153849026
image-20211005153849026

方式一:内置

代码语言:javascript
复制
with DAG(
    dag_id='example_branch_operator',
    default_args=args,
    start_date=days_ago(2),
    schedule_interval="@daily",
    tags=['example', 'example2'],
) as dag:

方式二:datetime.timedelta对象

代码语言:javascript
复制
timedelta(minutes=1)
timedelta(hours=3)
timedelta(days=1)
代码语言:javascript
复制
with DAG(
    dag_id='latest_only',
    schedule_interval=dt.timedelta(hours=4),
    start_date=days_ago(2),
    tags=['example2', 'example3'],
) as dag:

方式三:Crontab表达式

与Linux Crontab用法一致

代码语言:javascript
复制
with DAG(
    dag_id='example_branch_dop_operator_v3',
    schedule_interval='*/1 * * * *',
    start_date=days_ago(2),
    default_args=args,
    tags=['example'],
) as dag:
代码语言:javascript
复制
分钟		小时		日			月			周
00		 00 	 	*			*			*
05		12			1			*			*
30		8			*			*			4

小结

  • 掌握定时调度的使用方式

13:Airflow常用命令

目标:了解AirFlow的常用命令

实施

列举当前所有的dag

代码语言:javascript
复制
airflow dags list

暂停某个DAG

代码语言:javascript
复制
airflow dags pause dag_name

启动某个DAG

代码语言:javascript
复制
airflow dags unpause dag_name

删除某个DAG

代码语言:javascript
复制
airflow dags delete dag_name

执行某个DAG

代码语言:javascript
复制
airflow dags  trigger dag_name

查看某个DAG的状态

代码语言:javascript
复制
airflow dags  state dag_name

列举某个DAG的所有Task

代码语言:javascript
复制
airflow tasks list dag_name

小结

  • 了解AirFlow的常用命令

14:邮件告警使用

目标:了解AirFlow中如何实现邮件告警

路径

  • step1:AirFlow配置
  • step2:DAG配置

实施

原理:自动发送邮件的原理:邮件第三方服务

发送方账号:配置文件中配置

代码语言:javascript
复制
smtp_user = 12345678910@163.com
# 秘钥id:需要自己在第三方后台生成
smtp_password = 自己生成的秘钥
# 端口
smtp_port = 25
# 发送邮件的邮箱
smtp_mail_from = 12345678910@163.com

接收方账号:程序中配置

代码语言:javascript
复制
default_args = {
    'owner': 'airflow',
    'email': ['jiangzonghai@itcast.cn'],
  'email_on_failure': True,
    'email_on_retry': True,
  'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

AirFlow配置:airflow.cfg

代码语言:javascript
复制
# 发送邮件的代理服务器地址及认证:每个公司都不一样
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# 发送邮件的账号
smtp_user = 12345678910@163.com
# 秘钥id:需要自己在第三方后台生成
smtp_password = 自己生成的秘钥
# 端口
smtp_port = 25
# 发送邮件的邮箱
smtp_mail_from = 12345678910@163.com
# 超时时间
smtp_timeout = 30
# 重试次数
smtp_retry_limit = 5

关闭Airflow

代码语言:javascript
复制
# 统一杀掉airflow的相关服务进程命令
ps -ef|egrep 'scheduler|flower|worker|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -9
# 下一次启动之前
rm -f /root/airflow/airflow-*

程序配置

代码语言:javascript
复制
default_args = {
    'email': ['jiangzonghai@itcast.cn'],
    'email_on_failure': True,
    'email_on_retry': True
}

启动Airflow

代码语言:javascript
复制
airflow webserver -D
airflow scheduler -D
airflow celery flower -D
airflow celery worker -D

模拟错误

image-20211005161322100
image-20211005161322100

小结

  • 了解AirFlow中如何实现邮件告警

15:一站制造中的调度

  • 目标:了解一站制造中调度的实现
  • 实施
    • ODS层 / DWD层:定时调度:每天00:05开始运行
    • dws(11)
      • dws耗时1小时
      • 从凌晨1点30分开始执行
    • dwb(16)
      • dwb耗时1.5小时
      • 从凌晨3点开始执行
    • st(10)
      • st耗时1小时
      • 从凌晨4点30分开始执行
    • dm(1)
      • dm耗时0.5小时
      • 从凌晨5点30分开始执行
  • 小结
    • 了解一站制造中调度的实现

16:回顾:Spark核心概念

image-20211014110944047
image-20211014110944047

什么是分布式计算?

  • 分布式程序:MapReduce、Spark、Flink程序
    • 多进程:一个程序由多个进程来共同实现,不同进程可以运行在不同机器上
    • 每个进程所负责计算的数据是不一样,都是整体数据的某一个部分
    • 自己基于MapReduce或者Spark的API开发的程序:数据处理的逻辑
      • 分逻辑
      • MR
        • ·MapTask进程:分片规则:基于处理的数据做计算
          • 判断:文件大小 / 128M > 1.1
          • 大于:按照每128M分
          • 小于:整体作为1个分片
          • 大文件:每128M作为一个分片
          • 一个分片就对应一个MapTask
        • ReduceTask进程:指定
      • Spark
        • Executor:指定
  • 分布式资源:YARN、Standalone资源容器
    • 将多台机器的物理资源:CPU、内存、磁盘从逻辑上合并为一个整体
    • YARN:ResourceManager、NodeManager【8core8GB】
      • 每个NM管理每台机器的资源
      • RM管理所有的NM
    • Standalone:Master、Worker
  • 实现统一的硬件资源管理:MR、Flink、Spark on YARN

Spark程序的组成结构?

  • Application:程序
  • 进程:一个Driver、多个Executor
  • 运行:多个Job、多个Stage、多个Task

什么是Standalone?

  • Spark自带的集群资源管理平台

为什么要用Spark on YARN?

  • 为了实现资源统一化的管理,将所有程序都提交到YARN运行

Master和Worker是什么?

  • 分布式主从架构:Hadoop、Hbase、Kafka、Spark……
    • 主:管理节点:Master
      • 接客
      • 管理从节点
      • 管理所有资源
    • 从:计算节点:Worker
      • 负责执行主节点分配的任务

Driver和Executer是什么?

step1:启动了分布式资源平台

step2:开发一个分布式计算程序

代码语言:javascript
复制
sc = SparkContext(conf)

# step1:读取数据
inputRdd = sc.textFile(hdfs_path)

#step2:转换数据
wcRdd = inputRdd.filter.map.flatMap.reduceByKey

#step3:保存结果
wcRdd.foreach

sc.stop

step3:提交分布式程序到分布式资源集群运行

代码语言:javascript
复制
spark-submit xxx.py
executor个数和资源
driver资源配置

先启动Driver进程

  • 申请资源:启动Executor计算进程
  • Driver开始解析代码,判断每一句代码是否产生job

再启动Executor进程:根据资源配置运行在Worker节点上

  • 所有Executor向Driver反向注册,等待Driver分配Task

Job是怎么产生的?

  • 当用到RDD中的数据时候就会触发Job的产生:所有会用到RDD数据的函数称为触发算子
  • DAGScheduler组件根据代码为当前的job构建DAG图

DAG是怎么生成的?

  • 算法:回溯算法:倒推
  • DAG构建过程中,将每个算子放入Stage中,如果遇到宽依赖的算子,就构建一个新的Stage
  • Stage划分:宽依赖
  • 运行Stage:按照Stage编号小的开始运行
    • 将每个Stage转换为一个TaskSet:Task集合

Task的个数怎么决定?

  • 一核CPU = 一个Task = 一个分区
  • 一个Stage转换成的TaskSet中有几个Task:由Stage中RDD的最大分区数来决定

Spark的算子分为几类?

  • 转换:Transformation
    • 返回值:RDD
    • 为lazy模式,不会触发job的产生
    • map、flatMap
  • 触发:Action
    • 返回值:非RDD
    • 触发job的产生
    • count、first
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-08-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 12:定时调度使用
  • 13:Airflow常用命令
  • 14:邮件告警使用
  • 15:一站制造中的调度
  • 16:回顾:Spark核心概念
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档