Spark性能优化:基于分区进行操作

前言(摘自Spark快速大数据分析)

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或创建随机数生成器等操作,都是我们应当尽量避免为每个元素都配置一次的工作。Spark 提供基于分区的map 和foreach,让你的部分代码只对RDD 的每个分区运行一次,这样可以帮助降低这些操作的代价。 当基于分区操作RDD 时,Spark 会为函数提供该分区中的元素的迭代器。返回值方面,也返回一个迭代器。除mapPartitions() 外,Spark 还有一些别的基于分区的操作符,见下表:

函数名

调用所提供的

返回的

对于RDD[T]的函数签名

mapPartitions()

该分区中元素的迭代器

返回的元素的迭代器

f: (Iterator[T]) → Iterator[U]

mapPartitionsWithIndex()

分区序号,以及每个分区中的元素的迭代器

返回的元素的迭代器

f: (Int, Iterator[T]) → Iterator[U]

foreachPartitions()

元素迭代器

f: (Iterator[T]) → Unit

首先给出上面三个算子的具体代码示例。

1、mapPartitions

与map类似,不同点是map是对RDD的里的每一个元素进行操作,而mapPartitions是对每一个分区的数据(迭代器)进行操作,具体可以看上面的表格。 下面同时用map和mapPartitions实现WordCount,看一下mapPartitions的用法以及与map的区别

package com.dkl.leanring.spark.test

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("WordCount").getOrCreate()
    val sc = spark.sparkContext

    val input = sc.parallelize(Seq("Spark Hive Kafka", "Hadoop Kafka Hive Hbase", "Java Scala Spark"))
    val words = input.flatMap(line => line.split(" "))
    val counts = words.map(word => (word, 1)).reduceByKey { (x, y) => x + y }
    println(counts.collect().mkString(","))
    val counts1 = words.mapPartitions(it => it.map(word => (word, 1))).reduceByKey { (x, y) => x + y }
    println(counts1.collect().mkString(","))

    spark.stop()

  }
}

2、mapPartitionsWithIndex

和mapPartitions一样,只是多了一个分区的序号,下面的代码实现了将Rdd的元素数字n变为(分区序号,n*n)

val rdd = sc.parallelize(1 to 10, 5)
val res = rdd.mapPartitionsWithIndex((index, it) => {
  it.map(n => (index, n * n))
})
println(res.collect().mkString(" "))

3、foreachPartitions

foreachPartitions和foreach类似,不同点也是foreachPartitions基于分区进行操作的

rdd.foreachPartition(it => it.foreach(println))

4、关于如何避免重复配置

下面以打开数据库连接举例,需求是这样的: 读取mysql表里的数据,做了一系列数据处理得到结果之后,需要修改我们mysql表里的每一条数据的状态,代表程序已经处理过了,下次不需要处理了。

4.1 表

以最简单表结构示例

字段名

注释

ID

主键、唯一标识

ISDEAL

程序是否处理过

建表语句

CREATE TABLE test (
	id INTEGER NOT NULL AUTO_INCREMENT,
	isdeal INTEGER DEFAULT 0 NOT NULL,
	primary key(id) 
)
ENGINE=InnoDB
DEFAULT CHARSET=utf8
COLLATE=utf8_general_ci;

4.2 不基于分区操作

一共用两种方法

4.2.1 第一种

package com.dkl.leanring.spark.sql.mysql

import org.apache.spark.sql.SparkSession

object UpdateMysqlDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UpdateMysqlDemo").master("local").getOrCreate()

    val database_url = "jdbc:mysql://192.168.44.128:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    val user = "root"
    val password = "Root-123456"
    val df = spark.read
      .format("jdbc")
      .option("url", database_url)
      .option("dbtable", "(select * from test where isDeal=0 limit 5)a")
      .option("user", user)
      .option("password", password)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("numPartitions", "5")
      .option("partitionColumn", "ID")
      .option("lowerBound", "1")
      .option("upperBound", "10")
      .load()

    import java.sql.{ Connection, DriverManager, ResultSet };
    df.rdd.foreach(row => {
      val conn = DriverManager.getConnection(database_url, user, password)
      try {
        // Configure to be Read Only
        val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
        val prep = conn.prepareStatement(s"update test set isDeal=1 where id=?")

        val id = row.getAs[Int]("id")
        prep.setInt(1, id)
        prep.executeUpdate

      } catch {
        case e: Exception => e.printStackTrace
      } finally {
        conn.close()
      }

    })

    spark.stop()
  }
}
  • 上面的代码,取isDeal=0的前五条,因为造的数据量少,所以只取了前五条,然后指定了五个分区,这里只是一个代码示例,实际工作中应该数据量很大,每个分区肯定不止一条数据

根据上面的代码,看到用这种方式的缺点是每一个元素都要创建一个数据库连接,这样频繁创建连接、关闭连接,在数据量很大的情况下,势必会对性能产生影响,但是优点是不用担心内存不够。

4.2.2 第二种

val conn = DriverManager.getConnection(database_url, user, password)
try {
  val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
  val prep = conn.prepareStatement(s"update test set isDeal=1 where id=?")

  df.select("id").collect().foreach(row => {
    val id = row.getAs[Int]("id")
    prep.setInt(1, id)
    prep.executeUpdate

 })

} catch {
  case e: Exception => e.printStackTrace
}

这种方式的缺点是把要操作的数据全部转成scala数组,仅在Driver端执行,但是如果数据量很大的话,可能因为Driver内存不够大而抛出异常,优点是只建立一次数据库连接,在数据量不是特别大,且确定Driver的内存足够的时候,可以采取这种方式。

4.3 基于分区的方式

df.rdd.foreachPartition(it => {
  val conn = DriverManager.getConnection(database_url, user, password)
  try {
    val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    val prep = conn.prepareStatement(s"update test set isDeal=1 where id=?")
    it.foreach(row => {
      val id = row.getAs[Int]("id")
      prep.setInt(1, id)
      prep.executeUpdate
    })

  } catch {
    case e: Exception => e.printStackTrace
  } finally {
    conn.close()
  }

})

这种方式就结合了上面两种方式的优点,基于分区的方式使得创建连接的次数不会那么多,然后每个分区的数据也可以平均分到每个节点的executor上,避免了内存不足产生的异常,当然前提是要合理的分配分区数,既不能让分区数太多,也不能让每个分区的数据太多,还有要注意数据倾斜的问题,因为当数据倾斜造成某个分区数据量太大同样造成OOM(内存溢出)。

4.4 其他

上面只是列举了一个例子,且只是在foreach这样的action算子里体现的,当然肯定也有需求需是在transformation里进行如数据库的连接这样的操作,大家可类比的使用mapPartitions即可

5、其他优点(未证实)

网上有很多博客提到mapPartitions还有其他优点,就是mapPartitions比map快,性能高,原因是因为map的function会执行rdd.count次,而mapPartitions的function则执行rdd.numPartitions次。 但我并这么认为,因mapPartitions的function和map的function是不一样的,mapPartitions里的迭代器的每个元素还是都要执行一遍的,实际上也是执行rdd.count次。 下面以其中一篇博客举例(只列出优点,大部分博客上的写的都一样的,应该出自同一篇博客吧~)

博客地址:Spark—算子调优之MapPartitions提升Map类操作性能

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏牛肉圆粉不加葱

Spark Task 的执行流程③ - 执行 task

创建、分发 Task一文中我们提到 TaskRunner(继承于 Runnable) 对象最终会被提交到 Executor 的线程池中去执行,本文就将对该执行过...

791
来自专栏Spark生态圈

[spark] Checkpoint 源码解析

在spark应用程序中,常常会遇到运算量很大经过很复杂的 Transformation才能得到的RDD即Lineage链较长、宽依赖的RDD,此时我们可以考虑将...

1862
来自专栏一名叫大蕉的程序员

Spark你一定学得会(一)No.7

我是小蕉。 上一篇大家说没有干货,妈蛋回南天哪来的干货你告诉我!!!还好这几天天气还不错,干货来了。 首先祭上今天关键代码,要做的事情就是从Hive表中取得年龄...

1985
来自专栏李德鑫的专栏

Spark SQL 数据统计 Scala 开发小结

Dataset API 属于用于处理结构化数据的 Spark SQL 模块,通过比 RDD 多的数据的结构信息,Spark SQL 在计算的时候可以进行额外的...

4.3K4
来自专栏王小雷

Spark学习之编程进阶——累加器与广播(5)

Spark学习之编程进阶——累加器与广播(5) 1. Spark中两种类型的共享变量:累加器(accumulator)与广播变量(broadcast varia...

2049
来自专栏猿天地

Netty-整合kryo高性能数据传输

前言 本篇文章是Netty专题的第三篇,前面2篇文章如下: 高性能NIO框架Netty入门篇 高性能NIO框架Netty-对象传输 Netty 是 开源的基于j...

90512
来自专栏岑玉海

Spark源码系列(五)分布式缓存

这一章想讲一下Spark的缓存是如何实现的。这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。 def persist(newLevel...

3815
来自专栏栗霖积跬步之旅

为什么对象序列化要定义serialVersionUID

对于实现了java.io.Serializable接口的实体类来说,往往都会手动声明serialVersionUID,因为只要你实现了序列化,java自己就会默...

2209
来自专栏Albert陈凯

4.2 创建RDD

4.2 创建RDD 由于Spark一切都是基于RDD的,如何创建RDD就变得非常重要,除了可以直接从父RDD转换,还支持两种方式来创建RDD: 1)并行化一个...

3179
来自专栏岑玉海

Spark编程指南

1、在maven里面添加引用,spark和hdfs的客户端的。 groupId = org.apache.spark artifactId = spark-co...

3719

扫码关注云+社区

领取腾讯云代金券