前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkSQL入门_1

SparkSQL入门_1

作者头像
用户1147754
发布2018-01-02 17:28:03
9120
发布2018-01-02 17:28:03
举报
文章被收录于专栏:YoungGyYoungGy
  • 概述
  • DataFrame
  • SQL query
  • ReadWrite
  • Example

概述

先说说准备工作吧。 目前使用的是伪分布式模式,hadoop,spark都已经配置好了。 数据仓库采用的是hive,hive的metastore存储在mysql中。

现在的主要目的是想把spark和hive结合起来,也就是用spark读取hive中的数据。 所以就用到了sparksql。

sparksql的配置有点麻烦,需要将spark的源码编译获取assembly包,另外还需要mysql-connector的驱动包,另外再将hive-site.xml放到conf文件夹中就可以了。

目前存在的问题是sparksql创建表权限报错,解决的方法是用hive先创建了。

sparksql整体的逻辑是dataframe,df可以从Row形式的RDD转换。同时df还可以转换成表接着使用sql的语句进行查询操作。

DataFrame

HiveContext是SQLContext的超集,一般需要实例化它,也就是

代码语言:javascript
复制
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)


#创建df
df = sqlContext.read.json("examples/src/main/resources/people.json")


#df的操作
df.show()
df.printSchema()
df.select("name").show()
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()

SQL query

代码语言:javascript
复制
df = sqlContext.sql("SELECT * FROM table")

ReadWrite

代码语言:javascript
复制
#读写数据
#一般的读写操作
df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
#指定格式的读写
df = sqlContext.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

#将df暂时保存,重启核后消失
DataFrame.saveAsTable("people3")
#将df直接保存到hive的metastore中,通过hive可以查询到
#df格式的数据registerTempTable到表中就可以使用sql语句查询了
DataFrame.registerTempTable ("people3")

Example

代码语言:javascript
复制
#创建一个表
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)
代码语言:javascript
复制
# hive的操作
# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = sqlContext.sql("FROM src SELECT key, value").collect()

#常用的操作
hiveql.table("student").show()
hiveql.tables().show()
hiveql.tableNames()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • DataFrame
  • SQL query
  • ReadWrite
  • Example
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档