Spark读取结构化数据

读取结构化数据

Spark可以从本地CSV,HDFS以及Hive读取结构化数据,直接解析为DataFrame,进行后续分析。

读取本地CSV

需要指定一些选项,比如留header,比如指定delimiter值,用或者\t或者其他。

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSV {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val path: String = "/path/to/file/data.csv"
  val df: DataFrame = spark.read
    .option("header","true")
    .option("inferSchema","true")
    .option("delimiter",",")
    .csv(path)
    .toDF()

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

读取Hive数据

SparkSession可以直接调用sql方法,传入sql查询语句即可。返回的DataFrame可以做简单的变化,比如转换 数据类型,对重命名之类。

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.IntegerType

object ReadHive {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .enableHiveSupport() // 需要开启Hive支持
    .getOrCreate()
  import spark.implicits._ //隐式转换

  val sql: String = "SELECT col1, col2 FROM db.myTable LIMIT 1000"
  val df: DataFrame = spark.sql(sql)
    .withColumn("col1", $"col1".cast(IntegerType))
    .withColumnRenamed("col2","new_col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

读取HDFS数据

HDFS上没有数据无法获取表头,需要单独指定。可以参考databricks的网页。一般HDFS默认在9000端口访问。

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadHDFS {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val location: String = "hdfs://localhost:9000/user/zhangsan/test"
  val df: DataFrame = spark
    .read
    .format("com.databricks.spark.csv")
    .option("inferSchema","true")
    .option("delimiter","\001")
    .load(location)
    .toDF("col1","col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏LhWorld哥陪你聊算法

【Spark篇】---SparkSQL初始和创建DataFrame的几种方式

          Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。

1051
来自专栏大数据学习笔记

Spark2.x学习笔记:14、Spark SQL程序设计

Spark2.x学习笔记:14、 Spark SQL程序设计 14.1 RDD的局限性 RDD仅表示数据集,RDD没有元数据,也就是说没有字段语义定义。 RDD...

5757
来自专栏浪淘沙

SparkSql学习笔记一

1.简介     Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。 ...

1423
来自专栏大数据-Hadoop、Spark

DataFrame与RDD的互操作

DataFrame Interoperating with RDDs 参考官网 http://spark.apache.org/docs/2.2.0/sql-...

2654
来自专栏个人分享

spark基础练习(未完)

1、filter val rdd = sc.parallelize(List(1,2,3,4,5)) val mappedRDD = rdd.map(2*_) ...

1532
来自专栏Albert陈凯

Spark自定义累加器的实现

Spark自定义累加器的实现 Java版本: package com.luoxuehuan.sparkproject.spark; import org.apa...

3995
来自专栏岑玉海

Spark1.0新特性-->Spark SQL

Spark1.0出来了,变化还是挺大的,文档比以前齐全了,RDD支持的操作比以前多了一些,Spark on yarn功能我居然跑通了。但是最最重要的就是多了一个...

2704
来自专栏Hadoop实操

PySpark数据类型转换异常分析

在使用PySpark的SparkSQL读取HDFS的文本文件创建DataFrame时,在做数据类型转换时会出现一些异常,如下:

5035
来自专栏Spark生态圈

[spark] DAGScheduler划分stage源码解析

Spark Application只有遇到action操作时才会真正的提交任务并进行计算,DAGScheduler 会根据各个RDD之间的依赖关系形成一个DAG...

1402
来自专栏积累沉淀

Spark(1.6.1) Sql 编程指南+实战案例分析

首先看看从官网学习后总结的一个思维导图 ? 概述(Overview) Spark SQL是Spark的一个模块,用于结构化数据处理。它提供了一个编程的抽象被称...

3388

扫码关注云+社区