专栏首页伦少的博客利用Spark实现Oracle到Hive的历史数据同步

利用Spark实现Oracle到Hive的历史数据同步

1、需求背景

和上一篇文章Spark通过修改DataFrame的schema给表字段添加注释一样,通过Spark将关系型数据库(以Oracle为例)的表同步的Hive,这里讲的只是同步历史数据,不包括同步增量数据。

2、Oracle和Hive的字段类型对应

利用Spark的字段类型自动匹配,本来以为Spark匹配的不是很好,只是简单的判断一下是否为数字、字符串,结果经验证,Spark可以获取到Oracle的小数点精度,Spark的字段类型对应和我自己整理的差不多,所以就索性用Spark自带的字段类型匹配,而不是自己去Oracle相关表获取每个字段类型,然后一一转化为Hive对应的字段类型,下面是Oracle和Hive的字段类型对应,只是整理了大概:

Oracle

Hive

VARCHAR2

String

NVARCHAR2

String

NUMBER

DECIMAL/Int

DATE

TIMESTAMP

TIMESTAMP

TIMESTAMP

CHAR

String

CLOB(一般用不到)

String

BLOB(一般用不到)

BINARY

RAW (一般用不到)

BINARY

Other

String

2.1 看一下Spark字段类型对应

首先建一张包含大部分字段类型的Oracle表

CREATE TABLE TEST (
	COL1 VARCHAR2(25),
	COL2 NVARCHAR2(18),
	COL3 INTEGER,
	COL4 NUMBER(10,4),
	COL5 NUMBER(30,7),
	COL6 NUMBER,
	COL7 DATE,
	COL8 TIMESTAMP,
	COL9 CHAR(30),
	COL10 CLOB,
	COL11 BLOB,
	COL12 RAW(12)
) ;
COMMENT ON COLUMN TEST.COL2 IS '注释2' ;
COMMENT ON COLUMN TEST.COL7 IS '注释7' ;
COMMENT ON COLUMN TEST.COL10 IS '注释10' ;

然后用Spark打印一下获取到的字段类型。

可以看到Spark成功的完成上述表格的字段类型转化,小数的精度和是否为空都可以获取到,但是不完美的一点是没有将NUMBER标度为零的转换为Int,而还是以DECIMAL(38,0)的形式表示,虽然都是表示的整数,但是在后面Spark读取hive的时候,还需要将DECIMAL转为Int。

2.2 按需修改字段类型对应

以上面讲的将DECIMAL(38,0)转为Int为例: 先尝试通过修改schema实现

import org.apache.spark.sql.types._
val schema = df.schema.map(s => {
  if (s.dataType.equals(DecimalType(38, 0))) {
    new StructField(s.name, IntegerType, s.nullable)
  } else {
    s
  }
})
//根据添加了注释的schema,新建DataFrame
val new_df = spark.createDataFrame(df.rdd, StructType(schema)).repartition(160)
new_df.printSchema()

可以看到,已经成功的将COL3的字段转为了Int。但是这样构造的DataFrame是不能用的,如执行new_df.show会报如下错误:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of int

原因是rdd的数据类型和schema的数据类型不匹配。 最后可以通过如下方式实现:

import scala.collection.mutable.ArrayBuffer
//需要转换的列名
val colName = ArrayBuffer[String]()
val schema = df.schema.foreach(s => {
  if (s.dataType.equals(DecimalType(38, 0))) {
    colName += s.name
  }
})

import org.apache.spark.sql.functions._
var df_int = df
colName.foreach(name => {
  df_int = df_int.withColumn(name, col(name).cast(IntegerType))
})
df_int.printSchema()
df_int.show

3、Oracle全部历史数据同步Hive

3.1 再新建一张表

这里的目的是表示多个表,而不是一个表,上面已经建了一张表,再建一张表,以验证代码可以将所有的表都同步过去,这里用上一篇博客上的建表Sql即可

CREATE TABLE ORA_TEST (
ID VARCHAR2(100), 
NAME VARCHAR2(100)
);
COMMENT ON COLUMN ORA_TEST.ID IS 'ID';
COMMENT ON COLUMN ORA_TEST.NAME IS '名字';
COMMENT ON TABLE ORA_TEST IS  '测试';

再在每张表里造点数据,这里就不截图了。

3.2 代码

上篇博客里用到的注释,是在程序里手工添加的注释,下面的代码是从Oracle里取的,且同步的是一个用户下所有的表。

package com.dkl.leanring.spark.sql.Oracle

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.util.Properties
import scala.collection.mutable.ArrayBuffer

/**
 *
 * Spark自动建表(带字段注释、暂无表注释)
 * 并将中间库Oracle的历史数据全部初始化到hive
 *
 * 实现方案:
 * 1、 利用Spark的自动字段类型匹配
 * 2、 读取Oracle表字段注释,添加到DataFrame的元数据
 * 3、按需修改Spark默认的字段类型转换
 *
 * 注:需要提前建好对应的hive数据库
 */
object Oracle2Hive {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Oracle2Hive")
      .master("local")
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .enableHiveSupport()
      .getOrCreate()

    //oracle的连接信息
    val p = new Properties()
    p.put("driver", "oracle.jdbc.driver.OracleDriver")
    p.put("url", "jdbc:oracle:thin:@192.168.44.128:1521:orcl")
    p.put("user", "bigdata")
    p.put("password", "bigdata")

    import scala.collection.JavaConversions._
    val database_conf: scala.collection.mutable.Map[String, String] = p

    //Oracle是分用户的,这里以用户BIGDATA为例
    val owner = "BIGDATA"
    val sql_in_owner = s"('${owner}')"

    database_conf.put("dbtable", "TEST")

    spark.sql(s"use ${owner}")

    database_conf.put("dbtable", s"(select table_name from all_tables where owner in ${sql_in_owner})a")
    //所有的表名
    val allTableNames = getDataFrame(spark, database_conf)

    database_conf.put("dbtable", s"(select * from all_col_comments where owner in ${sql_in_owner})a")
    //所有的表字段对应的注释
    val allColComments = getDataFrame(spark, database_conf).repartition(160).cache

    allTableNames.select("table_name").collect().foreach(row => {
      //表名
      val table_name = row.getAs[String]("table_name")
      database_conf.put("dbtable", table_name)
      //根据表名从Oracle取数
      val df = getDataFrame(spark, database_conf)
      //字段名 和注 对应的map
      val colName_comments_map = allColComments.where(s"TABLE_NAME='${table_name}'")
        .select("COLUMN_NAME", "COMMENTS")
        .na.fill("", Array("COMMENTS"))
        .rdd.map(row => (row.getAs[String]("COLUMN_NAME"), row.getAs[String]("COMMENTS")))
        .collect()
        .toMap

      val colName = ArrayBuffer[String]()
      //为schema添加注释信息
      val schema = df.schema.map(s => {
        if (s.dataType.equals(DecimalType(38, 0))) {
          colName += s.name
          new StructField(s.name, IntegerType, s.nullable, s.metadata).withComment(colName_comments_map(s.name))
        } else {
          s.withComment(colName_comments_map(s.name))
        }
      })

      import org.apache.spark.sql.functions._
      var df_int = df
      colName.foreach(name => {
        df_int = df_int.withColumn(name, col(name).cast(IntegerType))

      })
      //根据添加了注释的schema,新建DataFrame
      val new_df = spark.createDataFrame(df_int.rdd, StructType(schema))
      new_df.write.mode("overwrite").saveAsTable(table_name)

      //      new_df.schema.foreach(s => println(s.metadata))
      //      new_df.printSchema()

    })

    spark.stop

  }
  /**
   * @param spark SparkSession
   * @param database_conf 数据库配置项Map,包括driver,url,username,password,dbtable等内容,提交程序时需用--jars选项引用相关jar包
   * @return 返回DataFrame对象
   */
  def getDataFrame(spark: SparkSession, database_conf: scala.collection.Map[String, String]) = {
    spark.read.format("jdbc").options(database_conf).load()
  }

}

3.3 看一下Hive里的结果

这样就成功的完成了Oracle历史数据到Hive的同步!

4、关于增量数据的同步

4.1 实时同步

可以考虑这样,先用ogg将Oracle的增量数据实时同步到kafka,再用Spark Streaming实现kafka到hive的实时同步。

  • 下面两篇文章提供参考:利用ogg实现oracle到kafka的增量数据实时同步Spark Streamming+Kafka提交offset实现有且仅有一次,其中Spark Streaming的代码并没有实现写入hive的功能,但是实时读取kafka的功能已经实现,只要自己处理一下解析kafka里json格式的增量数据,转成DataFrame保存到hive表里即可。4.2 非实时 如果Oracle的每个表里都有时间字段,那么可以通过时间字段来过滤增量数据,用上面的Spark程序去定时的跑,如果没有时间字段的话,可以用ogg的colmap函数增加时间字段,先实时同步到中间的Oracle库,再根据时间字段来同步。

本文由 董可伦 发表于 伦少的博客 ,采用署名-非商业性使用-禁止演绎 3.0进行许可。

非商业转载请注明作者及出处。商业转载请联系作者本人。

本文标题:利用Spark实现Oracle到Hive的历史数据同步

本文链接:https://dongkelun.com/2018/08/27/sparkOracle2Hive/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark通过修改DataFrame的schema给表字段添加注释

    通过Spark将关系型数据库(以Oracle为例)的表同步的Hive表,要求用Spark建表,有字段注释的也要加上注释。Spark建表,有两种方法:

    董可伦
  • spark-submit报错:Application application_1529650293575_0148 finished with failed status

    董可伦
  • spark 将DataFrame所有的列类型改为double

    转载请务必注明原创地址为:http://dongkelun.com/2018/04/27/dfChangeAllColDatatypes/

    董可伦
  • 在 K8S 部署一个 Spark History Server - 篇2

    之前我们组在生产环境上部署的是 Spark 2.2 on k8s 的那个 fork,部署在 K8S 上,至少需要一个 Dockerfile,最近有计划升级到 3...

    runzhliu
  • 【Spark】Spark Local 及 Spark On Standalone 环境搭建

    (1)解压spark安装包 $ cd /opt/softwares/cdh cdh]$ tar -zxf spark-1.6.1-bin-2.5.0-cdh...

    魏晓蕾
  • Oozie分布式任务的工作流——Spark篇

    Spark是现在应用最广泛的分布式计算框架,oozie支持在它的调度中执行spark。在我的日常工作中,一部分工作就是基于oozie维护好每天的spark离线任...

    用户1154259
  • mysql 多表关联查询 实现 全文匹配的 模糊搜索接口 SQLmysql 多表关联查询 实现 全文匹配的 模糊搜索接口 SQL

    在mysql中,有时我们在做数据库查询时,需要得到某字段中包含某个值的记录,但是它也不是用like能解决的,使用like可能查到我们不想要的记录,它比like更...

    一个会写诗的程序员
  • 重磅发布 | 基于Spark训练线性回归模型 实战入门教程

    最开始接触分布式计算框架的是Hadoop中的MapReduce,虽然开发起来很复杂(Map与Reduce都要有相应的实现类)但是我也成功的启动了第一个“Hell...

    double
  • IT技术人员对于网站漏洞渗透测试公司 如何去选择

    近期有许多网站渗透测试安全防护从业人员向我咨询就业角度疑问,去甲方公司做安全防护好或者去乙方客户企业做安全防护好,特别是应届毕业生或工作中1到3年的安全防护从业...

    网站安全专家
  • 企业安全策略越大越好?Facebook CEO扎克伯格给了几点建议

    谈到企业安全,大多数管理者都认为一定越大越好——预算更多、范围更广、回报更大。然而,Facebook首席执行官马克·扎克伯格在最近召开的F8会议上提及Faceb...

    FB客服

扫码关注云+社区

领取腾讯云代金券