Spark读取结构化数据

读取结构化数据

Spark可以从本地CSV,HDFS以及Hive读取结构化数据,直接解析为DataFrame,进行后续分析。

读取本地CSV

需要指定一些选项,比如留header,比如指定delimiter值,用或者\t或者其他。

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSV {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val path: String = "/path/to/file/data.csv"
  val df: DataFrame = spark.read
    .option("header","true")
    .option("inferSchema","true")
    .option("delimiter",",")
    .csv(path)
    .toDF()

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

读取Hive数据

SparkSession可以直接调用sql方法,传入sql查询语句即可。返回的DataFrame可以做简单的变化,比如转换 数据类型,对重命名之类。

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.IntegerType

object ReadHive {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .enableHiveSupport() // 需要开启Hive支持
    .getOrCreate()
  import spark.implicits._ //隐式转换

  val sql: String = "SELECT col1, col2 FROM db.myTable LIMIT 1000"
  val df: DataFrame = spark.sql(sql)
    .withColumn("col1", $"col1".cast(IntegerType))
    .withColumnRenamed("col2","new_col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

读取HDFS数据

HDFS上没有数据无法获取表头,需要单独指定。可以参考databricks的网页。一般HDFS默认在9000端口访问。

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadHDFS {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val location: String = "hdfs://localhost:9000/user/zhangsan/test"
  val df: DataFrame = spark
    .read
    .format("com.databricks.spark.csv")
    .option("inferSchema","true")
    .option("delimiter","\001")
    .load(location)
    .toDF("col1","col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏张善友的专栏

如何结合IbatisNet的LIST遍历实现模糊查询

我仿照Java的Spring+Ibatis+Struct用Castle+IBatisNet+Asp.net的开发框架的DAO的基类:BaseSqlMapDao内...

1809
来自专栏进击的程序猿

通过Eloquent实现Repository模式

Eloquent采用了ActiveRecord的模式,这也让Eloquent招致了好多批评,让我们去看现在Eloquent/Model.php文件, 该文件已经...

713
来自专栏https://www.cnblogs.com/L

Hbase篇--Hbase和MapReduce结合Api

Mapreduce可以自定义Inputforma对象和OutPutformat对象,所以原理上Mapreduce可以和任意输入源结合。

703
来自专栏丑胖侠

《Drools7.0.0.Final规则引擎教程》第4章 4.6 结果条件

结果条件 在Java中,如果有重复的代码我们会考虑进行重构,抽取公共方法或继承父类,以减少相同的代码在多处出现,达到代码的最优管理和不必要的麻烦。Drools同...

2049
来自专栏https://www.cnblogs.com/L

Hadoop源码篇--Reduce篇

Reduce文件会从Mapper任务中拉取很多小文件,小文件内部有序,但是整体是没序的,Reduce会合并小文件,然后套个归并算法,变成一个整体有序的文件。

701
来自专栏https://www.cnblogs.com/L

Hadoop源码篇--Client源码

今天起剖析源码,先从Client看起,因为Client在MapReduce的过程中承担了很多重要的角色。

863
来自专栏个人随笔

房上的猫:吃货联盟项目

一.首先先定义部分成员变量: String[] name = new String[4];// 订餐人 String[] greens = new St...

33710
来自专栏魏琼东

基于DotNet构件技术的企业级敏捷软件开发平台 - AgileEAS.NET平台开发指南 - 数据层开发

对象关系映射          AgileEAS.NETORM并没有采用如NHibernate中映射文件的文件的模式,而是采用了直接硬编码的模式实现,ORM体系...

1969
来自专栏醒者呆

EOS技术研究:合约与数据库交互

2055
来自专栏Spark学习技巧

Spark源码系列之spark2.2的StructuredStreaming使用及源码介绍

一,概述 Structured Streaming是一个可扩展和容错的流处理引擎,并且是构建于sparksql引擎之上。你可以用处理静态数据的方式去处理你的流计...

7306

扫码关注云+社区