专栏首页不温卜火Spark Core快速入门系列(5) | RDD 中函数的传递

Spark Core快速入门系列(5) | RDD 中函数的传递

我们进行 Spark 进行编程的时候, 初始化工作是在 driver端完成的, 而实际的运行程序是在executor端进行的. 所以就涉及到了进程间的通讯, 数据是需要序列化的.

RDD 中函数的传递

1. 传递函数

  • 1. 创建传递函数
package day03

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 **
@author 不温卜火
 **
 * @create 2020-07-24 19:31
 **
 *         MyCSDN :https://buwenbuhuo.blog.csdn.net/
 */
//  创建的主程序
object Demo01 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo01").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello buwenbuhuo", "xia0li", "hahah"), 2)
    val searcher = new Searcher("hello")
    val result: RDD[String] = searcher.getMatchedRDD1(rdd)
    result.collect.foreach(println)
  }
}
//需求: 在 RDD 中查找出来包含 query 子字符串的元素

// 创建的类
// query 为需要查找的子字符串
class Searcher(val query: String){
  // 判断 s 中是否包括子字符串 query
  def isMatch(s : String) ={
    s.contains(query)
  }
  // 过滤出包含 query字符串的字符串组成的新的 RDD
  def getMatchedRDD1(rdd: RDD[String]) ={
    rdd.filter(isMatch)  //
  }
  // 过滤出包含 query字符串的字符串组成的新的 RDD
  def getMatchedRDD2(rdd: RDD[String]) ={
    rdd.filter(_.contains(query))
  }

}
  • 2. 运行查看结果(会报错)

注意:   直接运行程序会发现报错: 没有初始化. 因为rdd.filter(isMatch) 用到了对象this的方法isMatch, 所以对象this需要序列化,才能把对象从driver发送到executor.

  • 3. 解决方案: 让 Searcher 类实现序列化接口:Serializable

2. 传递变量

  • 创建函数
package day03

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 **
 *
*@author 不温卜火
 **
 * @create 2020-07-24 20:12
 **
 *         MyCSDN :https://buwenbuhuo.blog.csdn.net/
 */
object Demo02 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo02").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello buwenbuhuo", "xiaoli", "hahah"), 2)
    val searcher = new Searcher("hello")
    val result: RDD[String] = searcher.getMatchedRDD2(rdd)
    result.collect.foreach(println)
  }
  // query 为需要查找的子字符串
  class Searcher(val query: String)  {
    // 判断 s 中是否包括子字符串 query
    def isMatch(s: String) = {
      s.contains(query)
    }

    // 过滤出包含 query字符串的字符串组成的新的 RDD
    def getMatchedRDD2(rdd: RDD[String]) = {
      rdd.filter(_.contains(query))
    }
  }
}
  • 2. 运行查看结果(会报错)

报错原因: 这次没有传递函数, 而是传递了一个属性过去. 仍然会报错没有序列化. 因为this仍然没有序列化.

  • 3. 解决方案:
  • 1.让类实现序列化接口:Serializable
  • 2.传递局部变量而不是属性

3. kryo 序列化框架

参考地址: https://github.com/EsotericSoftware/kryo

  Java 的序列化比较重, 能够序列化任何的类. 比较灵活,但是相当的慢, 并且序列化后对象的体积也比较大.   Spark 出于性能的考虑, 支持另外一种序列化机制: kryo (2.0开始支持). kryo 比较快和简洁.(速度是Serializable的10倍). 想获取更好的性能应该使用 kryo 来序列化.   从2.0开始, Spark 内部已经在使用 kryo 序列化机制: 当 RDD 在 Shuffle数据的时候, 简单数据类型, 简单数据类型的数组和字符串类型已经在使用 kryo 来序列化.   有一点需要注意的是: 即使使用 kryo 序列化, 也要继承 Serializable 接口.

  • 1.代码案例
package day03

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 **
 *
*@author 不温卜火
 **
 * @create 2020-07-24 20:36
 **
 *         MyCSDN :https://buwenbuhuo.blog.csdn.net/
 */
object Demo03 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("Demo03")
      .setMaster("local[*]")
      // 替换默认的序列化机制 可以省(如果调用registerKryoClasses
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // 注册需要使用 kryo 序列化的自定义类
      .registerKryoClasses(Array(classOf[Searcher]))
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello buwenbuhuo", "xiaoli", "hahah"), 2)
    val searcher = new Searcher("hello")
    val result: RDD[String] = searcher.getMatchedRDD1(rdd)
    result.collect.foreach(println)
  }
}

case class Searcher(val query: String) {
  // 判断 s 中是否包括子字符串 query
  def isMatch(s: String) = {
    s.contains(query)
  }


  // 过滤出包含 query字符串的字符串组成的新的 RDD
  def getMatchedRDD1(rdd: RDD[String]) = {
    rdd.filter(isMatch) //
  }

  // 过滤出包含 query字符串的字符串组成的新的 RDD
  def getMatchedRDD2(rdd: RDD[String]) = {
    val q = query
    rdd.filter(_.contains(q))
  }

}
  • 2.运行案例

  本次的分享就到这里了

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark Core快速入门系列(9) | RDD缓存和设置检查点

      RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。 ...

    不温卜火
  • Spark Core快速入门系列(4) | <Action> 行动算子转换算子

      返回一个由RDD的前n个元素组成的数组   take 的数据也会拉到 driver 端, 应该只对小数据集使用

    不温卜火
  • Spark Core快速入门系列(1) | 什么是RDD?一文带你快速了解Spark中RDD的概念!

    看了前面的几篇Spark博客,相信大家对于Spark的基本概念以及不同模式下的环境部署问题已经搞明白了。但其中,我们曾提到过Spark程序的核心,也就是弹性分布...

    不温卜火
  • 设计iOS中随系统键盘弹收和内容文字长度自适应高度的文本框

        文本输入框是多数与社交相关的app中不可或缺的一个控件,这些文本输入框应该具备如下的功能:

    珲少
  • Pandas的函数应用处理缺失数据

    王小婷
  • HDOJ 2016 数据的交换输出

    Problem Description 输入n(n<100)个数,找出其中最小的数,将它与最前面的数交换后输出这些数。

    谙忆
  • 提升 Python 性能 - Numba 与 Cython

    花下猫语:最近,读者微信群里又频繁聊到了 Python 的性能问题,这真是老生常谈了。我想起自己曾收藏过几篇关于如何提升性能的文章,似乎挺有帮助的,便去联系了下...

    Python猫
  • 基于hadoop生态圈的数据仓库实践 —— 进阶技术(十五)

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/...

    用户1148526
  • 当 Python 爬虫搭配起 Bilibili 唧唧,奇怪的生产力出现了

    最近需要大规模下载B站视频,同时要将下载好的视频用BV号进行重命名,最后上传至服务器。这个工作一开始我是完全手工完成的,通过游猴来下载,可是下载几十个视频还好,...

    Python小二
  • 如何彻底卸载MySQL 原

    卸载了重新安装MYSQL,这一卸载出了问题,导致安装的时候安装不上,在网上找了一个多小时也没解决。    重装系统永远是个好办法,但有谁喜欢这么做呀:( ...

    wuweixiang

扫码关注云+社区

领取腾讯云代金券