前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark离线导出Mysql数据优化之路

Spark离线导出Mysql数据优化之路

原创
作者头像
2011aad
修改2022-05-24 12:13:43
2.5K2
修改2022-05-24 12:13:43
举报

在业务离线数据分析场景下,往往需要将Mysql中的数据先导出到分布式存储中,如Hive、Iceburg。这个功能实现的方式有很多,但每种方式都会遇到一些问题(包括阿里开源的DataX)。本文就介绍下这个功能的优化之路,并最终给出一个笔者实现的终极方案。

阶段0

笔者第一次接触到这个场景是维护历史遗留的任务,这也是笔者看到的最初实现方案(后续称作方案0)。这个版本的核心逻辑可以简化为如下的shell脚本。考虑到业务上有分库分表的场景,所以认为库表名都满足一个正则表达式。这段逻辑就是遍历Mysql实例上的库表,对所有满足正则表达式的库表执行一个SQL,查出需要的数据,保存到本地文件中,然后将文件上传到HDFS。

代码语言:shell
复制
#!/bin/bash

MYSQL_CMD="mysql -hxxx -Pxxx -uxxx -pxxx"
DATABASE_REG="^xdb_test_([0-9]|[1-9][0-9])$" # db_test_0-db_test_99分库
TABLE_REG="^xtb_test_[0-9]{10}$"  # 按租户分表
file="data.txt"

$MYSQL_CMD -NB -e "SHOW DATABASES" | while read DATABASE
do
    if [[ "x$DATABASE" =~ $DATABASE_REG ]] # 遍历符合库名正则表达式的数据库
    then
        $MYSQL_CMD -NB -e "SHOW TABLES FROM ${DATABASE}" | while read TABLE
        do
            if [[ "x$TABLE" =~ $TABLE_REG ]] # 遍历符合表名正则表达式的数据表
            then
                SELECT_SQL="select ...... from ${DATABASE}.${TABLE}"
                $MYSQL_CMD -NB -e "${SELECT_SQL}" >> $file
            fi
        done
    fi
done

hadoop fs -put $file hdfs://xxxxx
rm $file

这个实现看起来可以满足需求,但其实隐藏着以下几个问题:

1. 机器性能要求高:表读取是一个SQL查出所有数据,在单表数据量比较大时,需要大内存来承载这些数据;同时这些数据需要写入本地文件,若写入处理速度较慢,会导致查询执行失败(受mysql net_read_timeout参数控制)。

2. 慢查询:SQL扫描表中全部数据,通常会导致慢查询,可能会影响其他线上业务。

3. 执行效率低:在分库分表的场景下,这些库表数据的读取只能顺序执行,在库表数据量大的情况下,整个任务无法通过并发缩短执行时间。

4. 运维困难:每次新增一个数据源的同步,都要复制一份shell,然后改里面的库表信息、查询语句;要新增一些优化逻辑,需要每个脚本都改一遍;shell脚本在日常业务开发中使用不多,实现逻辑、定位问题都很不方便。

阶段1:解决查询执行失败

方案0最严重的问题就是查询执行失败。随着业务数据量的增大,由于数据无法及时写入磁盘,有些表的SQL查询必然会执行超时(net_read_timeout);同时大数据量的查询也导致脚本运行会占用大量内存。既然是读取数据量的问题,可以先加上分页读取的逻辑,让查询可以执行成功。阶段1的实现(后续称作方案1)就是在方案0的基础上,增加了单表读取分页逻辑:先查出该表的总行数,然后按照固定的PAGE_SIZE进行循环分页查询。

代码语言:shell
复制
#!/bin/bash

MYSQL_CMD="mysql -hxxx -Pxxx -uxxx -pxxx"
DATABASE_REG="^xdb_test_([0-9]|[1-9][0-9])$" # db_test_0-db_test_99分库
TABLE_REG="^xtb_test_[0-9]{10}$"  # 按租户分表
file="data.txt"

$MYSQL_CMD -NB -e "SHOW DATABASES" | while read DATABASE
do
    if [[ "x$DATABASE" =~ $DATABASE_REG ]] # 遍历符合库名正则表达式的数据库
    then
        $MYSQL_CMD -NB -e "SHOW TABLES FROM ${DATABASE}" | while read TABLE
        do
            if [[ "x$TABLE" =~ $TABLE_REG ]] # 遍历符合表名正则表达式的数据表
            then
                count=`$MYSQL_CMD -NB -e "select count(1) from ${DATABASE}.${TABLE}"`
                page_size=100000 # 经验值,不同表可能不一样
                total_page=`echo "$count/$page_size" | bc`
                for page in `seq 0 $total_page`
                do
                    offset=`echo "$page*$page_size" | bc`
                    SELECT_SQL="select ...... from ${DATABASE}.${TABLE} limit $offset, $page_size"
                    $MYSQL_CMD -NB -e "${SELECT_SQL}" >> $file
                done
            fi
        done
    fi
done

hadoop fs -put $file hdfs://xxxxx
rm $file

方案1虽然解决了查询失败的问题,但这种查询方式也有明显的问题:这个查询是典型的深翻页查询,随着数据量的增大,这个查询SQL会执行的越来越慢,一方面会拖慢整个任务的执行时间,另一方面对Mysql的压力也比较大。

阶段2:解决运维问题

方案1上线之后,除了任务执行慢一些,很长一段时间并没有遇到其他问题。但随着同步的库表越来越多,每个表都要抄一份类似的代码,改库名表名,改查询语句;部署脚本的机器替换、mysql实例迁移的时候,需要重新部署所有脚本、每个脚本里改一些配置。长期下来,这就变成了一个机械重复的工作。

为了降低运维成本,我们考虑重新实现一个同步工具,把库表信息、查询语句这些逻辑信息以配置文件的方式抽象出来。这样再增加需要同步的表,就只需要指定业务字段,而不需要关心数据读取的实现。考虑到以下几个方面,决定用Spark重新实现这个工具:

1. 执行效率:Spark支持并发处理数据,可以提升任务执行速度。

2. 可扩展性:Spark SQL可以在数据导出的同时完成一些简单ETL的工作,同时也可以支持多数据源的关联处理。

3. 稳定性:Spark执行基于Yarn调度平台,提供了容错、重试等机制,也不用考虑机器裁撤、迁移的问题。

阶段3:解决慢查询问题

方案1解决了单次查询数据量大的问题,但仍然存在深翻页慢查询的问题。为此我们查了开源工具DataX[1]的实现方式,其核心实现逻辑如下:首先getPkRange方法查出数据表中主键字段的最小值和最大值,然后将主键的取值在最大值和最小值之间划分成用户指定的adviceNum个区间(整数类型区间的划分比较直接,字符串类型的划分就复杂一点,DataX是将字符串转成128进制的大整数,然后再当做整数切分),最后将区间范围转化为SQL中的where条件进行数据读取。

这种实现方式优势是:

1. 划分出的多个查询区间可以并发执行。

2. 除查询数据本身外,额外的开销几乎可以忽略不计(只需要一个查询查出主键字段的最小值和最大值)。

同时这种方式也存在问题:

1. 在SplitPK分布不均匀时,多个SQL执行的耗时可能差距很大。

2. 当SplitPK是字符串的时,区间划分的逻辑相对复杂,且对于主键是随机字符串的场景(如雪花算法生成主键),主键分布不均匀的问题会更严重。

代码语言:java
复制
public static List<Configuration> splitSingleTable(Configuration configuration, int adviceNum) {
        ......
        Pair<Object, Object> minMaxPK = getPkRange(configuration);

        boolean isStringType = Constant.PK_TYPE_STRING.equals(configuration
                .getString(Constant.PK_TYPE));
        boolean isLongType = Constant.PK_TYPE_LONG.equals(configuration
                .getString(Constant.PK_TYPE));

        if (isStringType) {
            rangeList = RdbmsRangeSplitWrap.splitAndWrap(
                    String.valueOf(minMaxPK.getLeft()),
                    String.valueOf(minMaxPK.getRight()), adviceNum,
                    splitPkName, "'", DATABASE_TYPE);
        } else if (isLongType) {
            rangeList = RdbmsRangeSplitWrap.splitAndWrap(
                    new BigInteger(minMaxPK.getLeft().toString()),
                    new BigInteger(minMaxPK.getRight().toString()),
                    adviceNum, splitPkName);
        } else {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK,
                    "您配置的切分主键(splitPk) 类型 DataX 不支持. DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系 DBA 进行处理.");
        }
        
        String tempQuerySql;
        if (null != rangeList && !rangeList.isEmpty()) {
            for (String range : rangeList) {
                Configuration tempConfig = configuration.clone();

                tempQuerySql = buildQuerySql(column, table, where)
                        + (hasWhere ? " and " : " where ") + range;

                allQuerySql.add(tempQuerySql);
                tempConfig.set(Key.QUERY_SQL, tempQuerySql);
                pluginParams.add(tempConfig);
            }
        } 

        // deal pk is null
        Configuration tempConfig = configuration.clone();
        tempQuerySql = buildQuerySql(column, table, where)
                + (hasWhere ? " and " : " where ")
                + String.format(" %s IS NULL", splitPkName);

        tempConfig.set(Key.QUERY_SQL, tempQuerySql);
        pluginParams.add(tempConfig);
        
        return pluginParams;
    }

DataX划分区间是为了通过并发提升查询效率,因此区间划分是否均匀相对并不是很重要。而我们的目的是减少对数据表的慢查询,如果划分区间不均匀,那么不同区间的查询执行时间很可能差别很大,并且查询的执行时间会和实际数据的分布强相关,这样就很难通过参数设定控制慢查询是否产生。

于是,我们借鉴了DataX划分区间查询的思路,但是分区策略做了调整:每次查询按主键升序排序,读取N行,并记录下本次查询主键的最大值X,下次查询的查询语句中加上“> X”的条件判断。简单来讲就是每次查询记录游标,下次查询带上游标条件,这其实是一个优化深翻页的标准方法。

基于游标查询的思路实现了Spark版本数据离线导出方案(后续称作方案3),核心逻辑如下:首先通过加载配置的方式获取数据库表的信息,然后遍历所有满足正则表达式的库表,用游标查询的方式导出数据表中的完整数据。

代码语言:scala
复制
val sqlDBPattern = config.sqlDBPattern //read from config
val sqlTablePattern = config.sqlTablePattern
val query = config.query // eg. select a, b, c, d
val splitPK = config.splitPK // 分区主键
val splitRowNum = config.splitRowNum // 分区读取批次大小

var resultDataFrame: DataFrame = null
for (db <- getDBList() if (checkDB(db, sqlDBPattern))) { //遍历筛选出符合正则的数据库
    for(table <- getTableList(db) if (checkTable(table, sqlTablePattern))) { //遍历筛选出符合正则的表
        var sql = query + s" from `$db`.`$table` where 1 = 1"
        if (!splitPK.isEmpty) {
            var minPK = ""
            var count = splitRowNum
            var PKType = "StringType"
            while (count >= splitRowNum) {
                var tmpSql = sql
                if (!minPK.isEmpty) tmpSql += s" and ${splitPK} > $minPK"
                tmpSql += s" order by ${splitPK} asc limit ${splitRowNum}"
                
                val customDF = readFromJDBC(database, tmpSql).cache
                resultDataFrame = resultDataFrame.union(customDF)

                count = customDF.count().toInt
                if (count > 0) {
                    PKType = customDF.dtypes.toMap.get(source.splitPK).get
                    minPK = PKType match { // add types check here when meet
                        case "StringType" => s"'${customDF.agg(max(splitPK)).collect()(0).getString(0)}'" // string 类型这里拼在sql里要加引号
                        case _ => s"${customDF.agg(max(splitPK)).collect()(0).getLong(0)}"
                    }
                }
            }
          }
        } else {
            resultDataFrame = resultDataFrame.union(readFromJDBC(db, query))
        }
    }
}
resultDataFrame.xxxAction // save to target table

private def readFromJDBC(database: String, sql: String): DataFrame = {
    df = sparkSession.read.format("jdbc")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("user", "xxxxxx")
          .option("password", "xxxxxx")
          .option("charset", "iso-8859-1")
          .option("url", "jdbc:mysql://${ip}:${port}/${database}")
          .option("dbtable", s"($sql) t")
          .option("queryTimeout", 30)
          .option("pushDownPredicate", true)
          .option("pushDownAggregate", true)
          .load()
}

这个实现中需要注意在sparkSession.read时,设置 "pushDownPredicate"和"pushDownAggregate"这两个参数为true(默认是false)[2],这两个参数分别控制条件过滤、聚合排序是否下推到Mysql执行,若不指定,则Spark会读取数据表中的所有数据,在内存中做过滤和排序。

方案3的分页查询策略,可以保证主键分布不均匀的情况下,每次拉取的数据条数也是一致的,因此可以通过调整批量的大小,保证不会有慢查询的出现。这个策略的最大问题就是,每一次查询执行,依赖上一次查询执行的结果,这样多个分区的查询不能并发执行。由于这种依赖关系,Spark执行时每个查询都会产生一个单独的stage,都要经过driver任务调度的过程,导致程序执行会非常缓慢,并不能发挥spark并行分布式的优势。如某个业务线上分百表,一百张表加起来数据大概1.5亿行,导出任务执行需要2个小时左右。同时,每个查询中,只设置了主键的单边过滤条件,Mysql在执行时还是会扫描满足条件的所有行,在执行上也没有达到最优的效果。

阶段4:任务并发执行

如何既保证查询批次的均匀,又能让不同区间的查询并发执行呢?既然只查询最小值和最大值无法保证均匀的划分数据,那把所有主键都先读取出来,在内存中划分区间是否可行呢?只查主键通常会命中覆盖索引,查询效率会比较高,数据量也不会很大。但考虑到一个SQL读取表的所有主键还是太暴力了,而且也有可能会出现慢查询,因此查询主键这一步选择采用游标分页查询的方式。实现逻辑如下(后续称作方案4):先通过游标的方式循环拉取主键,然后按照配置中的splitRowNum划分区间。

代码语言:text
复制
private def getPrimaryKeyPoints(statement: Statement, database: String, table: String): (Seq[(Any, Any)], Int) = {
    statement.executeQuery(s"use $database")
    val resultList = new scala.collection.mutable.ArrayBuffer[Any]
    var resultSize = FETCH_PRIMARY_KEY_BATCH_SIZE
    var minID: Any = null
    var pkType = 0
    while (resultSize >= FETCH_PRIMARY_KEY_BATCH_SIZE) {
      var sql = s"select ${source.splitPK} from `$database`.`$table`"
      if (minID != null) sql = sql + s" where ${source.splitPK} > ${Utils.formatPredicateValue(minID, pkType)}"
      sql = sql + s" order by ${source.splitPK} asc limit $FETCH_PRIMARY_KEY_BATCH_SIZE"

      LOGGER.info("Get primary key query: " + sql)
      val rs = statement.executeQuery(sql)
      pkType = rs.getMetaData.getColumnType(1)
      resultSize = 0
      while (rs.next()) {
        resultSize += 1
        pkType match {
          case java.sql.Types.VARCHAR => resultList.append(rs.getString(source.splitPK))
          case _ => resultList.append(rs.getLong(source.splitPK))
        }
      }
      rs.close()
      if (!resultList.isEmpty) minID = resultList.last
    }
    if (resultList.isEmpty) (Seq(), pkType)
    else (Utils.partitionArray(resultList, source.splitRowNum), pkType)
}

Spark JDBC本身提供了并发读取数据表的方式[3],可以直接把划分好的区间转换成查询条件传入JDBC接口中,Spark就为每一个区间生成一个SQL查询,并发执行。同时对于Mysql来说,这样每次查询中过滤条件同时指定了上界和下界,可以大大减少Mysql执行时扫描的行数,进一步提升执行效率。

代码语言:text
复制
sparkSession.read.jdbc(formatDBUrl(database), 
                       table, 
                       Utils.formatPredicates(pkPartitions, splitPK, pkType).toArray, 
                       prop) // user, password etc.
                       
def formatPredicates(partitions: Seq[(Any, Any)], pkField: String, pkType: Int): Seq[String] = {
    partitions.init.map(partition => formatPredicate(partition, pkField, pkType, false)) :+
      formatPredicate(partitions.last, pkField, pkType, true)
}

def formatPredicate(partition: (Any, Any), pkField: String, pkType: Int, isLast: Boolean): String = {
    val secondOperator = if (isLast) "<=" else "<"
    s"$pkField >= ${formatPredicateValue(partition._1, pkType)} and $pkField $secondOperator ${formatPredicateValue(partition._2, pkType)}"
}

def formatPredicateValue(value: Any, pkType: Int): String = {
    pkType match {
      case java.sql.Types.VARCHAR => s"'${value.asInstanceOf[String]}'"
      case _ => s"${value.asInstanceOf[Long]}"
    }
}

方案4虽然引入了读取表主键并在内存中划分区间的时间开销,但后续读取数据并ETL处理的过程完全可以并发执行,整体任务执行的效率提高了很多。经过优化之后,原本2个小时执行的任务,现在只需要20-30分钟,而其中读取表主键的时间只占用1-2分钟。

总结

对于离线导出mysql数据表写入分布式存储这个场景,本文提供了一种实现方式:首先分批查出表的所有主键,按配置的批量大小划分区间;然后区间转化为SQL的分区条件传入Spark JDBC接口,构建Spark任务读取数据。这种方式有以下几点优势:

1. 用分区查询的方式,避免了Mysql的慢查询,对其他线上业务影响较小。

2. 利用Spark分布式的能力提升任务执行速度。

3. Spark SQL功能强大,可以在数据读取的同时,通过配置做一些简单的ETL操作。

参考文献

[1] https://github.com/alibaba/DataX.

[2] JDBC To Other Databases. https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html.

[3] Using predicates in Spark JDBC read method. https://stackoverflow.com/questions/48677883/using-predicates-in-spark-jdbc-read-method.

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 阶段0
  • 阶段1:解决查询执行失败
  • 阶段2:解决运维问题
  • 阶段3:解决慢查询问题
  • 阶段4:任务并发执行
  • 总结
    • 参考文献
    相关产品与服务
    云数据库 SQL Server
    腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档