工作流(Workflow),指“业务过程的部分或整体在计算机应用环境下的自动化”。是对工作流程及其各操作步骤之间业务规则的抽象、概括描述。工作流解决的主要问题是:为了实现某个业务目标,利用计算机软件在多个参与者之间按某种预定规则自动传递文档、信息或者任务。
一个完整的数据分析系统通常都是由多个前后依赖的模块组合构成的:数据采集、数据预处理、数据分析、数据展示等。各个模块单元之间存在时间先后依赖关系,且存在着周期性重复。
为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行。
crontab
使用linux的crontab来定义调度,但是缺点比较明显,无法设置依赖复杂任务调度。且需要编写相关shell脚本。
当下企业两种选择,
知名度比较高的是Apache Oozie,但是其配置工作流的过程是编写大量的XML配置,而且代码复杂度比较高,不易于二次开发。
下面的表格对四种hadoop工作流调度器的关键特性进行了比较,尽管这些工作流调度器能够解决的需求场景基本一致,但在设计理念,目标用户,应用场景等方面还是存在显著的区别,在做技术选型的时候,可以提供参考。
特性 | Hamake | Oozie | Azkaban | Cascading |
---|---|---|---|---|
工作流描述语言 | XML | XML (xPDL based) | text file with key/value pairs | Java API |
依赖机制 | data-driven | explicit | explicit | explicit |
是否要web容器 | No | Yes | Yes | No |
进度跟踪 | console/log messages | web page | web page | Java API |
Hadoop job调度支持 | no | yes | yes | yes |
运行模式 | command line utility | daemon | daemon | API |
Pig支持 | yes | yes | yes | yes |
事件通知 | no | no | no | yes |
需要安装 | no | yes | yes | no |
支持的hadoop版本 | 0.18+ | 0.20+ | currently unknown | 0.18+ |
重试支持 | no | workflownode evel | yes | yes |
运行任意命令 | yes | yes | yes | yes |
Amazon EMR支持 | yes | no | currently unknown | yes |
Azkaban是由linkedin(领英)公司推出的一个批量工作流任务调度器,用于在一个工作流内以一个特定的顺序运行一组工作和流程。
Azkaban使用job配置文件建立任务之间的依赖关系,并提供一个易于使用的web用户界面维护和跟踪你的工作流。
Azkaban功能特点:
3.1、solo server mode
该模式中webServer和executorServer运行在同一个进程中,进程名是AzkabanSingleServer。使用自带的H2数据库。这种模式包含Azkaban的所有特性,但一般用来学习和测试。
3.2、two-server mode
该模式使用MySQL数据库, Web Server和Executor Server运行在不同的进程中。
3.3、multiple-executor mode
该模式使用MySQL数据库, Web Server和Executor Server运行在不同的机器中。且有多个Executor Server。该模式适用于大规模应用。
访问Web Server=>http://node2:8081/ 默认用户名密码azkaban
http://node2:8081/index登录=>Create Project=>Upload zip包 =>execute flow执行一步步操作即可。
创建两个文件one.job two.job,内容如下,打包成zip包。
cat one.job
type=command
command=echo "this is job one"
cat two.job
type=command
dependencies=one
command=echo "this is job two"
创建工程:
上传zip压缩包:
execute执行:
执行页面:
执行结果查看:
vi command.job
#command.job type=command command=echo 'hello'
zip command.job
# foo.job type=command command=echo foo
# bar.job type=command dependencies=foo command=echo bar
# fs.job type=command command=hadoop fs -mkdir /azaz
除了手动立即执行工作流任务外,azkaban也支持配置定时任务调度。开启方式如下:
首页选择待处理的project
上述图片中,选择左边schedule表示配置定时调度信息,选择右边execute表示立即执行工作流任务。
shell脚本(scheduler.sh)
#!/bin/sh
cls=$1
flag=0
clsDwd=cn.it.logistics.offline.dwd.${cls}DWD
clsDws=cn.it.logistics.offline.dws.${cls}DWS
baseDir=/export/services/logistics/lib/
if [[ $cls = "Customer" || $cls = "ExpressBill" || $cls = "TransportTool" || $cls = "Warehouse" || $cls = "Waybill" ]]; then
echo -e "\e[32m==== MainClass is: "$clsDwd" and "$clsDws"\e[0m"
flag=1
else
echo -e "\e[31mUsage : \n\tExpressBill\n\tCustomer\n\tTransportTool\n\tWarehouse\n\tWaybill\e[0m"
fi
if [[ $flag = 1 ]]; then
echo -e "\e[32m==== builder spark commands ====\e[0m"
cmd1="spark-submit --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.1 --class ${clsDwd} --master yarn --deploy-mode cluster --driver-memory 512m --executor-cores 1 --executor-memory 512m --queue default --verbose ${baseDir}logistics-offline-1.0-SNAPSHOT.jar"
cmd2="spark-submit --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.1 --class ${clsDws} --master yarn --deploy-mode cluster --driver-memory 512m --executor-cores 1 --executor-memory 512m --queue default --verbose ${baseDir}logistics-offline-1.0-SNAPSHOT.jar"
echo -e "\e[32m==== CMD1 is: $cmd1 ====\e[0m"
echo -e "\e[32m==== CMD2 is: $cmd2 ====\e[0m"
fi
if [[ $flag = 1 && `ls -A $baseDir|wc -w` = 1 ]]; then
echo -e "\e[32m==== start execute ${clsDwd} ====\e[0m"
sh $cmd1
echo -e "\e[32m==== start execute ${clsDws} ====\e[0m"
sh $cmd2
else
echo -e "\e[31m==== The jar package in $baseDir directory does not exist! ====\e[0m"
echo -e "\e[31m==== Plase upload logistics-common.jar,logistics-etl.jar,logistics-generate.jar ====\e[0m"
fi
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh ExpressBill
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh Waybill
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh Warehouse
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh TransportTool
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh Customer