
SparkSQL Syntax and API

云飞扬 · 2018-05-17

一、SparkSQL Basic Syntax

1、Using DataFrame Methods

1. Queries

df.select("id","name").show();
1> Conditional queries
df.select($"id",$"name").where($"name" === "bbb").show()
2> Sorted queries

    orderBy/sort($"列名")  升序排列

    orderBy/sort($"列名".desc)  降序排列

    orderBy/sort($"列1" , $"列2".desc) 按两列排序

    For example:

df.select($"id",$"name").orderBy($"name".desc).show
df.select($"id",$"name").sort($"name".desc).show
df.select($"id",$"name").sort($"id",$"name".desc).show
3> Grouped queries

    groupBy("列名", ...).max(列名) 求最大值

    groupBy("列名", ...).min(列名) 求最小值

    groupBy("列名", ...).avg(列名) 求平均值

    groupBy("列名", ...).sum(列名) 求和

    groupBy("列名", ...).count() 求个数

    groupBy("列名", ...).agg 可以将多个方法进行聚合

    For example:

scala>val rdd = sc.makeRDD(List((1,"a","bj",100),(2,"b","sh",80),(3,"c","gz",50),(4,"d","bj",45),(5,"e","gz",90)));
scala>val df = rdd.toDF("id","name","addr","score");
scala>df.groupBy("addr").count().show()
scala>df.groupBy("addr").agg(max($"score"), min($"score"), count($"*")).show
4> Join queries
scala>val dept=sc.parallelize(List((100,"caiwubu"),(200,"yanfabu"))).toDF("deptid","deptname")
scala>val emp=sc.parallelize(List((1,100,"zhang"),(2,200,"li"),(3,300,"wang"))).toDF("id","did","name")
scala>dept.join(emp,$"deptid" === $"did").show
scala>dept.join(emp,$"deptid" === $"did","left").show

    The result set of a left outer join contains all rows of the left table named in the LEFT OUTER clause, not just the rows whose join columns match. If a row of the left table has no matching row in the right table, every select-list column from the right table is null in the corresponding result row.

scala>dept.join(emp,$"deptid" === $"did","right").show

2. Performing computations

val df = sc.makeRDD(List(1,2,3,4,5)).toDF("num");
df.select($"num" * 100).show

3. Using arrays and structs

val df = sc.makeRDD(List(("zhang",Array("bj","sh")),("li",Array("sz","gz")))).toDF("name","addrs")
df.selectExpr("name","addrs[0]").show

    Using structs:

{"name":"王二小","address":{"city":"大土坡","street":"南二环甲字1号"}}
{"name":"流放","address":{"city":"天涯海角","street":"南二环甲字2号"}}
val df = sqlContext.read.json("file:///root/work/users.json")
df.select("name","address.street").show

    Other operations:

df.count //get the total number of records
val row = df.first() //get the first record
val value = row.getString(1) //get the value of the column at index 1 in that row
df.collect //collect all of the DataFrame's data into an Array; this simply calls collect on the DataFrame's underlying RDD, so everything is pulled to the driver

2、Using SQL Statements

1. Table operations

1> Register a temporary table
df.registerTempTable("tabName")
2> List tables
sqlContext.sql("show tables").show

2. Queries

val sqc = new org.apache.spark.sql.SQLContext(sc);
val df = sc.makeRDD(List((1,"a","bj"),(2,"b","sh"),(3,"c","gz"),(4,"d","bj"),(5,"e","gz"))).toDF("id","name","addr");
df.registerTempTable("stu");
sqc.sql("select * from stu").show()
1> Conditional queries
val df = sc.makeRDD(List((1,"a","bj"),(2,"b","sh"),(3,"c","gz"),(4,"d","bj"),(5,"e","gz"))).toDF("id","name","addr");
df.registerTempTable("stu");
sqc.sql("select * from stu where addr = 'bj'").show()
2> Sorted queries
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sc.makeRDD(List((1,"a","bj"),(2,"b","sh"),(3,"c","gz"),(4,"d","bj"),(5,"e","gz"))).toDF("id","name","addr");
df.registerTempTable("stu");
sqlContext.sql("select * from stu order by addr").show()
sqlContext.sql("select * from stu order by addr  desc").show()
3> Grouped queries
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sc.makeRDD(List((1,"a","bj"),(2,"b","sh"),(3,"c","gz"),(4,"d","bj"),(5,"e","gz"))).toDF("id","name","addr");
df.registerTempTable("stu");
sqlContext.sql("select addr,count(*) from stu group by addr").show()
4> Join queries
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val dept=sc.parallelize(List((100,"财务部"),(200,"研发部"))).toDF("deptid","deptname")
val emp=sc.parallelize(List((1,100,"张财务"),(2,100,"李会计"),(3,300,"王研发"))).toDF("id","did","name")
dept.registerTempTable("deptTab");
emp.registerTempTable("empTab");
sqlContext.sql("select deptname,name from deptTab inner join empTab on deptTab.deptid = empTab.did").show()
5> Paginated queries
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sc.makeRDD(List(1,2,3,4,5)).toDF("num");
df.registerTempTable("tabx")
sqlContext.sql("select * from tabx limit 3").show();

3. Performing computations

val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sc.makeRDD(List(1,2,3,4,5)).toDF("num");
df.registerTempTable("tabx")
sqlContext.sql("select num * 100 from tabx").show();

4. Hive-style operations

scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala>hiveContext.sql("create table if not exists zzz (key int, value string) row format delimited fields terminated by '|'")
scala>hiveContext.sql("load data local inpath 'file:///home/software/hdata.txt' into table zzz")
scala>hiveContext.sql("select key,value from zzz").show

5. Example: word count

val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sc.textFile("file:///root/work/words.txt").flatMap{ _.split(" ") }.toDF("word")
df.registerTempTable("wordTab")
sqlContext.sql("select word,count(*) from wordTab group by word").show

二、SparkSQL API

    SparkSQL can also be used programmatically through its API; the example below uses Scala.

1、Create a project

    Open the Scala IDE development environment and create a Scala project.

2、Import jar dependencies

    Import the Spark-related dependency jars.
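
    Alternatively, if the project is managed with sbt rather than hand-imported jars, a minimal build.sbt sketch might look like the following (the Scala and Spark versions are assumptions chosen to match the SQLContext-era API used in this article; adjust them to your environment):

//build.sbt -- a minimal sketch; versions are assumptions
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.6.3" % "provided"
)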

3、Create a class

    Create a package path and an object class inside it.

4、Sample code

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object Driver {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("sql")
    val sc = new SparkContext(conf)
    //get the SparkSQL context object
    val sqc = new SQLContext(sc)

    val r1 = sc.makeRDD(List(("tom", 23), ("rose", 25), ("jim", 15), ("jary", 30)))
    //import the SQL context's implicits so that the RDD gains the toDF method
    import sqc.implicits._
    val t1 = r1.toDF("name", "age")

    t1.registerTempTable("stu")
    val result = sqc.sql("select * from stu")
    //convert the DataFrame to an RDD, typically in order to save the results
    val resultRDD = result.toJavaRDD
    resultRDD.saveAsTextFile("D://sqlresult")

  }
}

5、Deploy to the server

    Package the project into a jar, upload it to the Linux virtual machine, and execute the following command in Spark's bin directory (the --class value must be the fully qualified name of your main object):

sh spark-submit --class cn.tedu.sparksql.Demo01 ./sqlDemo01.jar
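
    The sample code hard-codes setMaster("local"), which takes precedence over any master passed on the command line; to run on a standalone cluster, remove that call from the code and pass the master to spark-submit instead, e.g. (the master URL below is a hypothetical placeholder):

sh spark-submit --master spark://hadoop01:7077 --class cn.tedu.sparksql.Demo01 ./sqlDemo01.jar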

    Finally, verify the output written by saveAsTextFile.
