SparkSQL入门_1

概述

先说说准备工作吧。 目前使用的是伪分布式模式,hadoop,spark都已经配置好了。 数据仓库采用的是hive,hive的metastore存储在mysql中。

现在的主要目的是想把spark和hive结合起来,也就是用spark读取hive中的数据。 所以就用到了sparksql。

sparksql的配置有点麻烦,需要将spark的源码编译获取assembly包,另外还需要mysql-connector的驱动包,另外再将hive-site.xml放到conf文件夹中就可以了。

目前存在的问题是sparksql创建表权限报错,解决的方法是用hive先创建了。

sparksql整体的逻辑是dataframe,df可以从Row形式的RDD转换。同时df还可以转换成表接着使用sql的语句进行查询操作。

DataFrame

HiveContext是SQLContext的超集,一般需要实例化它,也就是

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)


#创建df
df = sqlContext.read.json("examples/src/main/resources/people.json")


#df的操作
df.show()
df.printSchema()
df.select("name").show()
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()

SQL query

df = sqlContext.sql("SELECT * FROM table")

ReadWrite

#读写数据
#一般的读写操作
df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
#指定格式的读写
df = sqlContext.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

#将df暂时保存,重启核后消失
DataFrame.saveAsTable("people3")
#将df直接保存到hive的metastore中,通过hive可以查询到
#df格式的数据registerTempTable到表中就可以使用sql语句查询了
DataFrame.registerTempTable ("people3")

Example

#创建一个表
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)
# hive的操作
# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = sqlContext.sql("FROM src SELECT key, value").collect()

#常用的操作
hiveql.table("student").show()
hiveql.tables().show()
hiveql.tableNames()

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏个人分享

spark基础练习(未完)

1、filter val rdd = sc.parallelize(List(1,2,3,4,5)) val mappedRDD = rdd.map(2*_) ...

512
来自专栏北京马哥教育

MongoDB常用shell命令之index shell

1、创建索引 db.collectionName.ensureIndex({name:1}); db.collectionName.ensureIndex({...

2665
来自专栏大数据-Hadoop、Spark

DataFrame常用操作

在spark-shell状态下查看sql内置函数: spark.sql("show functions").show(1000) 比如:SUBSTR(col...

3365
来自专栏about云

spark2 sql编程样例:sql操作

问题导读 1.DataFrame中本文使用了row哪些方法? 2.操作DataFrame row需要导入什么包? 3.teenagersDF.map(teena...

4255
来自专栏个人分享

Spark RDDRelation

注意 这里声明的是 sqlContext = new SQLContext(sc)  如果要存成hive 表 需用hivecontext.

411
来自专栏Spark生态圈

SparkSQL常用操作

5、测试dataframe的read和save方法(注意load方法默认是加载parquet文件)

842
来自专栏函数式编程语言及工具

SDP(5):ScalikeJDBC- JDBC-Engine:Streaming

  作为一种通用的数据库编程引擎,用Streaming来应对海量数据的处理是必备功能。同样,我们还是通过一种Context传递产生流的要求。因为Streamin...

2574
来自专栏写代码的海盗

spark在yarn-cluster上面执行报错

在单机模式下执行成功的spark程序,在yarn上面就报错。异常信息如下: 1 14/08/14 02:05:42 INFO DAGScheduler: Co...

2965
来自专栏函数式编程语言及工具

SDP(3):ScalikeJDBC- JDBC-Engine:Fetching

  ScalikeJDBC在覆盖JDBC基本功能上是比较完整的,而且实现这些功能的方式比较简洁,运算效率方面自然会稍高一筹了。理论上用ScalikeJDBC作为...

3455
来自专栏岑玉海

Spark1.0新特性-->Spark SQL

Spark1.0出来了,变化还是挺大的,文档比以前齐全了,RDD支持的操作比以前多了一些,Spark on yarn功能我居然跑通了。但是最最重要的就是多了一个...

2544

扫码关注云+社区