前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >扩展RDD API三部曲之第二部自定义操作算子

扩展RDD API三部曲之第二部自定义操作算子

作者头像
Spark学习技巧
发布2018-12-26 11:37:25
8300
发布2018-12-26 11:37:25
举报
文章被收录于专栏:Spark学习技巧

扩展RDD API三部曲,主要是帮助大家掌握如下三个内容:

1). 回顾一下RDD的基础

2). 扩展Action,也即是自定义RDD算子

3). 扩展 transform及自定义RDD

本文主要是将自定义Spark RDD算子中的Action 类型操作。

1. 准备阶段

讲到自定义RDD的action操作,大家首先应该想到的就是那些RDD到key-value算子的隐式转换,具体一点也就是PairRDDFunctions这个类里包含的算子,比如reducebykey等操作算子。

具体实现肯定是要比较了解scala的隐式转换操作,这个浪尖也发过文章了,可以点击下文阅读:

Scala语法基础之隐式转换

首先,我们要进行准备操作,首先定义一个case class

代码语言:javascript
复制
class SalesRecord(val transactionId: String,
                  val customerId: String,
                  val itemId: String,
                  val itemValue: Double) extends Comparable[SalesRecord]
  with Serializable {

  override def compareTo(o: SalesRecord): Int = {
    return this.transactionId.compareTo(o.transactionId)
  }

  override def toString: String = {
    transactionId+","+customerId+","+itemId+","+itemValue
  }
}

然后,定义我们的主要函数:

代码语言:javascript
复制
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
      .set("yarn.resourcemanager.hostname", "mt-mdh.local")
      .set("spark.executor.instances","2")
      .set("spark.default.parallelism","4")
      .set("spark.sql.shuffle.partitions","4")
      .setJars(List("/opt/sparkjar/bigdata.jar"
        ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
        ,"/opt/jars/kafka-clients-0.10.2.2.jar"
        ,"/opt/jars/kafka_2.11-0.10.2.2.jar"))
    val sc = new SparkContext(sparkConf)
    val dataRDD = sc.textFile("file:///opt/bigdata/src/main/data/sales.csv")
    val salesRecordRDD = dataRDD.map(row => {
      val colValues = row.split(",")
      new SalesRecord(colValues(0),colValues(1),colValues(2),colValues(3).toDouble)
    })

这个时候加入我们需要对itemValue字段求和,常见的做法是

代码语言:javascript
复制
salesRecordRDD.map(_.itemValue).sum

其实,sum就是DoubleRDDFunctions内部的算子,也是通过隐式转换实现的。

2. 自定义算子实现

然后就是要定义RDD的操作算子本身,也即是一个工具类,我们叫他为CustomFunctions,内部包含求和函数如下:

代码语言:javascript
复制
import org.apache.spark.rdd.RDD

class CustomFunctions(rdd:RDD[SalesRecord]) {

  def totalSales = rdd.map(_.itemValue).sum

  def discount(discountPercentage:Double) = new DiscountRDD(rdd,discountPercentage)

}

这个仔细读一下上面已有的隐式转换算子,可以发现还不行,需要为自定义RDD的操作算子,自定义一个隐士转换的算子工具,内容如下:

代码语言:javascript
复制
object CustomFunctions {

  implicit def addCustomFunctions(rdd: RDD[SalesRecord]) = new CustomFunctions(rdd)
}

3. 使用算子

调用我们的转换方法:

代码语言:javascript
复制
println("Spark RDD API : "+salesRecordRDD.map(_.itemValue).sum)

import CustomFunctions._
println("Cunstom RDD API : "+salesRecordRDD.totalSales)

输出结果:

这就是自定义RDD的action操作。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-12-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档