In the previous article we covered the user-registration module of the offline data warehouse; this article covers the quiz (question-answering) module.
The simulated log data formats are shown below; for full details see my open-source project https://github.com/SoundHearer/kuaiban
1. QzWebsite.log: site log data
{
"createtime": "2019-07-22 11:47:18", // creation time
"creator": "admin", // creator
"dn": "webA", // site partition
"domain": "-",
"dt": "20190722", // date partition
"multicastgateway": "-",
"multicastport": "-",
"multicastserver": "-",
"sequence": "-",
"siteid": 0, // site id
"sitename": "sitename0", // site name
"status": "-",
"templateserver": "-"
}
2. QzSiteCourse.log: site-course log data
{
"boardid": 64, // course board/template id
"coursechapter": "-",
"courseid": 66, // course id
"createtime": "2019-07-22 11:43:32", // creation time
"creator": "admin", // creator
"dn": "webA", // site partition
"dt": "20190722", // date partition
"helpparperstatus": "-",
"sequence": "-",
"servertype": "-",
"showstatus": "-",
"sitecourseid": 2, // site-course id
"sitecoursename": "sitecoursename2", // site-course name
"siteid": 77, // site id
"status": "-"
}
3. QzQuestionType.log: question-type data
{
"createtime": "2019-07-22 10:42:47", // creation time
"creator": "admin", // creator
"description": "-",
"dn": "webA", // site partition
"dt": "20190722", // date partition
"papertypename": "-",
"questypeid": 0, // question type id
"quesviewtype": 0,
"remark": "-",
"sequence": "-",
"splitscoretype": "-",
"status": "-",
"viewtypename": "viewtypename0"
}
4. QzQuestion.log: question log data
{
"analysis": "-",
"answer": "-",
"attanswer": "-",
"content": "-",
"createtime": "2019-07-22 11:33:46", // creation time
"creator": "admin", // creator
"difficulty": "-",
"dn": "webA", // site partition
"dt": "20190722", // date partition
"lecture": "-",
"limitminute": "-",
"modifystatus": "-",
"optnum": 8,
"parentid": 57,
"quesskill": "-",
"questag": "-",
"questionid": 0, // question id
"questypeid": 57, // question type id
"quesviewtype": 44,
"score": 24.124501582742543, // question score
"splitscore": 0.0,
"status": "-",
"vanalysisaddr": "-",
"vdeoaddr": "-"
}
5. QzPointQuestion.log: question/knowledge-point mapping data
{
"createtime": "2019-07-22 09:16:46", // creation time
"creator": "admin", // creator
"dn": "webA", // site partition
"dt": "20190722", // date partition
"pointid": 0, // knowledge point id
"questionid": 0, // question id
"questype": 0
}
The remaining formats are lengthy; see the open-source project. Below is the DDL for the DWD-layer tables (two are shown here; the rest follow the same pattern):
create external table `dwd`.`dwd_qz_chapter`(
chapterid int ,
chapterlistid int ,
chaptername string ,
sequence string ,
showstatus string ,
creator string ,
createtime timestamp,
courseid int ,
chapternum int,
outchapterid int)
partitioned by(
dt string,
dn string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
create external table `dwd`.`dwd_qz_chapter_list`(
chapterlistid int ,
chapterlistname string ,
courseid int ,
chapterallnum int ,
sequence string,
status string,
creator string ,
createtime timestamp
)
partitioned by(
dt string,
dn string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
Requirement 1: Use Spark to parse the ODS-layer data and load it into the corresponding Hive tables; every score field must be rounded half-up to one decimal place.
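The ETL code below relies on a small helper, ParseJsonData.getJsonData, that is not shown in this article: it parses one log line into a fastjson JSONObject, and the filter on obj.isInstanceOf[JSONObject] drops malformed lines. A minimal sketch of that helper, assuming it simply swallows parse errors and returns null (isInstanceOf is false for null), could be:
import com.alibaba.fastjson.{JSON, JSONObject}
object ParseJsonData {
  // Parse one log line; return null on malformed JSON so callers can
  // filter with obj.isInstanceOf[JSONObject].
  def getJsonData(data: String): JSONObject = {
    try {
      JSON.parseObject(data)
    } catch {
      case _: Exception => null
    }
  }
}
The score rounding itself is done inline with scala.math.BigDecimal; for example, BigDecimal(24.1245).setScale(1, BigDecimal.RoundingMode.HALF_UP) yields 24.1.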
import com.alibaba.fastjson.JSONObject
import com.catelf.qz.bean.{DwdQzPaperView, DwdQzPoint, DwdQzQuestion}
import com.catelf.util.ParseJsonData
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* ETL for user quiz data
*/
object EtlDataService {
/**
* Parse chapter data
*
* @param ssc
* @param sparkSession
* @return
*/
def etlQzChapter(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._ // implicit conversions
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzChapter.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val chapterid = jsonObject.getIntValue("chapterid")
val chapterlistid = jsonObject.getIntValue("chapterlistid")
val chaptername = jsonObject.getString("chaptername")
val sequence = jsonObject.getString("sequence")
val showstatus = jsonObject.getString("showstatus")
// val status = jsonObject.getString("status")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val courseid = jsonObject.getIntValue("courseid")
val chapternum = jsonObject.getIntValue("chapternum")
val outchapterid = jsonObject.getIntValue("outchapterid")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(chapterid, chapterlistid, chaptername, sequence, showstatus, creator, createtime,
courseid, chapternum, outchapterid, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_chapter")
}
/**
* Parse chapter-list data
*
* @param ssc
* @param sparkSession
*/
def etlQzChapterList(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzChapterList.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val chapterlistid = jsonObject.getIntValue("chapterlistid")
val chapterlistname = jsonObject.getString("chapterlistname")
val courseid = jsonObject.getIntValue("courseid")
val chapterallnum = jsonObject.getIntValue("chapterallnum")
val sequence = jsonObject.getString("sequence")
val status = jsonObject.getString("status")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(chapterlistid, chapterlistname, courseid, chapterallnum, sequence, status, creator, createtime, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_chapter_list")
}
/**
* Parse knowledge-point data
*
* @param ssc
* @param sparkSession
*/
def etlQzPoint(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPoint.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val pointid = jsonObject.getIntValue("pointid")
val courseid = jsonObject.getIntValue("courseid")
val pointname = jsonObject.getString("pointname")
val pointyear = jsonObject.getString("pointyear")
val chapter = jsonObject.getString("chapter")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val status = jsonObject.getString("status")
val modifystatus = jsonObject.getString("modifystatus")
val excisenum = jsonObject.getIntValue("excisenum")
val pointlistid = jsonObject.getIntValue("pointlistid")
val chapterid = jsonObject.getIntValue("chapterid")
val sequence = jsonObject.getString("sequence")
val pointdescribe = jsonObject.getString("pointdescribe")
val pointlevel = jsonObject.getString("pointlevel")
val typeslist = jsonObject.getString("typelist")
val score = BigDecimal(jsonObject.getDouble("score")).setScale(1, BigDecimal.RoundingMode.HALF_UP) // round half-up to 1 decimal place
val thought = jsonObject.getString("thought")
val remid = jsonObject.getString("remid")
val pointnamelist = jsonObject.getString("pointnamelist")
val typelistids = jsonObject.getString("typelistids")
val pointlist = jsonObject.getString("pointlist")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
DwdQzPoint(pointid, courseid, pointname, pointyear, chapter, creator, createtime, status, modifystatus, excisenum, pointlistid,
chapterid, sequence, pointdescribe, pointlevel, typeslist, score, thought, remid, pointnamelist, typelistids,
pointlist, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_point")
}
/**
* Parse question data under each knowledge point
*
* @param ssc
* @param sparkSession
* @return
*/
def etlQzPointQuestion(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPointQuestion.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val pointid = jsonObject.getIntValue("pointid")
val questionid = jsonObject.getIntValue("questionid")
val questtype = jsonObject.getIntValue("questtype")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(pointid, questionid, questtype, creator, createtime, dt, dn)
})
}).toDF().write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_point_question")
}
/**
* Parse site-course data
*
* @param ssc
* @param sparkSession
*/
def etlQzSiteCourse(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzSiteCourse.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val sitecourseid = jsonObject.getIntValue("sitecourseid")
val siteid = jsonObject.getIntValue("siteid")
val courseid = jsonObject.getIntValue("courseid")
val sitecoursename = jsonObject.getString("sitecoursename")
val coursechapter = jsonObject.getString("coursechapter")
val sequence = jsonObject.getString("sequence")
val status = jsonObject.getString("status")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val helppaperstatus = jsonObject.getString("helppaperstatus")
val servertype = jsonObject.getString("servertype")
val boardid = jsonObject.getIntValue("boardid")
val showstatus = jsonObject.getString("showstatus")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(sitecourseid, siteid, courseid, sitecoursename, coursechapter, sequence, status, creator
, createtime, helppaperstatus, servertype, boardid, showstatus, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_site_course")
}
/**
* Parse course data
*
* @param ssc
* @param sparkSession
*/
def etlQzCourse(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCourse.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val courseid = jsonObject.getIntValue("courseid")
val majorid = jsonObject.getIntValue("majorid")
val coursename = jsonObject.getString("coursename")
val coursechapter = jsonObject.getString("coursechapter")
val sequence = jsonObject.getString("sequnece") // key spelled "sequnece" in the source project; likely a typo for "sequence" in the mock logs
val isadvc = jsonObject.getString("isadvc")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val status = jsonObject.getString("status")
val chapterlistid = jsonObject.getIntValue("chapterlistid")
val pointlistid = jsonObject.getIntValue("pointlistid")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(courseid, majorid, coursename, coursechapter, sequence, isadvc, creator, createtime, status
, chapterlistid, pointlistid, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_course")
}
/**
* Parse course education-subject data
*
* @param ssc
* @param sparkSession
*/
def etlQzCourseEdusubject(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCourseEduSubject.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val courseeduid = jsonObject.getIntValue("courseeduid")
val edusubjectid = jsonObject.getIntValue("edusubjectid")
val courseid = jsonObject.getIntValue("courseid")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val majorid = jsonObject.getIntValue("majorid")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(courseeduid, edusubjectid, courseid, creator, createtime, majorid, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_course_edusubject")
}
/**
* Parse website data
*
* @param ssc
* @param sparkSession
*/
def etlQzWebsite(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzWebsite.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val siteid = jsonObject.getIntValue("siteid")
val sitename = jsonObject.getString("sitename")
val domain = jsonObject.getString("domain")
val sequence = jsonObject.getString("sequence")
val multicastserver = jsonObject.getString("multicastserver")
val templateserver = jsonObject.getString("templateserver")
val status = jsonObject.getString("status")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val multicastgateway = jsonObject.getString("multicastgateway")
val multicastport = jsonObject.getString("multicastport")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(siteid, sitename, domain, sequence, multicastserver, templateserver, status, creator, createtime,
multicastgateway, multicastport, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_website")
}
/**
* Parse major data
*
* @param ssc
* @param sparkSession
*/
def etlQzMajor(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzMajor.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val majorid = jsonObject.getIntValue("majorid")
val businessid = jsonObject.getIntValue("businessid")
val siteid = jsonObject.getIntValue("siteid")
val majorname = jsonObject.getString("majorname")
val shortname = jsonObject.getString("shortname")
val status = jsonObject.getString("status")
val sequence = jsonObject.getString("sequence")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val columm_sitetype = jsonObject.getString("columm_sitetype")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(majorid, businessid, siteid, majorname, shortname, status, sequence, creator, createtime, columm_sitetype, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_major")
}
/**
* Parse business data
*
* @param ssc
* @param sparkSession
*/
def etlQzBusiness(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzBusiness.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val businessid = jsonObject.getIntValue("businessid")
val businessname = jsonObject.getString("businessname")
val sequence = jsonObject.getString("sequence")
val status = jsonObject.getString("status")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val siteid = jsonObject.getIntValue("siteid")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(businessid, businessname, sequence, status, creator, createtime, siteid, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_business")
}
def etlQzPaperView(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPaperView.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val paperviewid = jsonObject.getIntValue("paperviewid")
val paperid = jsonObject.getIntValue("paperid")
val paperviewname = jsonObject.getString("paperviewname")
val paperparam = jsonObject.getString("paperparam")
val openstatus = jsonObject.getString("openstatus")
val explainurl = jsonObject.getString("explainurl")
val iscontest = jsonObject.getString("iscontest")
val contesttime = jsonObject.getString("contesttime")
val conteststarttime = jsonObject.getString("conteststarttime")
val contestendtime = jsonObject.getString("contestendtime")
val contesttimelimit = jsonObject.getString("contesttimelimit")
val dayiid = jsonObject.getIntValue("dayiid")
val status = jsonObject.getString("status")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val paperviewcatid = jsonObject.getIntValue("paperviewcatid")
val modifystatus = jsonObject.getString("modifystatus")
val description = jsonObject.getString("description")
val papertype = jsonObject.getString("papertype")
val downurl = jsonObject.getString("downurl")
val paperuse = jsonObject.getString("paperuse")
val paperdifficult = jsonObject.getString("paperdifficult")
val testreport = jsonObject.getString("testreport")
val paperuseshow = jsonObject.getString("paperuseshow")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
DwdQzPaperView(paperviewid, paperid, paperviewname, paperparam, openstatus, explainurl, iscontest, contesttime,
conteststarttime, contestendtime, contesttimelimit, dayiid, status, creator, createtime, paperviewcatid, modifystatus,
description, papertype, downurl, paperuse, paperdifficult, testreport, paperuseshow, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_paper_view")
}
def etlQzCenterPaper(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCenterPaper.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val paperviewid = jsonObject.getIntValue("paperviewid")
val centerid = jsonObject.getIntValue("centerid")
val openstatus = jsonObject.getString("openstatus")
val sequence = jsonObject.getString("sequence")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(paperviewid, centerid, openstatus, sequence, creator, createtime, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_center_paper")
}
def etlQzPaper(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPaper.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val paperid = jsonObject.getIntValue("paperid")
val papercatid = jsonObject.getIntValue("papercatid")
val courseid = jsonObject.getIntValue("courseid")
val paperyear = jsonObject.getString("paperyear")
val chapter = jsonObject.getString("chapter")
val suitnum = jsonObject.getString("suitnum")
val papername = jsonObject.getString("papername")
val status = jsonObject.getString("status")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val totalscore = BigDecimal.apply(jsonObject.getString("totalscore")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
val chapterid = jsonObject.getIntValue("chapterid")
val chapterlistid = jsonObject.getIntValue("chapterlistid")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(paperid, papercatid, courseid, paperyear, chapter, suitnum, papername, status, creator, createtime, totalscore, chapterid,
chapterlistid, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_paper")
}
def etlQzCenter(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCenter.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val centerid = jsonObject.getIntValue("centerid")
val centername = jsonObject.getString("centername")
val centeryear = jsonObject.getString("centeryear")
val centertype = jsonObject.getString("centertype")
val openstatus = jsonObject.getString("openstatus")
val centerparam = jsonObject.getString("centerparam")
val description = jsonObject.getString("description")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val sequence = jsonObject.getString("sequence")
val provideuser = jsonObject.getString("provideuser")
val centerviewtype = jsonObject.getString("centerviewtype")
val stage = jsonObject.getString("stage")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(centerid, centername, centeryear, centertype, openstatus, centerparam, description, creator, createtime,
sequence, provideuser, centerviewtype, stage, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_center")
}
def etlQzQuestion(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzQuestion.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val questionid = jsonObject.getIntValue("questionid")
val parentid = jsonObject.getIntValue("parentid")
val questypeid = jsonObject.getIntValue("questypeid")
val quesviewtype = jsonObject.getIntValue("quesviewtype")
val content = jsonObject.getString("content")
val answer = jsonObject.getString("answer")
val analysis = jsonObject.getString("analysis")
val limitminute = jsonObject.getString("limitminute")
val score = BigDecimal.apply(jsonObject.getDoubleValue("score")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
val splitscore = BigDecimal.apply(jsonObject.getDoubleValue("splitscore")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
val status = jsonObject.getString("status")
val optnum = jsonObject.getIntValue("optnum")
val lecture = jsonObject.getString("lecture")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val modifystatus = jsonObject.getString("modifystatus")
val attanswer = jsonObject.getString("attanswer")
val questag = jsonObject.getString("questag")
val vanalysisaddr = jsonObject.getString("vanalysisaddr")
val difficulty = jsonObject.getString("difficulty")
val quesskill = jsonObject.getString("quesskill")
val vdeoaddr = jsonObject.getString("vdeoaddr")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
DwdQzQuestion(questionid, parentid, questypeid, quesviewtype, content, answer, analysis, limitminute, score, splitscore,
status, optnum, lecture, creator, createtime, modifystatus, attanswer, questag, vanalysisaddr, difficulty, quesskill,
vdeoaddr, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_question")
}
def etlQzQuestionType(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzQuestionType.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val quesviewtype = jsonObject.getIntValue("quesviewtype")
val viewtypename = jsonObject.getString("viewtypename")
val questiontypeid = jsonObject.getIntValue("questypeid")
val description = jsonObject.getString("description")
val status = jsonObject.getString("status")
val creator = jsonObject.getString("creator")
val createtime = jsonObject.getString("createtime")
val papertypename = jsonObject.getString("papertypename")
val sequence = jsonObject.getString("sequence")
val remark = jsonObject.getString("remark")
val splitscoretype = jsonObject.getString("splitscoretype")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(quesviewtype, viewtypename, questiontypeid, description, status, creator, createtime, papertypename, sequence,
remark, splitscoretype, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_question_type")
}
/**
* Parse user answer records
*
* @param ssc
* @param sparkSession
*/
def etlQzMemberPaperQuestion(ssc: SparkContext, sparkSession: SparkSession) = {
import sparkSession.implicits._
ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzMemberPaperQuestion.log").filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
}).mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val userid = jsonObject.getIntValue("userid")
val paperviewid = jsonObject.getIntValue("paperviewid")
val chapterid = jsonObject.getIntValue("chapterid")
val sitecourseid = jsonObject.getIntValue("sitecourseid")
val questionid = jsonObject.getIntValue("questionid")
val majorid = jsonObject.getIntValue("majorid")
val useranswer = jsonObject.getString("useranswer")
val istrue = jsonObject.getString("istrue")
val lasttime = jsonObject.getString("lasttime")
val opertype = jsonObject.getString("opertype")
val paperid = jsonObject.getIntValue("paperid")
val spendtime = jsonObject.getIntValue("spendtime")
val score = BigDecimal.apply(jsonObject.getString("score")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
val question_answer = jsonObject.getIntValue("question_answer")
val dt = jsonObject.getString("dt")
val dn = jsonObject.getString("dn")
(userid, paperviewid, chapterid, sitecourseid, questionid, majorid, useranswer, istrue, lasttime, opertype, paperid, spendtime, score, question_answer, dt, dn)
})
}).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_member_paper_question")
}
}
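The bean classes imported at the top of EtlDataService (DwdQzPoint, DwdQzQuestion, DwdQzPaperView) are plain case classes that are not listed in this article; their fields mirror the constructor calls above. As an illustration, DwdQzPoint reconstructed from its call site in etlQzPoint (types inferred from the getIntValue/getString/BigDecimal accessors) would look roughly like:
// Sketch of the bean behind etlQzPoint; DwdQzQuestion and DwdQzPaperView
// follow the same pattern, one field per parsed value.
case class DwdQzPoint(pointid: Int, courseid: Int, pointname: String, pointyear: String,
                      chapter: String, creator: String, createtime: String, status: String,
                      modifystatus: String, excisenum: Int, pointlistid: Int, chapterid: Int,
                      sequence: String, pointdescribe: String, pointlevel: String,
                      typeslist: String, score: BigDecimal, thought: String, remid: String,
                      pointnamelist: String, typelistids: String, pointlist: String,
                      dt: String, dn: String)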
Create the DwdController:
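The controller uses a HiveUtil helper from com.catelf.util that is also not listed in this article. A minimal sketch, assuming it just sets the standard Hive session parameters for dynamic partitioning and compression, could be:
import org.apache.spark.sql.SparkSession
object HiveUtil {
  // Allow dynamic-partition inserts (the dwd tables are partitioned by dt and dn).
  def openDynamicPartition(spark: SparkSession): Unit = {
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
  }
  // Turn on output compression for Hive jobs.
  def openCompression(spark: SparkSession): Unit = {
    spark.sql("set mapred.output.compress=true")
    spark.sql("set hive.exec.compress.output=true")
  }
  // Write Parquet files with snappy compression.
  def useSnappyCompression(spark: SparkSession): Unit = {
    spark.sql("set parquet.compression=snappy")
  }
}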
import com.catelf.qz.service.EtlDataService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Parse quiz data and load it into the DWD layer
*/
object DwdController {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "hdfs")
val sparkConf = new SparkConf().setAppName("dwd_qz_controller").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
HiveUtil.openDynamicPartition(sparkSession) // enable dynamic partitioning
HiveUtil.openCompression(sparkSession) // enable compression
HiveUtil.useSnappyCompression(sparkSession) // use snappy compression
EtlDataService.etlQzChapter(ssc, sparkSession)
EtlDataService.etlQzChapterList(ssc, sparkSession)
EtlDataService.etlQzPoint(ssc, sparkSession)
EtlDataService.etlQzPointQuestion(ssc, sparkSession)
EtlDataService.etlQzSiteCourse(ssc, sparkSession)
EtlDataService.etlQzCourse(ssc, sparkSession)
EtlDataService.etlQzCourseEdusubject(ssc, sparkSession)
EtlDataService.etlQzWebsite(ssc, sparkSession)
EtlDataService.etlQzMajor(ssc, sparkSession)
EtlDataService.etlQzBusiness(ssc, sparkSession)
EtlDataService.etlQzPaperView(ssc, sparkSession)
EtlDataService.etlQzCenterPaper(ssc, sparkSession)
EtlDataService.etlQzPaper(ssc, sparkSession)
EtlDataService.etlQzCenter(ssc, sparkSession)
EtlDataService.etlQzQuestion(ssc, sparkSession)
EtlDataService.etlQzQuestionType(ssc, sparkSession)
EtlDataService.etlQzMemberPaperQuestion(ssc, sparkSession)
}
}
Run this main class and the parsed DWD tables appear in Hive. Next, define DAO objects that read those tables back for building the DWS layer:
import org.apache.spark.sql.SparkSession
object QzChapterDao {
/**
* Query base qz_chapter data
*
* @param sparkSession
* @return
*/
def getDwdQzChapter(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select chapterid,chapterlistid,chaptername,sequence,showstatus,creator as " +
"chapter_creator,createtime as chapter_createtime,courseid as chapter_courseid,chapternum,outchapterid,dt,dn from dwd.dwd_qz_chapter where " +
s"dt='$dt'")
}
/**
* Query base qz_chapter_list data
*
* @param sparkSession
* @param dt
*/
def getDwdQzChapterList(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select chapterlistid,chapterlistname,chapterallnum,dn from dwd.dwd_qz_chapter_list " +
s"where dt='$dt'")
}
/**
* Query base qz_point data
*
* @param sparkSession
* @param dt
*/
def getDwdQzPoint(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select pointid,pointname,pointyear,chapter,excisenum,pointlistid,chapterid," +
"pointdescribe,pointlevel,typelist,score as point_score,thought,remid,pointnamelist,typelistids,pointlist,status,dn from " +
s"dwd.dwd_qz_point where dt='$dt'") // status is needed by saveDwsQzChapter's select below
}
/**
* Query base qz_point_question data
*
* @param sparkSession
* @param dt
*/
def getDwdQzPointQuestion(sparkSession: SparkSession, dt: String) = {
sparkSession.sql(s"select pointid,questionid,questype,dn from dwd.dwd_qz_point_question where dt='$dt'")
}
}
import org.apache.spark.sql.SparkSession
object QzCourseDao {
def getDwdQzSiteCourse(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select sitecourseid,siteid,courseid,sitecoursename,coursechapter,sequence,status," +
"creator as sitecourse_creator,createtime as sitecourse_createtime,helppaperstatus,servertype,boardid,showstatus,dt,dn " +
s"from dwd.dwd_qz_site_course where dt='${dt}'")
}
def getDwdQzCourse(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select courseid,majorid,coursename,isadvc,chapterlistid,pointlistid,dn from " +
s"dwd.dwd_qz_course where dt='${dt}'")
}
def getDwdQzCourseEduSubject(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select courseeduid,edusubjectid,courseid,dn from dwd.dwd_qz_course_edusubject " +
s"where dt='${dt}'")
}
}
import org.apache.spark.sql.SparkSession
object QzMajorDao {
def getQzMajor(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select majorid,businessid,siteid,majorname,shortname,status,sequence,creator as major_creator," +
s"createtime as major_createtime,dt,dn from dwd.dwd_qz_major where dt='$dt'")
}
def getQzWebsite(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select siteid,sitename,domain,multicastserver,templateserver,creator," +
s"createtime,multicastgateway,multicastport,dn from dwd.dwd_qz_website where dt='$dt'")
}
def getQzBusiness(sparkSession: SparkSession, dt: String) = {
sparkSession.sql(s"select businessid,businessname,dn from dwd.dwd_qz_business where dt='$dt'")
}
}
import org.apache.spark.sql.SparkSession
object QzPaperDao {
def getDwdQzPaperView(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select paperviewid,paperid,paperviewname,paperparam,openstatus,explainurl,iscontest," +
"contesttime,conteststarttime,contestendtime,contesttimelimit,dayiid,status,creator as paper_view_creator," +
"createtime as paper_view_createtime,paperviewcatid,modifystatus,description,papertype,downurl,paperuse," +
s"paperdifficult,testreport,paperuseshow,dt,dn from dwd.dwd_qz_paper_view where dt='$dt'")
}
def getDwdQzCenterPaper(sparkSession: SparkSession, dt: String) = {
sparkSession.sql(s"select paperviewid,sequence,centerid,dn from dwd.dwd_qz_center_paper where dt='$dt'")
}
def getDwdQzPaper(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select paperid,papercatid,courseid,paperyear,chapter,suitnum,papername,totalscore,chapterid," +
s"chapterlistid,dn from dwd.dwd_qz_paper where dt='$dt'")
}
def getDwdQzCenter(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select centerid,centername,centeryear,centertype,centerparam,provideuser," +
s"centerviewtype,stage,dn from dwd.dwd_qz_center where dt='$dt'")
}
}
import org.apache.spark.sql.SparkSession
object QzQuestionDao {
def getQzQuestion(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select questionid,parentid,questypeid,quesviewtype,content,answer,analysis,limitminute," +
"score,splitscore,status,optnum,lecture,creator,createtime,modifystatus,attanswer,questag,vanalysisaddr,difficulty," +
s"quesskill,vdeoaddr,dt,dn from dwd.dwd_qz_question where dt='$dt'")
}
def getQzQuestionType(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select questypeid,viewtypename,description,papertypename,remark,splitscoretype,dn from " +
s"dwd.dwd_qz_question_type where dt='$dt'")
}
}
import org.apache.spark.sql.SparkSession
object UserPaperDetailDao {
def getDwdQzMemberPaperQuestion(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select userid,paperviewid,chapterid,sitecourseid,questionid,majorid,useranswer,istrue,lasttime,opertype," +
s"paperid,spendtime,score,question_answer,dt,dn from dwd.dwd_qz_member_paper_question where dt='$dt'")
}
def getDwsQzChapter(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select chapterid,chapterlistid,chaptername,sequence as chapter_sequence,status as chapter_status," +
"chapter_courseid,chapternum,chapterallnum,outchapterid,chapterlistname,pointid,questype,pointname,pointyear" +
",chapter,excisenum,pointlistid,pointdescribe,pointlevel,typelist,point_score,thought,remid,pointnamelist," +
s"typelistids,pointlist,dn from dws.dws_qz_chapter where dt='$dt'")
}
def getDwsQzCourse(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select sitecourseid,siteid,courseid,sitecoursename,coursechapter,sequence as course_sequence," +
"status as course_status,sitecourse_creator,sitecourse_createtime,helppaperstatus,servertype,boardid,showstatus,majorid," +
s"coursename,isadvc,chapterlistid,pointlistid,courseeduid,edusubjectid,dn from dws.dws_qz_course where dt='$dt'")
}
def getDwsQzMajor(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select majorid,businessid,majorname,shortname,status as major_status,sequence as major_sequence," +
"major_creator,major_createtime,businessname,sitename,domain,multicastserver,templateserver,multicastgateway,multicastport," +
s"dn from dws.dws_qz_major where dt='$dt'")
}
def getDwsQzPaper(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select paperviewid,paperid,paperviewname,paperparam,openstatus,explainurl,iscontest,contesttime," +
"conteststarttime,contestendtime,contesttimelimit,dayiid,status as paper_status,paper_view_creator,paper_view_createtime," +
"paperviewcatid,modifystatus,description,paperuse,testreport,centerid,sequence as paper_sequence,centername,centeryear," +
"centertype,provideuser,centerviewtype,stage as paper_stage,papercatid,courseid,paperyear,suitnum,papername,totalscore,dn" +
s" from dws.dws_qz_paper where dt='$dt'")
}
def getDwsQzQuestion(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select questionid,parentid as question_parentid,questypeid,quesviewtype,content as question_content," +
"answer as question_answer,analysis as question_analysis,limitminute as question_limitminute,score as question_score," +
"splitscore,lecture,creator as question_creator,createtime as question_createtime,modifystatus as question_modifystatus," +
"attanswer as question_attanswer,questag as question_questag,vanalysisaddr as question_vanalysisaddr,difficulty as question_difficulty," +
"quesskill,vdeoaddr,description as question_description,splitscoretype as question_splitscoretype,dn " +
s" from dws.dws_qz_question where dt='$dt'")
}
}
Requirement 2: Based on the DWD-layer base tables, denormalize the dimensions and aggregate them into dws.dws_qz_chapter (chapter dimension table), dws.dws_qz_course (course dimension table), dws.dws_qz_major (major dimension table), dws.dws_qz_paper (paper dimension table), and dws.dws_qz_question (question dimension table), using Spark SQL and the DataFrame API.
dws.dws_qz_chapter (4-table join): dwd.dwd_qz_chapter inner join dwd.dwd_qz_chapter_list on chapterlistid and dn, inner join dwd.dwd_qz_point on chapterid and dn, inner join dwd.dwd_qz_point_question on pointid and dn
dws.dws_qz_course (3-table join): dwd.dwd_qz_site_course inner join dwd.dwd_qz_course on courseid and dn, inner join dwd.dwd_qz_course_edusubject on courseid and dn
dws.dws_qz_major (3-table join): dwd.dwd_qz_major inner join dwd.dwd_qz_website on siteid and dn, inner join dwd.dwd_qz_business on businessid and dn
dws.dws_qz_paper (4-table join): dwd.dwd_qz_paper_view left join dwd.dwd_qz_center_paper on paperviewid and dn, left join dwd.dwd_qz_center on centerid and dn, inner join dwd.dwd_qz_paper on paperid and dn
dws.dws_qz_question (2-table join): dwd.dwd_qz_question inner join dwd.dwd_qz_question_type on questypeid and dn
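For reference, the dws_qz_chapter aggregation can also be written directly in Spark SQL instead of the DataFrame API. The sketch below is equivalent to DwsQzService.saveDwsQzChapter shown just below (it assumes a SparkSession named sparkSession and the dt partition being processed; column aliases follow the DAO queries above):
// Spark SQL version of the dws_qz_chapter 4-table join (a sketch).
val chapterWide = sparkSession.sql(
  s"""
     |select c.chapterid, c.chapterlistid, c.chaptername, c.sequence, c.showstatus, p.status,
     |       c.creator as chapter_creator, c.createtime as chapter_createtime,
     |       c.courseid as chapter_courseid, c.chapternum, cl.chapterallnum, c.outchapterid,
     |       cl.chapterlistname, p.pointid, pq.questionid, pq.questype, p.pointname, p.pointyear,
     |       p.chapter, p.excisenum, p.pointlistid, p.pointdescribe, p.pointlevel, p.typelist,
     |       p.score as point_score, p.thought, p.remid, p.pointnamelist, p.typelistids,
     |       p.pointlist, c.dt, c.dn
     |from dwd.dwd_qz_chapter c
     |join dwd.dwd_qz_chapter_list cl on c.chapterlistid = cl.chapterlistid and c.dn = cl.dn
     |join dwd.dwd_qz_point p on c.chapterid = p.chapterid and c.dn = p.dn
     |join dwd.dwd_qz_point_question pq on p.pointid = pq.pointid and p.dn = pq.dn
     |where c.dt = '$dt'
   """.stripMargin)
chapterWide.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_chapter")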
Requirement 3: Join dws.dws_qz_chapter, dws.dws_qz_course, dws.dws_qz_major, dws.dws_qz_paper, dws.dws_qz_question, and dwd.dwd_qz_member_paper_question into the wide table dws.dws_user_paper_detail, using Spark SQL and the DataFrame API.
dws.dws_user_paper_detail: dwd_qz_member_paper_question inner join dws_qz_chapter on chapterid and dn, inner join dws_qz_course on sitecourseid and dn, inner join dws_qz_major on majorid and dn, inner join dws_qz_paper on paperviewid and dn, inner join dws_qz_question on questionid and dn
import com.catelf.qz.dao.{QzChapterDao, QzCourseDao, QzMajorDao, QzPaperDao, QzQuestionDao, UserPaperDetailDao}
import org.apache.spark.sql.{SaveMode, SparkSession}
object DwsQzService {
def saveDwsQzChapter(sparkSession: SparkSession, dt: String) = {
val dwdQzChapter = QzChapterDao.getDwdQzChapter(sparkSession, dt)
val dwdQzChapterlist = QzChapterDao.getDwdQzChapterList(sparkSession, dt)
val dwdQzPoint = QzChapterDao.getDwdQzPoint(sparkSession, dt)
val dwdQzPointQuestion = QzChapterDao.getDwdQzPointQuestion(sparkSession, dt)
val result = dwdQzChapter.join(dwdQzChapterlist, Seq("chapterlistid", "dn"))
.join(dwdQzPoint, Seq("chapterid", "dn"))
.join(dwdQzPointQuestion, Seq("pointid", "dn"))
result.select("chapterid", "chapterlistid", "chaptername", "sequence", "showstatus", "status",
"chapter_creator", "chapter_createtime", "chapter_courseid", "chapternum", "chapterallnum", "outchapterid", "chapterlistname",
"pointid", "questionid", "questype", "pointname", "pointyear", "chapter", "excisenum", "pointlistid", "pointdescribe",
"pointlevel", "typelist", "point_score", "thought", "remid", "pointnamelist", "typelistids", "pointlist", "dt", "dn")
.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_chapter")
}
def saveDwsQzCourse(sparkSession: SparkSession, dt: String) = {
val dwdQzSiteCourse = QzCourseDao.getDwdQzSiteCourse(sparkSession, dt)
val dwdQzCourse = QzCourseDao.getDwdQzCourse(sparkSession, dt)
val dwdQzCourseEdusubject = QzCourseDao.getDwdQzCourseEduSubject(sparkSession, dt)
val result = dwdQzSiteCourse.join(dwdQzCourse, Seq("courseid", "dn"))
.join(dwdQzCourseEdusubject, Seq("courseid", "dn"))
.select("sitecourseid", "siteid", "courseid", "sitecoursename", "coursechapter",
"sequence", "status", "sitecourse_creator", "sitecourse_createtime", "helppaperstatus", "servertype", "boardid",
"showstatus", "majorid", "coursename", "isadvc", "chapterlistid", "pointlistid", "courseeduid", "edusubjectid"
, "dt", "dn")
result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_course")
}
def saveDwsQzMajor(sparkSession: SparkSession, dt: String) = {
val dwdQzMajor = QzMajorDao.getQzMajor(sparkSession, dt)
val dwdQzWebsite = QzMajorDao.getQzWebsite(sparkSession, dt)
val dwdQzBusiness = QzMajorDao.getQzBusiness(sparkSession, dt)
val result = dwdQzMajor.join(dwdQzWebsite, Seq("siteid", "dn"))
.join(dwdQzBusiness, Seq("businessid", "dn"))
.select("majorid", "businessid", "siteid", "majorname", "shortname", "status", "sequence",
"major_creator", "major_createtime", "businessname", "sitename", "domain", "multicastserver", "templateserver",
"multicastgateway", "multicastport", "dt", "dn")
result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_major")
}
def saveDwsQzPaper(sparkSession: SparkSession, dt: String) = {
val dwdQzPaperView = QzPaperDao.getDwdQzPaperView(sparkSession, dt)
val dwdQzCenterPaper = QzPaperDao.getDwdQzCenterPaper(sparkSession, dt)
val dwdQzCenter = QzPaperDao.getDwdQzCenter(sparkSession, dt)
val dwdQzPaper = QzPaperDao.getDwdQzPaper(sparkSession, dt)
val result = dwdQzPaperView.join(dwdQzCenterPaper, Seq("paperviewid", "dn"), "left")
.join(dwdQzCenter, Seq("centerid", "dn"), "left")
.join(dwdQzPaper, Seq("paperid", "dn"))
.select("paperviewid", "paperid", "paperviewname", "paperparam", "openstatus", "explainurl", "iscontest"
, "contesttime", "conteststarttime", "contestendtime", "contesttimelimit", "dayiid", "status", "paper_view_creator",
"paper_view_createtime", "paperviewcatid", "modifystatus", "description", "paperuse", "paperdifficult", "testreport",
"paperuseshow", "centerid", "sequence", "centername", "centeryear", "centertype", "provideuser", "centerviewtype",
"stage", "papercatid", "courseid", "paperyear", "suitnum", "papername", "totalscore", "chapterid", "chapterlistid",
"dt", "dn")
result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_paper")
}
def saveDwsQzQuestionType(sparkSession: SparkSession, dt: String) = {
val dwdQzQuestion = QzQuestionDao.getQzQuestion(sparkSession, dt)
val dwdQzQuestionType = QzQuestionDao.getQzQuestionType(sparkSession, dt)
val result = dwdQzQuestion.join(dwdQzQuestionType, Seq("questypeid", "dn"))
.select("questionid", "parentid", "questypeid", "quesviewtype", "content", "answer", "analysis"
, "limitminute", "score", "splitscore", "status", "optnum", "lecture", "creator", "createtime", "modifystatus"
, "attanswer", "questag", "vanalysisaddr", "difficulty", "quesskill", "vdeoaddr", "viewtypename", "papertypename",
"remark", "splitscoretype", "dt", "dn")
result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_question")
}
def saveDwsUserPaperDetail(sparkSession: SparkSession, dt: String) = {
val dwdQzMemberPaperQuestion = UserPaperDetailDao.getDwdQzMemberPaperQuestion(sparkSession, dt).drop("paperid")
.withColumnRenamed("question_answer", "user_question_answer")
val dwsQzChapter = UserPaperDetailDao.getDwsQzChapter(sparkSession, dt).drop("courseid")
val dwsQzCourse = UserPaperDetailDao.getDwsQzCourse(sparkSession, dt).withColumnRenamed("sitecourse_creator", "course_creator")
.withColumnRenamed("sitecourse_createtime", "course_createtime").drop("majorid")
.drop("chapterlistid").drop("pointlistid")
val dwsQzMajor = UserPaperDetailDao.getDwsQzMajor(sparkSession, dt)
val dwsQzPaper = UserPaperDetailDao.getDwsQzPaper(sparkSession, dt).drop("courseid")
val dwsQzQuestion = UserPaperDetailDao.getDwsQzQuestion(sparkSession, dt)
dwdQzMemberPaperQuestion.join(dwsQzCourse, Seq("sitecourseid", "dn")).
join(dwsQzChapter, Seq("chapterid", "dn")).join(dwsQzMajor, Seq("majorid", "dn"))
.join(dwsQzPaper, Seq("paperviewid", "dn")).join(dwsQzQuestion, Seq("questionid", "dn"))
.select("userid", "courseid", "questionid", "useranswer", "istrue", "lasttime", "opertype",
"paperid", "spendtime", "chapterid", "chaptername", "chapternum",
"chapterallnum", "outchapterid", "chapterlistname", "pointid", "questype", "pointyear", "chapter", "pointname"
, "excisenum", "pointdescribe", "pointlevel", "typelist", "point_score", "thought", "remid", "pointnamelist",
"typelistids", "pointlist", "sitecourseid", "siteid", "sitecoursename", "coursechapter", "course_sequence", "course_status"
, "course_creator", "course_createtime", "servertype", "helppaperstatus", "boardid", "showstatus", "majorid", "coursename",
"isadvc", "chapterlistid", "pointlistid", "courseeduid", "edusubjectid", "businessid", "majorname", "shortname",
"major_status", "major_sequence", "major_creator", "major_createtime", "businessname", "sitename",
"domain", "multicastserver", "templateserver", "multicastgateway", "multicastport", "paperviewid", "paperviewname", "paperparam",
"openstatus", "explainurl", "iscontest", "contesttime", "conteststarttime", "contestendtime", "contesttimelimit",
"dayiid", "paper_status", "paper_view_creator", "paper_view_createtime", "paperviewcatid", "modifystatus", "description", "paperuse",
"testreport", "centerid", "paper_sequence", "centername", "centeryear", "centertype", "provideuser", "centerviewtype",
"paper_stage", "papercatid", "paperyear", "suitnum", "papername", "totalscore", "question_parentid", "questypeid",
"quesviewtype", "question_content", "question_answer", "question_analysis", "question_limitminute", "score",
"splitscore", "lecture", "question_creator", "question_createtime", "question_modifystatus", "question_attanswer",
"question_questag", "question_vanalysisaddr", "question_difficulty", "quesskill", "vdeoaddr", "question_description",
"question_splitscoretype", "user_question_answer", "dt", "dn").coalesce(1)
.write.mode(SaveMode.Append).insertInto("dws.dws_user_paper_detail")
}
}
import com.catelf.qz.service.DwsQzService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DwsController {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "hdfs")
val sparkConf = new SparkConf().setAppName("dws_qz_controller").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
HiveUtil.openDynamicPartition(sparkSession) // enable dynamic partitioning
HiveUtil.openCompression(sparkSession) // enable compression
HiveUtil.useSnappyCompression(sparkSession) // use snappy compression
val dt = "20190722"
DwsQzService.saveDwsQzChapter(sparkSession, dt)
DwsQzService.saveDwsQzCourse(sparkSession, dt)
DwsQzService.saveDwsQzMajor(sparkSession, dt)
DwsQzService.saveDwsQzPaper(sparkSession, dt)
DwsQzService.saveDwsQzQuestionType(sparkSession, dt)
DwsQzService.saveDwsUserPaperDetail(sparkSession, dt)
}
}
You can see that the DWS table data has been generated.
Requirement 4: Using the wide table, compute each paper's average time spent and average score; implement the metrics first with Spark SQL, then with the Spark DataFrame API.
Requirement 5: Compute each paper's highest and lowest score; first with Spark SQL, then with the Spark DataFrame API.
Requirement 6: Group by paper and report the detail records of each paper's top-three scoring users; first with Spark SQL, then with the Spark DataFrame API.
Requirement 7: Group by paper and report the detail records of each paper's bottom-three scoring users; first with Spark SQL, then with the Spark DataFrame API.
Requirement 8: For each paper, list the user ids in each score band: 0-20, 20-40, 40-60, 60-80, 80-100.
Requirement 9: For each paper, count the users who failed and who passed, and compute the pass rate; the pass mark is 60.
Requirement 10: For each question, count wrong and correct answers and compute the error rate.
import org.apache.spark.sql.SparkSession
object AdsQzDao {
/**
* Compute each paper's average time spent and average score
*
* @param sparkSession
* @param dt
* @return
*/
def getAvgSpendTimeAndScore(sparkSession: SparkSession, dt: String) = {
sparkSession.sql(s"select paperviewid,paperviewname,cast(avg(score) as decimal(4,1)) score,cast(avg(spendtime) as decimal(10,2))" +
s" spendtime,dt,dn from dws.dws_user_paper_detail where dt='$dt' group by " +
"paperviewid,paperviewname,dt,dn order by score desc,spendtime desc")
}
/**
* Compute each paper's highest and lowest score
*
* @param sparkSession
* @param dt
*/
def getTopScore(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select paperviewid,paperviewname,cast(max(score) as decimal(4,1)) maxscore,cast(min(score) as decimal(4,1)) minscore" +
s",dt,dn from dws.dws_user_paper_detail where dt='$dt' group by paperviewid,paperviewname,dt,dn ")
}
/**
* Group by paper and get each paper's top-three users by score
*
* @param sparkSession
* @param dt
*/
def getTop3UserDetail(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select * from (select userid,paperviewname,chaptername,pointname,sitecoursename,coursename,majorname,shortname," +
s"sitename,papername,score,dense_rank() over (partition by paperviewid order by score desc) as rk,dt,dn from dws.dws_user_paper_detail where dt='$dt') " +
"where rk<4")
}
/**
* Group by paper and get each paper's bottom-three users by score
*
* @param sparkSession
* @param dt
* @return
*/
def getLow3UserDetail(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select * from (select userid,paperviewname,chaptername,pointname,sitecoursename,coursename,majorname,shortname," +
s"sitename,papername,score,dense_rank() over (partition by paperviewid order by score asc) as rk,dt,dn from dws.dws_user_paper_detail where dt='$dt') where rk<4")
}
/**
* For each paper, list the user ids in each score band
*/
def getPaperScoreSegmentUser(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select paperviewid,paperviewname,score_segment,concat_ws(',',collect_list(cast(userid as string))),dt,dn" +
" from (select paperviewid,paperviewname,userid," +
" case when score >=0 and score <=20 then '0-20'" +
" when score >20 and score <=40 then '20-40' " +
" when score >40 and score <=60 then '40-60' " +
" when score >60 and score <=80 then '60-80' " +
" when score >80 and score <=100 then '80-100' end as score_segment" +
s",dt,dn from dws.dws_user_paper_detail where dt='$dt') group by paperviewid,paperviewname,score_segment,dt,dn order by paperviewid,score_segment")
}
/**
* For each paper, count failing (score < 60) and passing (score >= 60) users and compute the pass rate
*
* @param sparkSession
* @param dt
*/
def getPaperPassDetail(sparkSession: SparkSession, dt: String) = {
sparkSession.sql("select t.*,cast(t.passcount/(t.passcount+t.countdetail) as decimal(4,2)) as rate,dt,dn" +
" from(select a.paperviewid,a.paperviewname,a.countdetail,a.dt,a.dn,b.passcount from " +
s"(select paperviewid,paperviewname,count(*) countdetail,dt,dn from dws.dws_user_paper_detail where dt='$dt' and score < 60 group by" +
s" paperviewid,paperviewname,dt,dn) a join (select paperviewid,count(*) passcount,dn from dws.dws_user_paper_detail where dt='$dt' and score >= 60 " +
"group by paperviewid,dn) b on a.paperviewid=b.paperviewid and a.dn=b.dn)t")
}
/**
* For each question, count correct and wrong answers and compute the error rate, ordered by error count descending
*
* @param sparkSession
* @param dt
*/
def getQuestionDetail(sparkSession: SparkSession, dt: String) = {
sparkSession.sql(s"select t.*,cast(t.errcount/(t.errcount+t.rightcount) as decimal(4,2))as rate" +
s" from((select questionid,count(*) errcount,dt,dn from dws.dws_user_paper_detail where dt='$dt' and user_question_answer='0' " +
s"group by questionid,dt,dn) a join(select questionid,count(*) rightcount,dt,dn from dws.dws_user_paper_detail where dt='$dt' and user_question_answer='1' " +
s"group by questionid,dt,dn) b on a.questionid=b.questionid and a.dn=b.dn)t order by errcount desc")
}
}
import com.catelf.qz.dao.AdsQzDao
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}
object AdsQzService {
def getTarget(sparkSession: SparkSession, dt: String) = {
val avgDetail = AdsQzDao.getAvgSpendTimeAndScore(sparkSession, dt)
val topscore = AdsQzDao.getTopScore(sparkSession, dt)
val top3UserDetail = AdsQzDao.getTop3UserDetail(sparkSession, dt)
val low3UserDetail = AdsQzDao.getLow3UserDetail(sparkSession, dt)
val paperScore = AdsQzDao.getPaperScoreSegmentUser(sparkSession, dt)
val paperPassDetail = AdsQzDao.getPaperPassDetail(sparkSession, dt)
val questionDetail = AdsQzDao.getQuestionDetail(sparkSession, dt)
}
def getTargetApi(sparkSession: SparkSession, dt: String) = {
import org.apache.spark.sql.functions._
val avgDetail = sparkSession.sql("select paperviewid,paperviewname,score,spendtime,dt,dn from dws.dws_user_paper_detail ")
.where(s"dt=${dt}").groupBy("paperviewid", "paperviewname", "dt", "dn").
agg(avg("score").cast("decimal(4,1)").as("avgscore"),
avg("spendtime").cast("decimal(10,1)").as("avgspendtime"))
.select("paperviewid", "paperviewname", "avgscore", "avgspendtime", "dt", "dn")
.coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_avgtimeandscore")
val topscore = sparkSession.sql("select paperviewid,paperviewname,score,dt,dn from dws.dws_user_paper_detail")
.where(s"dt=$dt").groupBy("paperviewid", "paperviewname", "dt", "dn")
.agg(max("score").as("maxscore"), min("score").as("minscore"))
.select("paperviewid", "paperviewname", "maxscore", "minscore", "dt", "dn")
.coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_maxdetail")
val top3UserDetail = sparkSession.sql("select * from dws.dws_user_paper_detail")
.where(s"dt=$dt").select("userid", "paperviewid", "paperviewname", "chaptername", "pointname"
, "sitecoursename", "coursename", "majorname", "shortname", "papername", "score", "dt", "dn")
.withColumn("rk", dense_rank().over(Window.partitionBy("paperviewid").orderBy(desc("score"))))
.where("rk<4")
.select("userid", "paperviewid", "paperviewname", "chaptername", "pointname", "sitecoursename"
, "coursename", "majorname", "shortname", "papername", "score", "rk", "dt", "dn")
.coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_top3_userdetail")
val low3UserDetail = sparkSession.sql("select * from dws.dws_user_paper_detail")
.where(s"dt=$dt").select("userid", "paperviewid", "paperviewname", "chaptername", "pointname"
, "sitecoursename", "coursename", "majorname", "shortname", "papername", "score", "dt", "dn")
.withColumn("rk", dense_rank().over(Window.partitionBy("paperviewid").orderBy("score")))
.where("rk<4")
.select("userid", "paperviewid", "paperviewname", "chaptername", "pointname", "sitecoursename"
, "coursename", "majorname", "shortname", "papername", "score", "rk", "dt", "dn")
.coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_low3_userdetail")
val paperScore = sparkSession.sql("select * from dws.dws_user_paper_detail")
.where(s"dt=$dt")
.select("paperviewid", "paperviewname", "userid", "score", "dt", "dn")
.withColumn("score_segment",
when(col("score").between(0, 20), "0-20")
.when(col("score") > 20 && col("score") <= 40, "20-40")
.when(col("score") > 40 && col("score") <= 60, "40-60")
.when(col("score") > 60 && col("score") <= 80, "60-80")
.when(col("score") > 80 && col("score") <= 100, "80-100"))
.drop("score").groupBy("paperviewid", "paperviewname", "score_segment", "dt", "dn")
.agg(concat_ws(",", collect_list(col("userid").cast("string"))).as("userids"))
.select("paperviewid", "paperviewname", "score_segment", "userids", "dt", "dn")
.orderBy("paperviewid", "score_segment")
.coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_scoresegment_user")
val paperPassDetail = sparkSession.sql("select * from dws.dws_user_paper_detail").cache()
val unPassDetail = paperPassDetail.select("paperviewid", "paperviewname", "dn", "dt")
.where(s"dt='$dt'").where("score < 60")
.groupBy("paperviewid", "paperviewname", "dn", "dt")
.agg(count("paperviewid").as("unpasscount"))
val passDetail = paperPassDetail.select("paperviewid", "dn")
.where(s"dt='$dt'").where("score >= 60")
.groupBy("paperviewid", "dn")
.agg(count("paperviewid").as("passcount"))
unPassDetail.join(passDetail, Seq("paperviewid", "dn")).
withColumn("rate", (col("passcount") / (col("passcount") + col("unpasscount")))
.cast("decimal(4,2)"))
.select("paperviewid", "paperviewname", "unpasscount", "passcount", "rate", "dt", "dn")
.coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_user_paper_detail")
paperPassDetail.unpersist()
val userQuestionDetail = sparkSession.sql("select * from dws.dws_user_paper_detail").cache()
val userQuestionError = userQuestionDetail.select("questionid", "dt", "dn", "user_question_answer")
.where(s"dt='$dt'").where("user_question_answer='0'").drop("user_question_answer")
.groupBy("questionid", "dt", "dn")
.agg(count("questionid").as("errcount"))
val userQuestionRight = userQuestionDetail.select("questionid", "dn", "user_question_answer")
.where(s"dt='$dt'").where("user_question_answer='1'").drop("user_question_answer")
.groupBy("questionid", "dn")
.agg(count("questionid").as("rightcount"))
userQuestionError.join(userQuestionRight, Seq("questionid", "dn"))
.withColumn("rate", (col("errcount") / (col("errcount") + col("rightcount"))).cast("decimal(4,2)"))
.orderBy(desc("errcount")).coalesce(1)
.select("questionid", "errcount", "rightcount", "rate", "dt", "dn")
.write.mode(SaveMode.Append).insertInto("ads.ads_user_question_detail")
}
}
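Note that getTarget only materializes the Spark SQL results from AdsQzDao as DataFrames; nothing is persisted. To save them as well, each DataFrame would be written out just as getTargetApi does, for example (a sketch, placed inside getTarget and reusing the SaveMode import):
// Persist the requirement-4 result computed via Spark SQL.
avgDetail.coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_avgtimeandscore")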
import com.catelf.qz.service.AdsQzService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object AdsController {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "hdfs")
val sparkConf = new SparkConf().setAppName("ads_controller").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
HiveUtil.openDynamicPartition(sparkSession) // enable dynamic partitioning
AdsQzService.getTargetApi(sparkSession, "20190722")
// AdsQzService.getTarget(sparkSession, dt="20190722")
}
}
Finally, export the computed metrics to MySQL with DataX.
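A DataX job for one of the metric tables would pair an HDFS reader with a MySQL writer. The JSON below is a hypothetical sketch only: the warehouse path, file type, column list, and connection details are illustrative and not taken from the source project.
{
  "job": {
    "setting": { "speed": { "channel": 1 } },
    "content": [{
      "reader": {
        "name": "hdfsreader",
        "parameter": {
          "defaultFS": "hdfs://cdh1.macro.com:8020",
          "path": "/user/hive/warehouse/ads.db/ads_paper_avgtimeandscore/*",
          "fileType": "text",
          "fieldDelimiter": "\t",
          "column": ["*"]
        }
      },
      "writer": {
        "name": "mysqlwriter",
        "parameter": {
          "username": "root",
          "password": "******",
          "column": ["paperviewid", "paperviewname", "score", "spendtime", "dt", "dn"],
          "connection": [{
            "jdbcUrl": "jdbc:mysql://localhost:3306/ads",
            "table": ["ads_paper_avgtimeandscore"]
          }]
        }
      }
    }]
  }
}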