前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >azkaban流程调度

azkaban流程调度

作者头像
编程那点事
发布2023-02-25 15:26:09
3060
发布2023-02-25 15:26:09
举报
文章被收录于专栏:java编程那点事

1.搜集数据

upload.job

代码语言:javascript
复制
#upload.job
type=command
command=bash upload.sh

upload.sh

代码语言:javascript
复制
#!/bin/bash

#set java env
export JAVA_HOME=/soft/jdk/
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

#set hadoop env
export HADOOP_HOME=/soft/hadoop/
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

#日志文件存放的目录
log_src_dir=/home/centos/logs/log/ 

#待上传文件存放的目录
log_toupload_dir=/home/centos/logs/toupload/

#得到昨天的日期
day_01=`date -d'-1 day' +%Y-%m-%d`
#得到昨天的年份
syear=`date --date=$day_01 +%Y`
#得到昨天的月份
smonth=`date --date=$day_01 +%m`
#得到昨天的日份
sday=`date --date=$day_01 +%d`

#日志文件上传到hdfs的根路径
hdfs_root_dir=/data/clickLog/$syear/$smonth/$sday

#创建hdfs上的路径文件夹
hadoop fs -mkdir -p $hdfs_root_dir

#读取日志文件的目录,判断是否有需要上传的文件
ls $log_src_dir | while read fileName
do
    if [[ "$fileName" == access.log ]]; then
    # if [ "access.log" = "$fileName" ];then
        date=`date +%Y_%m_%d_%H_%M_%S`
        #将文件移动到待上传目录并重命名
        mv $log_src_dir$fileName $log_toupload_dir"xxxxx_click_log_$fileName"$date
        #将待上传的文件path写入一个列表文件willDoing
        echo $log_toupload_dir"xxxxx_click_log_$fileName"$date >> $log_toupload_dir"willDoing."$date
    fi

done

#找到列表文件willDoing
ls $log_toupload_dir | grep will |grep -v "_COPY_" | grep -v "_DONE_" | while read line
do
    #将待上传文件列表willDoing改名为willDoing_COPY_
    mv $log_toupload_dir$line $log_toupload_dir$line"_COPY_"
    #读列表文件willDoing_COPY_的内容(一个一个的待上传文件名)  ,此处的line 就是列表中的一个待上传文件的path
    cat $log_toupload_dir$line"_COPY_" |while read line
    do
        hadoop fs -put $line $hdfs_root_dir
    done    
    mv $log_toupload_dir$line"_COPY_"  $log_toupload_dir$line"_DONE_"
done

2.清洗数据

clean.job

代码语言:javascript
复制
# clean.job
type=command
dependencies=upload
command=bash clean.sh

clean.sh

代码语言:javascript
复制
#!/bin/bash

#set java env
export JAVA_HOME=/soft/jdk
export JRE_HOME=${JAVA_HOME}/jre 
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib 
export PATH=${JAVA_HOME}/bin:$PATH

#set hadoop env
export HADOOP_HOME=/soft/hadoop
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

#获取昨天的日期
day_01=`date -d'-1 day' +%Y-%m-%d`
#获取昨天的年份
syear=`date --date=$day_01 +%Y`
#获取昨天的月份
smonth=`date --date=$day_01 +%m`
#获取昨天的日份
sday=`date --date=$day_01 +%d`

#日志在hdfs上的路径
log_hdfs_dir=/data/clickLog/$syear/$smonth/$sday
#mapreduce程序的入口路径
click_log_clean=clickLog.AccessLogDriver
#清洗后的数据路径
clean_dir=/cleaup/$syear/$smonth/$sday
#清洗后的数据路径不可存在(删除操作)
hadoop fs -rm -r -f $clean_dir
#运行mapreduce程序的jar文件(jar文件的位置;程序的入口路径;数据输入路径;数据输出路径)
hadoop jar /home/centos/hivedemo/mrclick.jar $click_log_clean $log_hdfs_dir $clean_dir

3.数据绑定到hive

hivesql.job

代码语言:javascript
复制
# hivesql.job
type=command
dependencies=clean
command=bash hivesql.sh

hivesql.sh

代码语言:javascript
复制
#!/bin/bash

#set java env
export JAVA_HOME=/soft/jdk
export JRE_HOME=${JAVA_HOME}/jre 
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib 
export PATH=${JAVA_HOME}/bin:$PATH

#set hadoop env
export HADOOP_HOME=/soft/hadoop
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

#set hive env
export HIVE_HOME=/soft/hive
export PATH=${HIVE_HOME}/bin:$PATH

#获取昨天的日期
day_01=`date -d'-1 day' +%Y-%m-%d`
#获取昨天的年份
syear=`date --date=$day_01 +%Y`
#获取昨天的月份
smonth=`date --date=$day_01 +%m`
#获取昨天的日份
sday=`date --date=$day_01 +%d`

#清洗后的数据在hdfs上的路径
clean_dir=/cleaup/$syear/$smonth/$sday

#hive sql 导入数据到hive
HQL_origin="load data inpath '$clean_dir' into table mydb.accesslog"

#执行sql语句
hive -e  "$HQL_origin"

4.查询数据

ip.job

代码语言:javascript
复制
# ip.job
type=command
dependencies=hivesqljob
command=bash ip.sh

ip.sh

代码语言:javascript
复制
#!/bin/bash

#set java env
export JAVA_HOME=/soft/jdk
export JRE_HOME=${JAVA_HOME}/jre 
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib 
export PATH=${JAVA_HOME}/bin:$PATH

#set hadoop env
export HADOOP_HOME=/soft/hadoop
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

 #set hive env
export HIVE_HOME=/soft/hive
export PATH=${HIVE_HOME}/bin:$PATH

#hive sql 从一张表查询出数据放到结果集表中
HQL_origin="insert into  mydb.upflow  select ip,sum(upflow) as sum from mydb.accesslog group by ip order by sum desc "

#执行sql语句
hive -e  "$HQL_origin"

5.导出到mysql

mysql.job

代码语言:javascript
复制
# mysql.job
type=command
dependencies=ipjob
command=bash mysql.sh

mysql.sh

代码语言:javascript
复制
#!/bin/bash

#set java env
export JAVA_HOME=/soft/jdk
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

#set hadoop env
export HADOOP_HOME=/soft/hadoop
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

#set hive env
export HIVE_HOME=/soft/hive
export PATH=${HIVE_HOME}/bin:$PATH

#set sqoop env
export SQOOP_HOME=/soft/sqoop
export PATH=${SQOOP_HOME}/bin:$PATH

#sqoop语句 从hive导出到mysql(导出到mysql的表中从hive的路径文件下)
sqoop export --connect jdbc:mysql://s201:3306/userdb --username sqoop --password sqoop --table upflow --export-dir /user/hive/warehouse/mydb.db/upflow --input-fields-terminated-by ','

五个job有依赖关系

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-02-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档