前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark SQL读数据库时不支持某些数据类型的问题

Spark SQL读数据库时不支持某些数据类型的问题

作者头像
王知无-import_bigdata
发布2019-12-05 11:11:30
2.1K0
发布2019-12-05 11:11:30
举报

在大数据平台中,经常需要做数据的ETL,从传统关系型数据库RDBMS中抽取数据到HDFS中。之前开发数据湖新版本时使用Spark SQL来完成ETL的工作,但是遇到了 Spark SQL 不支持某些数据类型(比如ORACLE中的Timestamp with local Timezone)的问题。

一、系统环境

  • Spark 版本:2.1.0.cloudera1
  • JDK 版本:Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131
  • ORACLE JDBC driver 版本:ojdbc7.jar
  • Scala 版本:2.11.8

二、Spark SQL读数据库表遇到的不支持某些数据类型

Spark SQL 读取传统的关系型数据库同样需要用到 JDBC,毕竟这是提供的访问数据库官方 API。Spark要读取数据库需要解决两个问题:

  • 分布式读取;
  • 原始表数据到DataFrame的映射。
2.1 业务代码
代码语言:javascript
复制
public class Config {
  // spark-jdbc parameter names
  public static String JDBC_PARA_URL = "url";
  public static String JDBC_PARA_USER = "user";
  public static String JDBC_PARA_PASSWORD = "password";
  public static String JDBC_PARA_DRIVER = "driver";
  public static String JDBC_PARA_TABLE = "dbtable";
  public static String JDBC_PARA_FETCH_SIZE = "fetchsize";
}
代码语言:javascript
复制
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

// 主类
object Main {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("yarn").appName("test")getOrCreate()
    val sqlContext = sparkSession.sqlContext
    val sc = sparkSession.sparkContext
    val partitionNum = 16
    val fetchSize = 1000
    val jdbcUrl = "..."
    val userName = "..."
    val schema_table = "..."
    val password = "..."
    val jdbcDriver = "oracle.jdbc.driver.OracleDriver"
    // 注意需要将oracle jdbc driver jar放置在spark lib jars目录下,或者spark2-submit提交spark application时添加--jars参数
    val jdbcDF = sqlContext.read.format("jdbc").options(
          Map(Config.JDBC_PARA_URL -> jdbcUrl,
            Config.JDBC_PARA_USER -> userName,
            Config.JDBC_PARA_TABLE -> schema_table,
            Config.JDBC_PARA_PASSWORD -> password,
            Config.JDBC_PARA_DRIVER -> jdbcDriver,
            Config.JDBC_PARA_FETCH_SIZE -> s"$fetchSize")).load()
    val rdd = jdbcDF.rdd
    rdd.count()
    ......
}
2.2 部分数据类型不支持

比如ORACLE中的Timestamp with local TimezoneFLOAT(126)


三、解决方法:自定义JdbcDialects

3.1 什么是JdbcDialects ?

Spark SQL 中的 org.apache.spark.sql.jdbc package 中有个类 JdbcDialects.scala,该类定义了Spark DataType 和 SQLType 之间的映射关系,分析该类的源码可知,该类是一个抽象类,包含以下几个方法:

  • def canHandle(url : String):判断该JdbcDialect 实例是否能够处理该jdbc url;
  • getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder):输入数据库中的SQLType,得到对应的Spark DataType的mapping关系;
  • getJDBCType(dt: DataType):输入Spark 的DataType,得到对应的数据库的SQLType;
  • quoteIdentifier(colName: String):引用标识符,用来放置某些字段名用了数据库的保留字(有些用户会使用数据库的保留字作为列名);
  • 其他......。

该类还有一个伴生对象,其中包含3个方法:

  • get(url: String):根据database的url获取JdbcDialect 对象;
  • unregisterDialect(dialect: JdbcDialect):将已注册的JdbcDialect 注销;
  • registerDialect(dialect: JdbcDialect):注册一个JdbcDialect。
3.2 解决步骤
  1. 使用get(url: String)方法获取当前的 JdbcDialect 对象;
  2. 将当前的 JdbcDialect 对象 unregistered 掉;
  3. new 一个 JdbcDialect 对象,并重写方法(主要是getCatalystType()方法,因为其定义了数据库 SQLType 到 Spark DataType 的映射关系),修改映射关系,将不支持的 SQLType 以其他的支持的数据类型返回比如StringType,这样就能够解决问题了;
  4. register新创建的 JdbcDialect 对象
3.3 解决方案的业务代码
代码语言:javascript
复制
object SaicSparkJdbcDialect {


  def useMyJdbcDIalect(jdbcUrl:String,dbType:String): Unit ={

    val logger = LoggerFactory.getLogger(classOf[SaicSparkJdbcDialect])

    // 将当前的 JdbcDialect 对象unregistered掉
    val dialect = JdbcDialects
    JdbcDialects.unregisterDialect(dialect.get(jdbcUrl))

    if (dbType.equals("ORACLE")) {
      val OracleDialect = new JdbcDialect {
          // 只能处理ORACLE数据库
          override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
          // 修改数据库 SQLType 到 Spark DataType 的映射关系(从数据库读取到Spark中)
          override def getCatalystType(sqlType: Int, typeName: String, size: Int,
                                       md: MetadataBuilder): Option[DataType] = {
            if (sqlType==Types.TIMESTAMP || sqlType== -101 || sqlType== -102) {
              // 将不支持的 Timestamp with local Timezone 以TimestampType形式返回
              Some(TimestampType)
            } else if (sqlType == Types.BLOB) {
              Some(BinaryType)
            } else {
              Some(StringType)
            }
          }
          // 该方法定义的是数据库Spark DataType 到 SQLType 的映射关系,此处不需要做修改
          override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
            case StringType => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR))
            case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
            case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
            case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
            case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
            case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
            case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
            case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
            case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
            case TimestampType => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP))
            case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
            case _ => None
          }
          override def quoteIdentifier(colName: String): String = {
            colName
          }
        }
        // register新创建的 JdbcDialect 对象
        JdbcDialects.registerDialect(OracleDialect)
      }

本文来自:https://www.jianshu.com/p/20b82891aac9

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、系统环境
  • 二、Spark SQL读数据库表遇到的不支持某些数据类型
    • 2.1 业务代码
      • 2.2 部分数据类型不支持
      • 三、解决方法:自定义JdbcDialects
        • 3.1 什么是JdbcDialects ?
          • 3.2 解决步骤
            • 3.3 解决方案的业务代码
            相关产品与服务
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档