前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark读取结构化数据

Spark读取结构化数据

作者头像
用户2183996
修改2019-09-22 17:41:23
1.8K0
修改2019-09-22 17:41:23
举报
文章被收录于专栏:技术沉淀技术沉淀

开公众号啦,分享读书心得,欢迎一起交流成长。

读取结构化数据

Spark可以从本地CSV,HDFS以及Hive读取结构化数据,直接解析为DataFrame,进行后续分析。

读取本地CSV

需要指定一些选项,比如留header,比如指定delimiter值,用或者\t或者其他。

代码语言:javascript
复制
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSV {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val path: String = "/path/to/file/data.csv"
  val df: DataFrame = spark.read
    .option("header","true")
    .option("inferSchema","true")
    .option("delimiter",",")
    .csv(path)
    .toDF()

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}
读取Hive数据

SparkSession可以直接调用sql方法,传入sql查询语句即可。返回的DataFrame可以做简单的变化,比如转换 数据类型,对重命名之类。

代码语言:javascript
复制
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.IntegerType

object ReadHive {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .enableHiveSupport() // 需要开启Hive支持
    .getOrCreate()
  import spark.implicits._ //隐式转换

  val sql: String = "SELECT col1, col2 FROM db.myTable LIMIT 1000"
  val df: DataFrame = spark.sql(sql)
    .withColumn("col1", $"col1".cast(IntegerType))
    .withColumnRenamed("col2","new_col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}
读取HDFS数据

HDFS上没有数据无法获取表头,需要单独指定。可以参考databricks的网页。一般HDFS默认在9000端口访问。

代码语言:javascript
复制
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadHDFS {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val location: String = "hdfs://localhost:9000/user/zhangsan/test"
  val df: DataFrame = spark
    .read
    .format("com.databricks.spark.csv")
    .option("inferSchema","true")
    .option("delimiter","\001")
    .load(location)
    .toDF("col1","col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.05.26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 读取结构化数据
    • 读取本地CSV
      • 读取Hive数据
        • 读取HDFS数据
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档