前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark数据保存到mysql 通过Azkaban提交集群任务

spark数据保存到mysql 通过Azkaban提交集群任务

作者头像
编程那点事
发布2023-02-25 15:45:59
8820
发布2023-02-25 15:45:59
举报
文章被收录于专栏:java编程那点事

spark数据保存到mysql 通过Azkaban提交集群任务

toMysql.job

代码语言:javascript
复制
#toMysql.job
type = command
command = bash sparkToMysql.sh

sparkToMysql.sh

代码语言:javascript
复制
#!/bin/bash

spark-submit \
--class AccessLogToMySql.AccessLogSpark \
# 集群地址 不写默认local
--master spark://master:7077 \
# 执行器数量 不写默认
--num-executors 10 \
# 执行器内存大小 不写默认
--executor-memory 3g \
# 执行器核数 不写默认
--executor-cores 8 \
# mysql jar包地址
--driver-class-path /root/hd/apache-hive-2.3.4-bin/lib/mysql-connector-java-8.0.14.jar \
# 上传任务jar到位置
/root/job/toMysql.jar

toMysql.job 和 sparkToMysql.sh压缩上传Azkaban定时执行

AccessLogSpark

代码语言:javascript
复制
// 获取sparksession
val spark = SparkSession.builder().appName("AccessLogSpark").getOrCreate()

// 引入隐式转换
import spark.sqlContext.implicits._

// 读取数据
val data = spark.sparkContext.textFile("hdfs://master/data/clickLog/20190211/xxxx_click_log_access.12019_02_11_16_49_24")

// 分割数据
val splitData = data.map(_.split(" "))

// 判断处理
val filtData = splitData.filter(x => x.length >= 11)

// 数据转化dataFrame
val logF = filtData.map(x => {LogTable(x(0), x(3), x(9).toLong)}).toDF()

// 创建视图
logF.createOrReplaceTempView("AccessTable")

// sql
val sql = "select ip, sum(upflow) as upSum from AccessTable group by ip order by upSum desc"

// 查询结果
val sumFlowDF = spark.sqlContext.sql(sql)

// 把结果保存在mysql表中
// 创建Properties对象,配置连接mysql的用户名和密码
val prop = new Properties()
prop.setProperty("user","hive")
prop.setProperty("password","hive")

// 写入数据库 追加模式 jdbc 表名 Properties
sumFlowDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://url:3306/sqoop_data", "iptop", prop)

// 停止
spark.stop()

生成jar toMysql.jar存放到服务器指定地址

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-02-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档