sparksql工程小记

  最近做一个oracle项目迁移工作,跟着spark架构师学着做,进行一些方法的总结。

  1、首先,创建SparkSession对象(老版本为sparkContext)

  val session = SparkSession.builder().appName("app1").getOrCreate()

 2、数据的更新时间配置表,选用mysql,就是说每次结果数据计算写入mysql后,还会将此次数据的更新时间写入数据配置表。 那么在代码里,需要创建配置表的case class,配置与构造数据库schema信息,url,用户名密码等,随后根据配置表中的不同app进行数据的过滤。

  val appId = "1"

  case class DBInformation(url:Stirng,schema:String,user:String,passwd:String)

  val mysqlDB = DBInformation("jdbc:mysql://....",schema,user,passowrd)

  val tableName = mysqlDB.schema + "." + name

  val props = new Properties()

  props.setProperty("user",mysqlDB.user)

  props.setProperty("password",mysqlDB.passwd)

  props.setProperty(JDBCOptions.JDBC_DRIVER_CLASS,"com.mysql.jdbc.Driver")

  val record = session.read.jdbc(mysqlDB.url,tableName,props).filter(row => row.getAs[Int]("app_id") == appId).take(1)

  //第一次写入,木有数据

  if(0 == record.size){

    DBInfoMation(null,null,null)

  }else{

    DBInfoMation(record(0).getTimestmap(1),recode(0).getTimestamp(2),recode(0)..getTimestamp(3))  

 3、注册UDF,由于原来是用oracle的语法,现如今转为sparksql,需要注册一些UDF,来兼容原有oracle的函数

  def registerUDF(session:SparkSession) : Unit = {

    session.udf.register("UDF",(value : String,modifieds:Array[String) => {

      val filter = modifieds.filter(_!=null)

      if(!filter.isEmpty){

        filter.max

      }else{

        null

      }

     })

   {

4、很多计算是需要过往的历史数据的,在第一次初始化的时候,先对历史数据进行缓存。这里有个知识点,会将一直计算的同步数据进行checkPoint落地磁盘,如果发现历史时间在同步时间之后,则加载历史数据,否则就加载同步数据。

  val (updateTime,initData) = if(historyTime.after(syncTime)){

    (historyTime,initFromHistory(tableName))

  } else {

    (syncTime,initFromCheckPoint(syncTime))

  }

  //记录schema

  schema = initData.schema

  //baseData为缓存在内存的数据,并根据数据量进行repartition

  baseData = initData.repartition(numPartitions,_partitionColumns.map(new Column()):_*).rdd.persisit(storageLevel)

  //触发action动作

  baseData.foreach(_=>Unit)

5、有一种情况,下游三个表要关联生成一张大表,这三张表的数据来源于消息中间件中的三个topic,但是数据可能不是同时到来,那么就需要将历史加载的大表拆根据ID拆分为三个小表,然后逐个append到三个小表上,随后再根据ID关联起来,再组成最终表。

  val table1 = new createUpdatingTable(session,"tableName1",topicConf,numPartitons,...)

  val table2 = new createUpdatingTable (session,"tableName2",topicConf1,numPartitions,...)

  val table3 = new createUpdatingTable(session,"tableName3","topicConf2,numPartitions,...)

  val mergeBaseTable = (session,"mergeTableName",Array(table1,table2,table3),finallyColumn,finallyPartitions...)

  mergeBaseTable.updateAndGetData(Some(genDataFilter(currentTime)))

  //三表拆分与合并

  val tmpPartitionKey = "pd_code"

  if(baseData != null) {

    val oldData = getOldData(baseData,keyDF.rdd,tmpPartitionKey)

    oldDf = session.createDataFrame(oldData,schema)

    .repartition(numPartitions,new Column(tmpPartitionKey))

    .persist(storageLevel)

  }

  val table1 = updateShardTable(oldDf,inDfs(0)...).sparksession.createDataFrame(data,schema)

  val table2 = ....

  val table3 = ....

  6、三表key进行合并,通过sql进行三来源表合并

  val keySet = keys.collect()

  val broadcastKeys = session.sparkContext.broadCast(keySet)

  baseData.mapPartitions({iter =>

    val set = broadcastKey.value.toSet

    iter.filter(row=>set.contains(row.getAs[Any](keyCol)))

  },true)

  val sql ="select a.column,b.column,c.column.... from table1 a left join table2 b on a.pd_code = b.pd_code......

  val finallyTable = session.sql(sql)

7、从历史数据中筛选出此次需要更新的数据(通过ID进行过滤),随后将新数据进行append

  val new Data = baseData.zipPartitions(updateData,true){case(liter,riter)=>

    val rset = new mutable.HashSet[Any]

    for(row <- riter){

      rset.add(row.getAs[Any](keyCol))

    }

    liter.filter(row=>!rset.contains(row.getAs[Any](keyCol))))

    }.zipPartitions(updateData,true){case (liter,riter)=>

      liter++riter

    }.persisit(storageLevel)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏菩提树下的杨过

Oracle中使用Entity Framework 6.x Code-First方式开发

去年写过一篇EF的简单学习笔记,当时EF还不支持Oracle的Code-First开发模式,今天无意又看了下Oracle官网,发现EF6.X已经支持了,并且给出...

28650
来自专栏星回的实验室

打造自己的MapReduce[二]:Hadoop连接MongoDB

在搭建完Hadoop集群后,我们可以基于HDFS做一些离线计算。然而HDFS毕竟是基于文件的系统,所以当我们存储的数据要兼顾一些线上业务访问的时候(如接入层/推...

21420
来自专栏Android 研究

OKHttp源码解析(八)--中阶之连接与请求前奏

在http请求中,对于请求速度提升和降低延迟,keepalive在网络连接发挥着重大作用。

39920
来自专栏Java编程技术

MyBatis中使用流式查询避免数据量过大导致OOM

其中fetchSize="-2147483648",Integer.MIN_VALUE=-2147483648

73610
来自专栏史上最简单的Spring Cloud教程

如何在springcloud分布式系统中实现分布式锁?

最近在看分布式锁的资料,看了 Josial L的《Redis in Action》的分布式锁的章节。实现思路是利用springcloud结合redis实现分布式...

43080
来自专栏技术小讲堂

Entity Framework4.3 Code-First基于代码的数据迁移讲解1.建立一个最初的模型和数据库   2.启动Migration(数据迁移)3.第一个数据迁移4.订制的数据迁移4.动态

前段时间一直在研究Entity Framework4,但是苦于没有找到我特别中意的教程,要么就是千篇一律的文章,而且写的特别简单,可以说,糟践了微软这么牛埃克斯...

35480
来自专栏lulianqi

一个基于.NET平台的自动化/压力测试系统设计简述

AutoTest是一个基于.NET平台实现的自动化/压力测试的系统,可独立运行于windows平台下,支持分布式部署,不需要其他配置或编译器的支持。(本质是一个...

24310
来自专栏Java帮帮-微信公众号-技术文章全总结

Java基础-26(01)总结网络编程

1:网络编程(理解) (1)网络编程:用Java语言实现计算机间数据的信息传递和资源共享 (2)网络编程模型 (3)网络编程的三要素 A:IP地址 ...

37280
来自专栏有趣的django

Django+Bootstrap+Mysql 搭建个人博客 (六)

87020
来自专栏Danny的专栏

机房收费系统(VB.NET)——存储过程实战

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

18750

扫码关注云+社区

领取腾讯云代金券