前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark将Dataframe数据写入Hive分区表的方案

Spark将Dataframe数据写入Hive分区表的方案

作者头像
王知无-import_bigdata
修改2019-08-17 23:22:25
15.3K0
修改2019-08-17 23:22:25
举报

5万人关注的大数据成神之路,不来了解一下吗?

5万人关注的大数据成神之路,真的不来了解一下吗?

5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》

DataFrame 将数据写入hive中时,默认的是hive默认数据库,insert into没有指定数据库的参数,数据写入hive表或者hive表分区中:

1、将DataFrame数据写入到hive表中

从DataFrame类中可以看到与hive表有关的写入API有一下几个:

registerTempTable(tableName:String):Unit,
inserInto(tableName:String):Unit
insertInto(tableName:String,overwrite:Boolean):Unit
saveAsTable(tableName:String,source:String,mode:SaveMode,options:Map[String,String]):Unit

有很多重载函数,不一一列举

registerTempTable函数是创建spark临时表

insertInto函数是向表中写入数据,可以看出此函数不能指定数据库和分区等信息,不可以直接写入。

向hive数据仓库写入数据必须指定数据库,hive数据表建立可以在hive上建立,或者使用hiveContext.sql("create table .....")

下面语句是向指定数据库数据表中写入数据:

case class Person(name:String,col1:Int,col2:String)

val sc = new org.apache.spark.SparkContext   
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

import hiveContext.implicits._
hiveContext.sql("use DataBaseName")
val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
data.toDF().insertInto("tableName")

创建一个case类将RDD中数据类型转为case类类型,然后通过toDF转换DataFrame,调用insertInto函数时,首先指定数据库,使用的是hiveContext.sql("use DataBaseName") 语句,就可以将DataFrame数据写入hive数据表中了。

2、将DataFrame数据写入hive指定数据表的分区中

hive数据表建立可以在hive上建立,或者使用hiveContext.sql("create table....") ,使用saveAsTable时数据存储格式有限,默认格式为parquet,将数据写入分区的思路是:首先将DataFrame数据写入临时表,之后由hiveContext.sql语句将数据写入hive分区表中,具体操作如下:

case class Person(name:String,col1:Int,col2:String)

val sc = new org.apache.spark.SparkContext   
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

import hiveContext.implicits._
hiveContext.sql("use DataBaseName")
val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
data.toDF().registerTempTable("table1")
hiveContext.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")

上面代码是spark版本1.6

下面代码是spark版本2.0及以上版本

val session = SparkSession.builder().appName("WarehouseInventoryByNewMysqlSnap").enableHiveSupport().getOrCreate()
val sc: SparkContext=session.sparkContext
session.sql("use bi_work")
import session.implicits._
val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
data.toDF().registerTempTable("table1")
session.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")

这边捎带介绍一下hive创建分区表:

hive分区表:是指在创建表时指定的partition的分区空间,若需要创建有分区的表,需要在create表的时候调用可选参数partitioned by。

注意:

  • 一个表可以拥有一个或者多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下
  • hive的表和列名不区分大小写
  • 分区是以字段的形式在表的结构中存在,通过desc table_name 命令可以查看到字段存在,该字段是分区的标识
  • 建表的语句:
CREATE EXTERNAL TABLE bi_work.`dw_inventory_snap`
(`warehouse_id` string COMMENT '',
`internal_id` string COMMENT '', 
`logical_inventory` string COMMENT '', 
`create_time` timestamp COMMENT '')
PARTITIONED BY (`snap_time` string) --指定分区
row format delimited fields terminated by '\t'

— THE END —

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档