扩展RDD API三部曲之第二部自定义操作算子

扩展RDD API三部曲,主要是帮助大家掌握如下三个内容:

1). 回顾一下RDD的基础

2). 扩展Action,也即是自定义RDD算子

3). 扩展 transform及自定义RDD

本文主要是将自定义Spark RDD算子中的Action 类型操作。

1. 准备阶段

讲到自定义RDD的action操作,大家首先应该想到的就是那些RDD到key-value算子的隐式转换,具体一点也就是PairRDDFunctions这个类里包含的算子,比如reducebykey等操作算子。

具体实现肯定是要比较了解scala的隐式转换操作,这个浪尖也发过文章了,可以点击下文阅读:

首先,我们要进行准备操作,首先定义一个case class

classSalesRecord(val transactionId:String,

val customerId:String,

val itemId:String,

val itemValue: Double)extendsComparable[SalesRecord]

withSerializable {

override def compareTo(o: SalesRecord): Int = {

returnthis.transactionId.compareTo(o.transactionId)

}

override def toString:String= {

transactionId+","+customerId+","+itemId+","+itemValue

}

}

然后,定义我们的主要函数:

val sparkConf =newSparkConf().setAppName(this.getClass.getName).setMaster("local[*]")

.set("yarn.resourcemanager.hostname","mt-mdh.local")

.set("spark.executor.instances","2")

.set("spark.default.parallelism","4")

.set("spark.sql.shuffle.partitions","4")

.setJars(List("/opt/sparkjar/bigdata.jar"

,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"

,"/opt/jars/kafka-clients-0.10.2.2.jar"

,"/opt/jars/kafka_2.11-0.10.2.2.jar"))

val sc =newSparkContext(sparkConf)

val dataRDD = sc.textFile("file:///opt/bigdata/src/main/data/sales.csv")

val salesRecordRDD = dataRDD.map(row => {

val colValues = row.split(",")

newSalesRecord(colValues(),colValues(1),colValues(2),colValues(3).toDouble)

})

这个时候加入我们需要对itemValue字段求和,常见的做法是

salesRecordRDD.map(_.itemValue).sum

其实,sum就是DoubleRDDFunctions内部的算子,也是通过隐式转换实现的。

2. 自定义算子实现

然后就是要定义RDD的操作算子本身,也即是一个工具类,我们叫他为CustomFunctions,内部包含求和函数如下:

这个仔细读一下上面已有的隐式转换算子,可以发现还不行,需要为自定义RDD的操作算子,自定义一个隐士转换的算子工具,内容如下:

objectCustomFunctions{

implicit defaddCustomFunctions(rdd:RDD[SalesRecord]) =newCustomFunctions(rdd)

}

3. 使用算子

调用我们的转换方法:

println("Spark RDD API : "+salesRecordRDD.map(_.itemValue).sum)

importCustomFunctions._

println("Cunstom RDD API : "+salesRecordRDD.totalSales)

输出结果:

这就是自定义RDD的action操作。

下篇文章为自定义RDD和转换操作,这个就只会在星球里分享了欢迎加入浪尖的知识星球,与近420好友一起学习进步。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181212G0090X00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券