慕课网Spark SQL日志分析 - 4.从Hive平滑过渡到Spark SQL

4.1 SQLContext/HiveContext/SparkSesson

1.SQLContext

image.png

老版本文档:http://spark.apache.org/docs/1.6.1/

  • SQLContext示例文件:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
* SQLContext使用
* 注意:IDEA是在本地,而测试数据是在服务器上 ,能不能在本地进行开发测试的?
*/
object SQLContextApp {

def main(args: Array[String]): Unit = {

val path = args(0)

//1)创建相应的Context
val sparkConf = new SparkConf()

//在测试或者生产中,AppName和Master我们是通过脚本进行指定
sparkConf.setAppName("SQLContextApp").setMaster("local[2]").set("spark.driver.bindAddress","127.0.0.1")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

//2)相关的处理: json
val people = sqlContext.read.format("json").load(path)
people.printSchema()
people.show()



//3)关闭资源
sc.stop()
}
}
  • 打包:
mvn clean package -DSkipTests
./bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
./bin/spark-submit \
--class com.gwf.spark.SQLContextApp
--master local[2] \
/Users/gaowenfeng/Downloads/MySparkSqlProject/target/sql-1.0.jar \
file:///Users/gaowenfeng/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json
  • 脚本提交: 将上面的命令做成shell脚本,赋予执行权限即可执行

2.HiveContext使用

To use a HiveContext, you do not need to have an existing Hive setup

代码上面代码类似,只是把SQLContext改成HiveContext。不过使用时需要通过--jars 把mysql的驱动传递到classpath

3.SparkSession

def main(args: Array[String]): Unit = {
val path = args(0)

val spark = SparkSession
.builder()
.appName("SQLContextApp")
.config("spark.driver.bindAddress","127.0.0.1")
.master("local[2]")
.getOrCreate()


val people = spark.read.format("json").load(path)
people.printSchema()
people.show()
spark.stop()
}

4.2 spark-shell/spark-sql的使用

  1. 在conf目录添加hive-site.xml
  2. --jars 传递mysql驱动包
# shell
spark-shell --master local[2] --jars /Users/gaowenfeng/.m2/repository/mysql/mysql-connector-java/5.1.45/mysql-connector-java-5.1.45.jar

# spark.sql('sql语句').show

# mysql
spark-sql --master local[2] --jars /Users/gaowenfeng/.m2/repository/mysql/mysql-connector-java/5.1.45/mysql-connector-java-5.1.45.jar
# 可以直接执行SQL

分析执行计划理解sparksql的架构

create table t(key string,value string);
explain extended select a.key * (2+3),b.value from t a join t b on a.key = b.key and a.key > 3;


# 解析成一个逻辑执行计划
== Parsed Logical Plan ==
# unresolvedalias:并没有解析全
'Project [unresolvedalias(('a.key * (2 + 3)), None), 'b.value] # select 的两个字段
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3)) # or后面的条件
:- 'SubqueryAlias a
: +- 'UnresolvedRelation `t`
+- 'SubqueryAlias b
+- 'UnresolvedRelation `t`

# 解析操作(需要与底层的metastore打交道)
== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double, value: string # 将a.key , (2+3) 分别转换成double类型
Project [(cast(key#8 as double) * cast((2 + 3) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#12, value#11] # select 的两个字段
+- Join Inner, ((key#8 = key#10) && (cast(key#8 as int) > 3))
:- SubqueryAlias a
: +- SubqueryAlias t # 已经解析出了使元数据中的哪张表
: +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#8, value#9]
+- SubqueryAlias b
+- SubqueryAlias t
+- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#10, value#11]

# 优化操作
== Optimized Logical Plan ==
Project [(cast(key#8 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#12, value#11]
+- Join Inner, (key#8 = key#10)
:- Project [key#8]
: +- Filter (isnotnull(key#8) && (cast(key#8 as int) > 3)) # 把a.key>3 提到前面来,先过滤,
: +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#8, value#9]
+- Filter (isnotnull(key#10) && (cast(key#10 as int) > 3))
+- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#10, value#11]

# 物理执行计划
== Physical Plan ==
*Project [(cast(key#8 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#12, value#11]
+- *SortMergeJoin [key#8], [key#10], Inner
:- *Sort [key#8 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#8, 200)
: +- *Filter (isnotnull(key#8) && (cast(key#8 as int) > 3))
: +- HiveTableScan [key#8], CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#8, value#9]
+- *Sort [key#10 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key#10, 200)
+- *Filter (isnotnull(key#10) && (cast(key#10 as int) > 3))
+- HiveTableScan [key#10, value#11], CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#10, value#11]

4.3 thriftserver/beeline的使用

  1. 启动thriftserver,默认端口是10000
./sbin/start-thriftserver.sh \
# 修改端口
--hiveconf hive.server2.thrift.port=<listening-port> \
# 修改host
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
  1. 启动beeline beeline -u jdbc:hive2://localhost:10000 -n gaowenfeng

image.png

image.png

3.thriftserver 和 spark-shell/spark-sql 的区别:

  1. spark-shell,spark-sql都是一个spark application
  2. thriftserver不管你启动了多少个客户端(beeline/code),永远都是一个spark application,解决了一个数据共享的问题,多个客户端可以共享数据

4.4 jdbc方式编程访问

1.添加maven依赖

<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1.spark2</version>
</dependency>

2.开发代码访问thriftserver

注意事项:在使用jdbc开发时,一定要先启动thriftserver

def main(args: Array[String]): Unit = {
Class.forName("org.apache.hive.jdbc.HiveDriver")
try{}
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000","gaowenfeng","")
val pstmt = conn.prepareStatement("select * from emp")
val rs = pstmt.executeQuery()

while (rs.next()){
print(rs.getInt("id")+"\t"+rs.getString("name"))
}

rs.close()
pstmt.close()
conn.close()
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏乐沙弥的世界

Oracle Net Services - Tracing and Logging at a Glance

    Oracle Net trace 用于跟踪或调试oracle连接故障,连接异常断开或者连接超时等情形,通过产生详细的跟踪信息来进行分析和诊断Oracle...

8130
来自专栏蓝天

同时具备多线程和多进程安全的写日志工具

接口请浏览:https://github.com/eyjian/mooon/blob/master/mooon/include/mooon/sys/log.h ...

28940
来自专栏码匠的流水账

解决jd-gui在Sierra下闪退问题

在升级了mac操作系统到Sierra版本之后,之前的jd-gui就闪退了,本文就讲述一下如何解决这个问题。

29910
来自专栏FreeBuf

Scrounger:一款功能强大的移动端应用程序安全测试套件

今天给大家介绍的是一款名叫Scrounger 的工具,广大研究人员可以使用这款工具来对移动端应用程序的安全性进行测试。首先,这款工具参考和借鉴了很多目前安全社区...

14210
来自专栏乐沙弥的世界

使用 SQLNET.EXPIRE_TIME 清除僵死连接

    数据库连接的客户端异常断开后,其占有的相应并没有被释放,如从v$session视图中依旧可以看到对应的session处于inactive,且对应的服务器...

34120
来自专栏技术小讲堂

LINQ to SQL(2):生成对象模型

在LINQ to SQL中,可以使用自己的编程语言的对象模型映射到关系数据库,在上一节课,已经有一部分内容,简单的介绍了一下这种对象模型的结构,这一节,我们主要...

29840
来自专栏冷冷

Spring的事务传播行为

先举一个Spring的嵌套例子 ServiceA {     void methodA() {         ServiceB.methodB();    ...

218100
来自专栏linux驱动个人学习

基于input子系统的sensor驱动调试(二)

继上一篇:https://cloud.tencent.com/developer/article/1054078 一、驱动流程解析: 1、模块加载: 1 st...

67870
来自专栏杨建荣的学习笔记

一条执行4秒的sql语句导致的系统问题(r3笔记第10天)

一般来说一条sql语句执行个4秒钟是可以接受的,没有什么问题,但是如果应该执行1秒,却执行了4秒,问题就挺大的了。 今天查看数据库负载,发现在中午12:00 ...

38580
来自专栏数据和云

推陈出新:12C 推进 SCN 新方法实践

在数据库异常恢复中,经常需要修改数据库的 SCN 值,在 12C 之前,我们常用的方法有如下几个: oradebug poke 直接修改内存中的值; event...

430100

扫码关注云+社区

领取腾讯云代金券