Spark读取结构化数据

读取结构化数据

Spark可以从本地CSV,HDFS以及Hive读取结构化数据,直接解析为DataFrame,进行后续分析。

读取本地CSV

需要指定一些选项,比如留header,比如指定delimiter值,用或者\t或者其他。

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSV {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val path: String = "/path/to/file/data.csv"
  val df: DataFrame = spark.read
    .option("header","true")
    .option("inferSchema","true")
    .option("delimiter",",")
    .csv(path)
    .toDF()

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

读取Hive数据

SparkSession可以直接调用sql方法,传入sql查询语句即可。返回的DataFrame可以做简单的变化,比如转换 数据类型,对重命名之类。

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.IntegerType

object ReadHive {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .enableHiveSupport() // 需要开启Hive支持
    .getOrCreate()
  import spark.implicits._ //隐式转换

  val sql: String = "SELECT col1, col2 FROM db.myTable LIMIT 1000"
  val df: DataFrame = spark.sql(sql)
    .withColumn("col1", $"col1".cast(IntegerType))
    .withColumnRenamed("col2","new_col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

读取HDFS数据

HDFS上没有数据无法获取表头,需要单独指定。可以参考databricks的网页。一般HDFS默认在9000端口访问。

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadHDFS {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val location: String = "hdfs://localhost:9000/user/zhangsan/test"
  val df: DataFrame = spark
    .read
    .format("com.databricks.spark.csv")
    .option("inferSchema","true")
    .option("delimiter","\001")
    .load(location)
    .toDF("col1","col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏积累沉淀

Spark(1.6.1) Sql 编程指南+实战案例分析

首先看看从官网学习后总结的一个思维导图 ? 概述(Overview) Spark SQL是Spark的一个模块,用于结构化数据处理。它提供了一个编程的抽象被称...

3068
来自专栏一名叫大蕉的程序员

Spark你一定学得会(一)No.7

我是小蕉。 上一篇大家说没有干货,妈蛋回南天哪来的干货你告诉我!!!还好这几天天气还不错,干货来了。 首先祭上今天关键代码,要做的事情就是从Hive表中取得年龄...

1845
来自专栏大数据-Hadoop、Spark

Spark DataFrame基本操作

DataFrame的概念来自R/Pandas语言,不过R/Pandas只是runs on One Machine,DataFrame是分布式的,接口简单易用。 ...

2624
来自专栏大数据-Hadoop、Spark

DataFrame与RDD的互操作

DataFrame Interoperating with RDDs 参考官网 http://spark.apache.org/docs/2.2.0/sql-...

2614
来自专栏浪淘沙

SparkStreaming编程实现

3.MyNetworkTotalWordCountV2.scala(开发自己的实时词频统计程序(累计单词出现次数))

1305
来自专栏Hadoop实操

PySpark数据类型转换异常分析

在使用PySpark的SparkSQL读取HDFS的文本文件创建DataFrame时,在做数据类型转换时会出现一些异常,如下:

4125
来自专栏星汉技术

原 荐 Spark框架核心概念

3708
来自专栏个人分享

spark基础练习(未完)

1、filter val rdd = sc.parallelize(List(1,2,3,4,5)) val mappedRDD = rdd.map(2*_) ...

1042
来自专栏岑玉海

Spark1.0新特性-->Spark SQL

Spark1.0出来了,变化还是挺大的,文档比以前齐全了,RDD支持的操作比以前多了一些,Spark on yarn功能我居然跑通了。但是最最重要的就是多了一个...

2644
来自专栏李德鑫的专栏

Spark SQL 数据统计 Scala 开发小结

Dataset API 属于用于处理结构化数据的 Spark SQL 模块,通过比 RDD 多的数据的结构信息,Spark SQL 在计算的时候可以进行额外的...

3.6K4

扫码关注云+社区