SparkRDD转DataSet/DataFrame的一个深坑

场景描述:本文是根据读者反馈的一个问题总结而成的。

关键词:Saprk RDD

原需求:希望在map函数中将每一个rdd转为DataSet或者DataFrame。

SparkRDD转为DataSet的两种方式

第一种方法是使用反射来推断包含特定对象类型的RDD的模式。在写Spark程序的同时,已经知道了模式,这种基于反射的方法可以使代码更简洁并且程序工作得更好。

第二种方法是通过一个编程接口来实现,这个接口允许构造一个模式,然后在存在的RDD上使用它。虽然这种方法代码较为冗长,但是它允许在运行期间之前不知道列以及列的类型的情况下构造DataSet。

官方给出的两个案例:

  • 利用反射推断Schema

Spark SQL支持将javabean的RDD自动转换为DataFrame。使用反射获得的BeanInfo定义了表的模式。目前,Spark SQL不支持包含Map字段的javabean。但是支持嵌套的javabean和列表或数组字段。您可以创建一个实现Serializable的类并为其所有字段设置getter和setter,从而创建一个JavaBean。

  public static class Person implements Serializable {
    private String name;
    private int age;

    public String getName() {
      return name;
    }

    public void setName(String name) {
      this.name = name;
    }

    public int getAge() {
      return age;
    }

    public void setAge(int age) {
      this.age = age;
    }
  }
  private static void runInferSchemaExample(SparkSession spark) {
    // Create an RDD of Person objects from a text file
    JavaRDD<Person> peopleRDD = spark.read()
      .textFile("examples/src/main/resources/people.txt")
      .javaRDD()
      .map(line -> {
        String[] parts = line.split(",");
        Person person = new Person();
        person.setName(parts[0]);
        person.setAge(Integer.parseInt(parts[1].trim()));
        return person;
      });

    // Apply a schema to an RDD of JavaBeans to get a DataFrame
    Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people");

    // SQL statements can be run by using the sql methods provided by spark
    Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

    // The columns of a row in the result can be accessed by field index
    Encoder<String> stringEncoder = Encoders.STRING();
    Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
        stringEncoder);
    teenagerNamesByIndexDF.show();
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+

    // or by field name
    Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
        (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
        stringEncoder);
    teenagerNamesByFieldDF.show();
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    // $example off:schema_inferring$
  }
  • 编程指定Schema

如果不能提前定义JavaBean类(例如,记录的结构是在字符串中编码的,或者将对文本数据集进行解析,而对不同的用户将对字段进行不同的投影),那么可以通过三个步骤以编程方式创建DataSet<Row>。

  private static void runProgrammaticSchemaExample(SparkSession spark) {
    // 1、创建一个RDD
    JavaRDD<String> peopleRDD = spark.sparkContext()
      .textFile("examples/src/main/resources/people.txt", 1)
      .toJavaRDD();

    // The schema is encoded in a string
    String schemaString = "name age";

    // 2、根据schema的字符串生成schema
    List<StructField> fields = new ArrayList<>();
    for (String fieldName : schemaString.split(" ")) {
      StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
      fields.add(field);
    }
    StructType schema = DataTypes.createStructType(fields);


    // 3、将JavaRDD<String>的记录转换成JavaRDD<Row>
    JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
      String[] attributes = record.split(",");
      return RowFactory.create(attributes[0], attributes[1].trim());
    });

    ///4、将 schema 应用在JavaRDD<Row> ,创建 Dataset<Row>
    Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

    // Creates a temporary view using the DataFrame
    peopleDataFrame.createOrReplaceTempView("people");

    // SQL can be run over a temporary view created using DataFrames
    Dataset<Row> results = spark.sql("SELECT name FROM people");

    // The results of SQL queries are DataFrames and support all the normal RDD operations
    // The columns of a row in the result can be accessed by field index or by field name
    Dataset<String> namesDS = results.map(
        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
        Encoders.STRING());
    namesDS.show();
    // +-------------+
    // |        value|
    // +-------------+
    // |Name: Michael|
    // |   Name: Andy|
    // | Name: Justin|
    // +-------------+
    // $example off:programmatic_schema$
  }
Task not serializable
作者的代码类似在map中使用了方法传入的SparkContext/SparkSession,伪代码如下:source.map(rdd->sparkSession.createDataFrame)
报了如下的错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...

网上也提供很多办法,包括:

  • @Transient 注解
class MyTest1(conf:String) extends Serializable{
  val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
  @transient
  private val sparkConf = new SparkConf().setAppName("AppName");
  @transient
  private val sc = new SparkContext(sparkConf);
  val rdd = sc.parallelize(list);

  private val rootDomain = conf

  def getResult(): Array[(String)] = {

    val result = rdd.filter(item => item.contains(rootDomain))
    result.take(result.count().toInt)
  }
}

注解是方法级别的,不是变量级别。

  • 方法实现implements Serializable

例如:

public class RDDTest implements Serializable
  • 设置一个参数
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

简单的分析

以上的方法,不一定管用。

在编写Spark程序中,由于在map等算子内部使用了外部定义的变量和函数,由于外部定义的变量和函数有可能不支持序列化,仍然会导致整个类序列化时出现问题,最终可能会出现Task未序列化问题。

引用了类的成员函数,会导致该类及所有成员都需要支持序列化。因此,对于使用了某类成员变量或函数的情形,首先该类需要序列化(Serializable),同时需要对某些不需要序列化的成员变量标记以避免为序列化造成影响。

所以:

  • 引用了类的成员函数或变量,对应的类需要做序列化处理
  • 执行map等方法的时候,尽量不要在闭包内部直接引用成员函数或变量

如果上述办法全都不管用,那么就换个实现方案吧。

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏逆向与安全

基于设备指纹零感验证系统

作者: 我是小三 博客: http://www.cnblogs.com/2014asm/ 由于时间和水平有限,本文会存在诸多不足,希望得到您的及时反馈与指正,多...

18720
来自专栏java达人

透过源码学习设计模式7-适配器模式与HandlerApapter

适配器模式把一个类的接口,变换成客户端所期待的另一种接口,使原本因接口不匹配的两个类能够在一起工作。

8630
来自专栏故久

itext根据模板生成pdf(支持分页)

// 利用模板生成pdf public static void pdfout(Map<String,Object> o,String newPDFPa...

61320
来自专栏Java架构师历程

利用nohup后台运行jar文件包程序

java -jar XXX.jar 特点:当前ssh窗口被锁定,可按CTRL + C打断程序运行,或直接关闭窗口,程序退出

8930
来自专栏A周立SpringCloud

让人头大的各种锁,从这里让你思绪清晰

说到了锁我们经常会联想到生活中的锁,在我们日常中我们经常会接触到锁。比如我们的手机锁,电脑锁,再比如我们生活中的门锁,这些都是锁。

11920
来自专栏MongoDB中文社区

使用JMeter做MongoDB性能测试

对大多数应用环境来说,数据库是一个关键要素。如何存储数据以及在哪里存储数据,对整个系统的性能会产生巨大影响。因此,在做开发之前,数据库的选择肯定是最重要的决定之...

13020
来自专栏木东居士的专栏

憋瞎说,大数据不是你想的那样!

学生党以及很多没设计过大数据开发的小伙伴呢,都对大数据这么一个领域感到非常非常的好奇非常非常的神秘,我今天就非要戳穿给你们看。

9820
来自专栏搜云库技术团队

这样讲 SpringBoot 自动配置原理,你应该能明白了吧

小伙伴们是否想起曾经被 SSM 整合支配的恐惧?相信很多小伙伴都是有过这样的经历的,一大堆配置问题,各种排除扫描,导入一个新的依赖又得添加新的配置。自从有了 S...

8430
来自专栏孟君的编程札记

一步步完成Maven+SpringMVC+SpringFox+Swagger整合示例

本文给出一个整合Maven+SpringMVC+SpringFOX+Swagger的示例,并且一步步给出完成步骤。

7720
来自专栏.Net、.Net Core 、Docker

通俗易懂设计模式解析——享元模式

  今天我们继续讲述设计模式,今天提及的是享元模式,享——共享。之前不是出现了一系列共享的东西吗?为啥呀,还不就是有些东西每个人都需要,但是每个人都去买一个又...

8230

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励