前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pyspark读取pickle文件内容并存储到hive

pyspark读取pickle文件内容并存储到hive

作者头像
西西嘛呦
发布2020-10-27 17:23:52
2.7K0
发布2020-10-27 17:23:52
举报
文章被收录于专栏:数据分析与挖掘

在平常工作中,难免要和大数据打交道,而有时需要读取本地文件然后存储到Hive中,本文接下来将具体讲解。

过程:

  • 使用pickle模块读取.plk文件;
  • 将读取到的内容转为RDD;
  • 将RDD转为DataFrame之后存储到Hive仓库中;

1、使用pickle保存和读取pickle文件

代码语言:javascript
复制
import pickle
data = ""
path = "xxx.plj"
#保存为pickle
pickle.dump(data,open(path,'wb'))
#读取pickle
data2 = pickle.load(open(path,'rb'))

使用python3读取python2保存的pickle文件时,会报错:

UnicodeDecodeError: 'ascii' codec can't decode byte 0xa0 in position 11: ordinal not in range(128)

解决方法:

代码语言:javascript
复制
data2 = pickle.load(open(path,'rb',encoding='latin1'))

使用python2读取python3保存的pickle文件时,会报错:

unsupported pickle protocol:3

解决方法:

代码语言:javascript
复制
import pickle
path = "xxx.plk"
path2 = 'xxx2.plk'
data = pickle.load(open(path,'rb'))
#保存为python2的pickle
pickle.dump(data,open(path2,'wb'),protocol=2)
#读取pickle
data2 = pickle.load(open(path2,'rb'))

2、读取pickle的内容并转为RDD

代码语言:javascript
复制
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pickle


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
with open(picle_path,"rb") as fp:
    data = pickle.load(fp)
    #这里可根据data的类型进行相应的操作

#假设data是一个一维数组:[1,2,3,4,5],读取数据并转为rdd
pickleRdd = spark.parallelize(data)

3、将rdd转为dataframe并存入到Hive中

代码语言:javascript
复制
#定义列名
column = Row('col')
#转为dataframe
pickleDf =pickleRdd.map(lambda x:column(x))
#存储到Hive中,会新建数据库:hive_database,新建表:hive_table,以覆盖的形式添加,partitionBy用于指定分区字段
pickleDf..write.saveAsTable("hive_database.hvie_table", mode='overwrite', partitionBy=‘’)

补充存入到Hive中的知识:

(1)通过sql的方式

代码语言:javascript
复制
data = [
    (1,"3","145"),
    (1,"4","146"),
    (1,"5","25"),
    (1,"6","26"),
    (2,"32","32"),
    (2,"8","134"),
    (2,"8","134"),
    (2,"9","137")
]
df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])
 
# method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字
df.registerTempTable('test_hive')
sqlContext.sql("create table default.write_test select * from test_hive")

或者:

代码语言:javascript
复制
# df 转为临时表/临时视图
df.createOrReplaceTempView("df_tmp_view")
# spark.sql 插入hive
spark.sql(""insert overwrite table 
                    XXXXX  # 表名
                   partition(分区名称=分区值)   # 多个分区按照逗号分开
                   select 
                   XXXXX  # 字段名称,跟hive字段顺序对应,不包含分区字段
                   from df_tmp_view""")

(2)以saveAsTable的形式

代码语言:javascript
复制
# "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
#  mode("append")是在原有表的基础上进行添加数据
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')

以下是通过rdd创建dataframe的几种方法:

(1)通过键值对

代码语言:javascript
复制
d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)

# [Row(age=1, name='Alice')]

(2)通过rdd

代码语言:javascript
复制
a = [('Alice', 1)]
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)

# [Row(_1='Alice', _2=1)]
# [Row(name='Alice', age=1)]

(3)通过rdd和Row

代码语言:javascript
复制
from pyspark.sql import Row


a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)

# [Row(name='Alice', age=1)]

(4)通过rdd和StrutType

代码语言:javascript
复制
from pyspark.sql.types import *

a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]
)
output = spark.createDataFrame(rdd, schema).collect()
print(output)

# [Row(name='Alice', age=1)]

(5)基于pandas dataframe创建

代码语言:javascript
复制
df = spark.createDataFrame(rdd, ['name', 'age'])
print(df)  # DataFrame[name: string, age: bigint]

print(type(df.toPandas()))  # <class 'pandas.core.frame.DataFrame'>

# 传入pandas DataFrame
output = spark.createDataFrame(df.toPandas()).collect()
print(output)

# [Row(name='Alice', age=1)]

参考:

https://blog.csdn.net/sinat_28224453/article/details/84977693

https://blog.csdn.net/weixin_39198406/article/details/104916715

https://blog.csdn.net/u011412768/article/details/93426353

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-10-14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档