SparkSQL入门_1

概述

先说说准备工作吧。 目前使用的是伪分布式模式,hadoop,spark都已经配置好了。 数据仓库采用的是hive,hive的metastore存储在mysql中。

现在的主要目的是想把spark和hive结合起来,也就是用spark读取hive中的数据。 所以就用到了sparksql。

sparksql的配置有点麻烦,需要将spark的源码编译获取assembly包,另外还需要mysql-connector的驱动包,另外再将hive-site.xml放到conf文件夹中就可以了。

目前存在的问题是sparksql创建表权限报错,解决的方法是用hive先创建了。

sparksql整体的逻辑是dataframe,df可以从Row形式的RDD转换。同时df还可以转换成表接着使用sql的语句进行查询操作。

DataFrame

HiveContext是SQLContext的超集,一般需要实例化它,也就是

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)


#创建df
df = sqlContext.read.json("examples/src/main/resources/people.json")


#df的操作
df.show()
df.printSchema()
df.select("name").show()
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()

SQL query

df = sqlContext.sql("SELECT * FROM table")

ReadWrite

#读写数据
#一般的读写操作
df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
#指定格式的读写
df = sqlContext.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

#将df暂时保存,重启核后消失
DataFrame.saveAsTable("people3")
#将df直接保存到hive的metastore中,通过hive可以查询到
#df格式的数据registerTempTable到表中就可以使用sql语句查询了
DataFrame.registerTempTable ("people3")

Example

#创建一个表
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)
# hive的操作
# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = sqlContext.sql("FROM src SELECT key, value").collect()

#常用的操作
hiveql.table("student").show()
hiveql.tables().show()
hiveql.tableNames()

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏我和未来有约会

Silverlight读取xml

这例子是为我的试验项目"SLShowCase"做的读取xml的试验. 项目的功能是做案例展示. ok  首先,先定义xml. projects.xml ...

1969
来自专栏跟着阿笨一起玩NET

treeview 如何从多个数据表中获取数据动态生成 [提问]

在 汪洋怡舟的这篇文章中【http://www.cnblogs.com/longren629/archive/2007/03/14/674633.html】只使...

572
来自专栏跟着阿笨一起玩NET

linq Distinct 去除重复数据

转载:http://www.cnblogs.com/ldp615/archive/2011/08/01/distinct-entension.html

501
来自专栏HBStream流媒体与音视频技术

c#万能视频播放器(附代码)

36512
来自专栏运维前线

Dell服务器常用管理命令总结

Dell服务器常用管理命令总结 准备新版本的 MegaCli-8.07.14-1.noarch.rpm 会把程序安装在/opt下,可以自定义安装目录,例如: ...

1617
来自专栏Kubernetes

Kubernetes Scheduler原理解析

本文是对Kubernetes Scheduler的算法解读和原理解析,重点介绍了预选(Predicates)和优选(Priorities)步骤的原理,并介绍了默...

4165
来自专栏Netkiller

数据库安全·开发加密插件

以下节选择《Netkiller Architect 手札》 作者:netkiller 地址 http://www.netkiller.cn/archit...

3037
来自专栏c#开发者

为什么nhibernate 不能保存on-to-many的结构

下面是主类文件 Code namespace EasyTalk.Module {     /// <summary>     /// SiteAddre...

2525
来自专栏跟着阿笨一起玩NET

Winform TreeView 查找下一个节点

var tn = _Tv.NextNodes().FirstOrDefault(x => Regex.IsMatch(x.Text, "(?i)" + txtK...

852
来自专栏林德熙的博客

C# 解析 sln 文件 使用

我的项目,编码工具 需要检测打开一个工程,获取所有项目。 但是发现原来的方法,如果存在文件夹,把项目放在文件夹中,那么是无法获得项目,于是我就找了一个方法去获得...

480

扫码关注云+社区