【Oozie】Defining Oozie Workflows and Configuring and Running the Various Actions

Copyright notice: this is an original post by the author; please credit the source when reposting. https://blog.csdn.net/gongxifacai_believe/article/details/81079134

1、Defining a Workflow in Oozie

An Oozie workflow consists of control-flow nodes and action nodes. A workflow is defined in workflow.xml and validated against an XML schema.

The control-flow nodes are: start, decision, fork, join, kill, and end.

An action is a task that executes or computes something, such as a MapReduce job, a Pig job, or a shell command; running one MapReduce job means running one MapReduce action. Every action node has two transitions: ok and error. The built-in workflow action types are the MapReduce, Pig, Fs (HDFS), Ssh, Sub-workflow, and Java actions, so an Oozie workflow can run MapReduce jobs, Hive or Sqoop jobs, and shell scripts. The action extensions are: Email action, Shell action, Hive action, Hive 2 action, Sqoop action, Ssh action, and DistCp action, plus the option of writing a custom action executor.

The workflow definition language is XML-based and is called hPDL (Hadoop Process Definition Language). Node names must match [a-zA-Z][\-_a-zA-Z0-9]* and be fewer than 20 characters long.

A workflow application is laid out as follows:
job.properties: points to the HDFS location of workflow.xml.
workflow.xml: contains the start, action, kill, and end nodes.
lib directory: holds the dependency jars.
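To make that shape concrete, here is a minimal, hedged sketch of a workflow exercising the control-flow nodes listed above. All node names and the /tmp/skeleton-wf paths are illustrative (not from the examples below), and nameNode is assumed to be defined in job.properties:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="skeleton-wf">
    <start to="fork-node"/>

    <!-- fork starts both branches in parallel; join waits until both finish -->
    <fork name="fork-node">
        <path start="mkdir-a"/>
        <path start="mkdir-b"/>
    </fork>

    <!-- Fs actions run lightweight HDFS operations without launching an MR job -->
    <action name="mkdir-a">
        <fs>
            <mkdir path="${nameNode}/tmp/skeleton-wf/a"/>
        </fs>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>

    <action name="mkdir-b">
        <fs>
            <mkdir path="${nameNode}/tmp/skeleton-wf/b"/>
        </fs>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>

    <join name="join-node" to="check"/>

    <!-- decision node: routes on an EL predicate; "default" is the fallback -->
    <decision name="check">
        <switch>
            <case to="end">${fs:exists(concat(nameNode, '/tmp/skeleton-wf/a'))}</case>
            <default to="fail"/>
        </switch>
    </decision>

    <kill name="fail">
        <message>skeleton failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>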

2、MapReduce Action

Goal: schedule a MapReduce program with Oozie. Approach: move what used to be the Driver part of the Java MapReduce program into the configuration block of workflow.xml. Example: run wordcount using the examples shipped with the Oozie distribution.

(1) Create the wordcount input directory on HDFS and upload the file to be counted:

hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -mkdir -p mapreduce/wordcount/input
hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/datas/wc.input mapreduce/wordcount/input

(2) Run the wordcount jar, specifying the input and output paths:

hadoop-2.5.0-cdh5.3.6]$ bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0-cdh5.3.6.jar wordcount mapreduce/wordcount/input mapreduce/wordcount/output

(3) Copy the map-reduce example that ships with Oozie into a new directory, as the basis for the MapReduce action example:

oozie-4.0.0-cdh5.3.6]$ mkdir oozie-apps
oozie-4.0.0-cdh5.3.6]$ cd oozie-apps/
oozie-apps]$ cp -r ../examples/apps/map-reduce/ .
oozie-apps]$ mv map-reduce/ mr-wordcount-wf

(4) Edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/mr-wordcount-wf/workflow.xml:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="mr-wordcount-wf">
    <start to="mr-node-wordcount"/>
    <action name="mr-node-wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
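            <!-- clear any previous output so a rerun does not fail with "output directory already exists" -->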
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <configuration>
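                <!-- use the new org.apache.hadoop.mapreduce API, matching the mapper/reducer classes below -->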
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>com.ibeifeng.hadoop.senior.mapreduce.WordCount$WordCountMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>com.ibeifeng.hadoop.senior.mapreduce.WordCount$WordCountReducer</value>
                </property>
                <property>
                    <name>mapreduce.map.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.map.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
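                <!-- input/output paths: the Driver's FileInputFormat/FileOutputFormat settings -->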
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${inputDir}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

(5) Edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/mr-wordcount-wf/job.properties:

nameNode=hdfs://hadoop-senior.ibeifeng.com:8020
jobTracker=hadoop-senior.ibeifeng.com:8032
queueName=default
oozieAppsRoot=user/beifeng/oozie-apps
oozieDataRoot=user/beifeng/oozie/datas

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/mr-wordcount-wf/workflow.xml
inputDir=mr-wordcount-wf/input
outputDir=mr-wordcount-wf/output

(6) Put the previously built and packaged mr-wc.jar into the lib directory of the MapReduce action example:

$ cd /opt/modules/hadoop-2.5.0/jars/
jars]$ cp mr-wc.jar /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/mr-wordcount-wf/lib
mr-wordcount-wf]$ cd lib
lib]$ ls
mr-wc.jar

(7) Upload the MapReduce action application directory to HDFS:

oozie-4.0.0-cdh5.3.6]$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put oozie-apps/ oozie-apps

(8) Create the wordcount input directory on HDFS:

hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -mkdir -p oozie/datas/mr-wordcount-wf/input

(9) Upload the file that wordcount will process to the HDFS input directory:

hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/datas/wc.input oozie/datas/mr-wordcount-wf/input

(10) If an upload went wrong, delete the file on HDFS:

hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -rm -r oozie/datas/input

(11) Temporarily set the OOZIE_URL environment variable to the Oozie server address (the same URL used to reach the Oozie web console) so the oozie CLI knows which server to talk to:

oozie-4.0.0-cdh5.3.6]$ export OOZIE_URL=http://hadoop-senior.ibeifeng.com:11000/oozie/

(12) Run the Oozie workflow to execute the MapReduce action:

oozie-4.0.0-cdh5.3.6]$ bin/oozie job -config oozie-apps/mr-wordcount-wf/job.properties -run

3、Hive Action

Official docs: http://archive.cloudera.com/cdh5/cdh/5/oozie-4.0.0-cdh5.3.6/DG_HiveActionExtension.html

(1) Edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/hive-select/job.properties:

nameNode=hdfs://hadoop-senior.ibeifeng.com:8020
jobTracker=hadoop-senior.ibeifeng.com:8032
queueName=default
oozieAppsRoot=user/beifeng/oozie-apps
oozieDataRoot=user/beifeng/oozie/datas

oozie.use.system.libpath=true

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/hive-select

outputDir=hive-select/output

(2) Edit the SQL script /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/hive-select/select-student.sql. Running it is equivalent to the shell command bin/hive -f select-student.sql:

insert overwrite directory '${OUTPUT}'
select count(1) cnt from default.student;

(3) Edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/hive-select/workflow.xml:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-hive-select">
    <start to="hive-node"/>

    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <job-xml>${nameNode}/${oozieAppsRoot}/hive-select/hive-site.xml</job-xml>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <script>select-student.sql</script>
            <param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

(4) Upload the Hive action application to HDFS:

oozie-apps]$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put hive-select/ /user/beifeng/oozie-apps

(5) Set the environment variable:

export OOZIE_URL=http://hadoop-senior.ibeifeng.com:11000/oozie/

(6) Run the Oozie workflow to schedule the Hive action:

oozie-4.0.0-cdh5.3.6]$ bin/oozie job -config oozie-apps/hive-select/job.properties -run

(7) Copy the configuration file /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/conf/hive-site.xml into the Hive action directory (it is referenced by the <job-xml> element):

conf]$ cp hive-site.xml /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/hive-select/

(8) Because the earlier upload is now stale, delete the old file on HDFS:

hive-select]$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -rm /user/beifeng/oozie-apps/hive-select/workflow.xml

(9) Upload workflow.xml and hive-site.xml to the application directory on HDFS:

hive-select]$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put workflow.xml hive-site.xml /user/beifeng/oozie-apps/hive-select/

(10) Put the MySQL JDBC driver jar into the Hive action's lib directory:

hive-select]$ mkdir lib
hive-select]$ cp /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar ./lib/

(11) Upload the lib directory with the MySQL jar to HDFS:

hive-select]$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put lib/ /user/beifeng/oozie-apps/hive-select/

(12) Run the Oozie workflow to schedule the Hive action again:

oozie-4.0.0-cdh5.3.6]$ bin/oozie job -config oozie-apps/hive-select/job.properties -run

(13) To kill a running Oozie job:

oozie-4.0.0-cdh5.3.6]$ bin/oozie job -kill 0000001-180717120019494-oozie-beif-W

4、Sqoop Action

Official docs: http://archive.cloudera.com/cdh5/cdh/5/oozie-4.0.0-cdh5.3.6/DG_SqoopActionExtension.html

(1) Go to /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/examples/apps and copy the sqoop example into your own directory:

apps]$ cp -r sqoop ../../oozie-apps/

(2) Check that the MySQL service is running:

$ su
Password:
[root@hadoop-senior apps]# service mysql status
MySQL running (1717)    [ OK ]
[root@hadoop-senior apps]# exit
$ mysql -uroot -p123456

(3) Write the first Sqoop action. Edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/sqoop-import-user/workflow.xml:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="sqoop-wf">
    <start to="sqoop-node"/>

    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.3">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
             <command>import --connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test --username root --password 123456 --table my_user --target-dir /user/beifeng/oozie/datas/sqoop-import-user/output --fields-terminated-by "$$$" --num-mappers 1</command>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

(4) Still for the first Sqoop action, edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/sqoop-import-user/job.properties:

nameNode=hdfs://hadoop-senior.ibeifeng.com:8020
jobTracker=hadoop-senior.ibeifeng.com:8032
queueName=default
oozieAppsRoot=user/beifeng/oozie-apps
oozieDataRoot=user/beifeng/oozie/datas

oozie.use.system.libpath=true

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/sqoop-import-user

outputDir=sqoop-import-user/output

(5) Copy the MySQL jar into the Sqoop action's lib directory.

(6) Upload the Sqoop action application to HDFS:

oozie-apps]$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put sqoop-import-user/ /user/beifeng/oozie-apps

(7) Run the Oozie workflow to schedule the Sqoop action:

oozie-4.0.0-cdh5.3.6]$ bin/oozie job -config oozie-apps/sqoop-import-user/job.properties -run

(8) Write the second Sqoop action, which passes the Sqoop arguments through an options file rather than inline (Oozie splits <command> on whitespace, so an options file is easier to quote and maintain). Edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/sqoop-import-user2/workflow.xml:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="sqoop-wf">
    <start to="sqoop-node"/>

    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.3">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --options-file ${impUser}</command>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

(9) Edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/sqoop-import-user2/job.properties:

nameNode=hdfs://hadoop-senior.ibeifeng.com:8020
jobTracker=hadoop-senior.ibeifeng.com:8032
queueName=default
oozieAppsRoot=user/beifeng/oozie-apps
oozieDataRoot=user/beifeng/oozie/datas

oozie.use.system.libpath=true

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/sqoop-import-user2

outputDir=sqoop-import-user/output

impUser=${nameNode}/${oozieAppsRoot}/sqoop-import-user2/imp-user.sql

(10) Edit the options file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/sqoop-import-user2/imp-user.sql, one option or value per line. Running it is equivalent to the shell command bin/sqoop import --options-file imp-user.sql:

--connect
jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test 
--username 
root 
--password 
123456 
--table 
my_user 
--target-dir 
/user/beifeng/oozie/datas/sqoop-import-user/output 
--fields-terminated-by 
"\t" 
--num-mappers 
1

(11) Copy the MySQL jar into the second Sqoop action's lib directory.

(12) Upload the second Sqoop action application:

oozie-apps]$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put sqoop-import-user2 oozie-apps/

(13) Run the Oozie workflow to schedule the second Sqoop action:

oozie-4.0.0-cdh5.3.6]$ bin/oozie job -config oozie-apps/sqoop-import-user2/job.properties -run

5、Shell Action

Official docs: http://archive.cloudera.com/cdh5/cdh/5/oozie-4.0.0-cdh5.3.6/DG_ShellActionExtension.html

(1) Go to /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/examples/apps, copy the shell example into your own directory, and rename it:

apps]$ cp -r shell/ ../../oozie-apps/
oozie-apps]$ mv shell/ shell-hive-select

(2) Edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/shell-hive-select/job.properties:

nameNode=hdfs://hadoop-senior.ibeifeng.com:8020
jobTracker=hadoop-senior.ibeifeng.com:8032
queueName=default
oozieAppsRoot=user/beifeng/oozie-apps
oozieDataRoot=user/beifeng/oozie/datas

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/shell-hive-select

exec=student-select.sh
script=student-select.sql

(3) Edit the configuration file /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/shell-hive-select/workflow.xml. The <file> elements ship the two scripts to whichever node runs the action; the #name suffix is the symlink name in the working directory:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="shell-wf">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>${exec}</exec>
            <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${exec}#${exec}</file>
            <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${script}#${script}</file>
            <capture-output/>
        </shell>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

(4) Edit the shell script /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/shell-hive-select/student-select.sh:

#!/usr/bin/env bash

## student select
/opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/bin/hive -f student-select.sql

(5) Edit the SQL script /opt/cdh-5.3.6/oozie-4.0.0-cdh5.3.6/oozie-apps/shell-hive-select/student-select.sql:

insert overwrite directory '/user/beifeng/oozie/datas/shell-hive-select/output'
select id, name from default.student;

(6) Upload the shell action application to HDFS:

oozie-apps]$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put shell-hive-select/ oozie-apps/

(7) Run the Oozie workflow to schedule the shell action:

oozie-4.0.0-cdh5.3.6]$ bin/oozie job -config oozie-apps/shell-hive-select/job.properties -run
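Note that the workflow above declares <capture-output/> but nothing consumes the captured output. As a hedged illustration (the node name check-output and the key rowsSelected are assumptions, not part of the original example): if student-select.sh printed a line such as rowsSelected=42 to stdout, the action's <ok> transition could point at a decision node that reads the value through the wf:actionData() EL function:

<!-- in the shell action above, change <ok to="end"/> to <ok to="check-output"/>,
     then add this node; wf:actionData('shell-node') returns the key=value pairs
     the script wrote to stdout, captured thanks to <capture-output/> -->
<decision name="check-output">
    <switch>
        <case to="end">${wf:actionData('shell-node')['rowsSelected'] ne '0'}</case>
        <default to="fail"/>
    </switch>
</decision>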

6、Coordinating Multiple Actions

The workflow.xml for a multi-action pipeline is structured as follows:

start node
hive action: analyzes the input data and stores the result on HDFS
sqoop action: exports the HDFS result into MySQL, from which the data is served to the front end
kill node
end node

A minimal sketch of this two-action chain follows.
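Combining the Hive and Sqoop examples from sections 3 and 4, such a workflow could be sketched as below. This is an illustrative sketch, not code from the original post: the app directory hive-to-mysql, the script analyze.sql, and the MySQL table my_result are assumed names; the properties (nameNode, oozieAppsRoot, oozieDataRoot, outputDir) come from a job.properties like the earlier ones, the MySQL connector jar must sit in the app's lib directory, and hive-site.xml must be uploaded next to workflow.xml:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-hive-to-mysql">
    <start to="hive-node"/>

    <!-- step 1: Hive analyzes the input and writes the result to HDFS -->
    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <job-xml>${nameNode}/${oozieAppsRoot}/hive-to-mysql/hive-site.xml</job-xml>
            <script>analyze.sql</script>
            <param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
        </hive>
        <!-- the chaining happens here: on success, go to the sqoop action instead of end -->
        <ok to="sqoop-node"/>
        <error to="fail"/>
    </action>

    <!-- step 2: Sqoop exports the Hive result from HDFS into MySQL -->
    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.3">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <command>export --connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test --username root --password 123456 --table my_result --export-dir /${oozieDataRoot}/${outputDir} --input-fields-terminated-by \t --num-mappers 1</command>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>hive-to-mysql failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>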
