专栏首页技术沉淀Spark读取结构化数据

Spark读取结构化数据

开公众号啦,分享读书心得,欢迎一起交流成长。

读取结构化数据

Spark可以从本地CSV,HDFS以及Hive读取结构化数据,直接解析为DataFrame,进行后续分析。

读取本地CSV

需要指定一些选项,比如留header,比如指定delimiter值,用或者\t或者其他。

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSV {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val path: String = "/path/to/file/data.csv"
  val df: DataFrame = spark.read
    .option("header","true")
    .option("inferSchema","true")
    .option("delimiter",",")
    .csv(path)
    .toDF()

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

读取Hive数据

SparkSession可以直接调用sql方法,传入sql查询语句即可。返回的DataFrame可以做简单的变化,比如转换 数据类型,对重命名之类。

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.IntegerType

object ReadHive {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .enableHiveSupport() // 需要开启Hive支持
    .getOrCreate()
  import spark.implicits._ //隐式转换

  val sql: String = "SELECT col1, col2 FROM db.myTable LIMIT 1000"
  val df: DataFrame = spark.sql(sql)
    .withColumn("col1", $"col1".cast(IntegerType))
    .withColumnRenamed("col2","new_col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

读取HDFS数据

HDFS上没有数据无法获取表头,需要单独指定。可以参考databricks的网页。一般HDFS默认在9000端口访问。

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadHDFS {
  val spark: SparkSession = SparkSession
    .builder()
    .appName("Spark Rocks")
    .master("local[*]")
    .getOrCreate()

  val location: String = "hdfs://localhost:9000/user/zhangsan/test"
  val df: DataFrame = spark
    .read
    .format("com.databricks.spark.csv")
    .option("inferSchema","true")
    .option("delimiter","\001")
    .load(location)
    .toDF("col1","col2")

  def main(args: Array[String]): Unit = {
    df.show()
    df.printSchema()
  }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 命令行工具:sed流式编辑

    用户2183996
  • Rails里实现Filter功能

    用户2183996
  • Hinge Loss

    用户2183996
  • MySQL参数之sql_slave_skip_counter

    在MySQL5.5和MySQL5.6中,处理主从复制断开的问题时,经常会用到sql_slave_skip_counter这个参数,一般是将这个参数设置...

    AsiaYe
  • 实战|JPS跳点寻路实现运行路径规划

    上两篇我们主要就是说了A*算法结合OpenCV进行室内地图路线规划,在具体使用过程中发现,遇到比较复杂的地形路线后,计算的时间太长了,经过了一些基础的优化(最近...

    Vaccae
  • 【未完成】7-14 特殊队列 (30 分)

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    韩旭051
  • pyspark列合并为一行

    将 dataframe 利用 pyspark 列合并为一行,类似于 sql 的 GROUP_CONCAT 函数。例如如下 dataframe :

    机器学习和大数据挖掘
  • HBase默认配置文件 hbase-default.xml 注释解析

    黑泽君
  • B题 2010年上海世博会影响力的定量评估---数据曲线拟合

    2010年上海世博会是首次在中国举办的世界博览会。从1851年伦敦的“万国工业博览会”开始,世博会正日益成为各国人民交流历史文化、展示科技成果、体现合作精神、展...

    ccf19881030
  • com.mysql.jdbc.exceptions.jdbc4.CommunicationsE...

    com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link ...

    闵开慧

扫码关注云+社区

领取腾讯云代金券