专栏首页SmartSiSpark SQL DataFrame与RDD交互

Spark SQL DataFrame与RDD交互

Spark SQL 支持两种不同的方法将现有 RDD 转换为 Datasets。

  • 第一种方法使用反射来推断包含特定类型对象的 RDD 的 schema。当你在编写 Spark 应用程序时,你已经知道了 schema,这种基于反射的方法会使代码更简洁,并且运行良好。
  • 第二种方法是通过编程接口来创建 DataSet,这种方法允许构建一个 schema,并将其应用到现有的 RDD 上。虽然这种方法更详细,但直到运行时才知道列及其类型,才能构造 DataSets。

1. 使用反射推导schema

Spark SQL 支持自动将 JavaBeans 的 RDD 转换为 DataFrame。使用反射获取的 BeanInfo 定义了表的 schema。目前为止,Spark SQL 还不支持包含 Map 字段的 JavaBean。但是支持嵌套的 JavaBeans,List 以及 Array 字段。你可以通过创建一个实现 Serializable 的类并为其所有字段设置 getter 和 setter 方法来创建一个 JavaBean。

Java版本:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// 从文本文件中创建Person对象的RDD
JavaRDD<Person> personRDD = sparkSession.read()
  .textFile("src/main/resources/person.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
});

// 在 JavaBean 的 RDD 上应用 schema 生成 DataFrame
Dataset<Row> personDataFrame = sparkSession.createDataFrame(personRDD, Person.class);
// 注册为临时视图
personDataFrame.createOrReplaceTempView("people");

// 运行SQl
Dataset<Row> teenagersDataFrame = sparkSession.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// Row中的列可以通过字段索引获取
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDataFrame.map(
  (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
  stringEncoder
);
teenagerNamesByIndexDF.show();
/**
 +------------+
 |       value|
 +------------+
 |Name: Justin|
 +------------+
 */

// Row中的列可以通过字段名称获取
Dataset<String> teenagerNamesByFieldDF = teenagersDataFrame.map(
  (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
  stringEncoder
);
teenagerNamesByFieldDF.show();
/**
 +------------+
 |       value|
 +------------+
 |Name: Justin|
 +------------+
 */

Scala版本:

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("src/main/resources/person.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

2. 使用编程方式指定Schema

当 JavaBean 类不能提前定义时(例如,记录的结构以字符串编码,或者解析文本数据集,不同用户字段映射方式不同),可以通过编程方式创建 DataSet,有如下三个步骤:

  • 从原始 RDD(例如,JavaRDD)创建 Rows 的 RDD(JavaRDD);
  • 创建由 StructType 表示的 schema,与步骤1中创建的 RDD 中的 Rows 结构相匹配。
  • 通过SparkSession提供的 createDataFrame 方法将 schema 应用到 Rows 的 RDD。

Java版本:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// JavaRDD<String>
JavaRDD<String> peopleRDD = sparkSession.sparkContext()
  .textFile("src/main/resources/person.txt", 1)
  .toJavaRDD();

// JavaRDD<Row>
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
    String[] attributes = record.split(",");
    return RowFactory.create(attributes[0], attributes[1].trim());
});

// 字符串 schema
String schemaString = "name age";
// 根据字符串 schema 产生 schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
    StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
    fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Dataset<Row>
Dataset<Row> peopleDataFrame = sparkSession.createDataFrame(rowRDD, schema);

// 临时视图
peopleDataFrame.createOrReplaceTempView("people");

// 运行SQL
Dataset<Row> results = sparkSession.sql("SELECT name FROM people");

Dataset<String> namesDS = results.map(
        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
        Encoders.STRING());
namesDS.show();
/**
 +-------------+
 |        value|
 +-------------+
 |Name: Michael|
 |   Name: Andy|
 | Name: Justin|
 +-------------+
 */

Scala版本:

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("src/main/resources/person.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
/**
 +-------------+
 |        value|
 +-------------+
 |Name: Michael|
 |   Name: Andy|
 | Name: Justin|
 +-------------+
 */

Spark 版本: 2.3.1

原文:http://spark.apache.org/docs/2.3.1/sql-programming-guide.html#interoperating-with-rdds

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Jmeter接口测试-websocket测试

    https://github.com/maciejzaleski/JMeter-WebSocketSampler/wiki/Dependencies

    louiezhou001
  • Delphi使用NativeXml处理XML(二)

    4.1.类(Classes) 4.1.1.TComponentAccess类   TComponentAccess = class(TComponent) 4....

    Vaccae
  • MAC下多版本JDK切换

    手头上的工具有时候依赖低版本jdk,有时候需要高版本jdk, 如何在不同版本jdk之间来回自由的切换?

    louiezhou001
  • soapUI接口测试参数化

    原本想先从测试概念慢慢深入讲解测试流程, 测试原理, 由浅入深逐渐讲解,但这样一来感觉先介绍理论有些乏味, 作为一名从事多年的测试工程师, 沉淀了较为丰富的测试...

    louiezhou001
  • Charles 抓包手机app

    mac系统无法使用fiddler, 不知道其他朋友是否遇见过, 只能找替代工具.先去百度上搜索下载Charles 破解版, 选择Charles是4.2.7版本.

    louiezhou001
  • Android利用SurfaceView显示Camera图像爬坑记(五) -- 在现有项目中加入NDK配置

    前面几章我们已经把SurfaceView加载Camera实现实时帧显示图像完成了,我也说过,我们加载实时图像是为了对接OpenCV进行图像处理所以才生成的Bit...

    Vaccae
  • 【干货】VueJs里利用CryptoJs实现Md5加密和3Des加密及解密

    前我们介绍的用于vue用于数据签名的操作,《【干货】Vue TypeScript根据类生成签名字符串》,其目的就是用于生成这个再转MD5加密的模式进行校验,原来...

    Vaccae
  • JavaScript入门总结——第三弹 数组大放送

    Hello大家好~~首先在这里祝大家元宵节快乐呢,大家吃汤圆了吗,兔妞反正还没吃呢,等着晚上放开肚子吃呢,糯叽叽糯叽叽,嘿嘿

    萌兔IT
  • Android NDK编程(四)--- C/C++调用Java中的方法

    上一篇我们介绍了《Android NDK编程(三)--- Android调用C的函数》,主要是介绍了在Android中怎么调用C/C++中的方法,在我们开发nd...

    Vaccae
  • Android NDK编程(五)--- C/C++调用Java不同类中的静态方法

    上一篇我们介绍了《Android NDK编程(四)--- C/C++调用Java中的方法》,主要是C/C++中调用Java的方法,这一篇我们针对上一篇的内容再延...

    Vaccae

扫码关注云+社区

领取腾讯云代金券