前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >以编程方式执行Spark SQL查询的两种实现方式

以编程方式执行Spark SQL查询的两种实现方式

作者头像
天策
发布2018-06-22 14:53:34
2K0
发布2018-06-22 14:53:34
举报
文章被收录于专栏:行者悟空行者悟空

摘 要

在自定义的程序中编写Spark SQL查询程序

1.通过反射推断Schema
代码语言:javascript
复制
package com.itunic.sql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by itunic.com on 2017/1/2.
  * Spark SQL
  * 通过反射推断Schema
  * by me:
  * 我本沉默是关注互联网以及分享IT相关工作经验的博客,
  * 主要涵盖了操作系统运维、计算机编程、项目开发以及系统架构等经验。
  * 博客宗旨:把最实用的经验,分享给最需要的你,
  * 希望每一位来访的朋友都能有所收获!
  *
  */
object InferringSchema {
  def main(args: Array[String]): Unit = {
 //创建SparkConf()并设置App名称
    val conf = new SparkConf().setAppName("InferringSchema").setMaster("local")
 //SQLContext要依赖SparkContext
    val sc = new SparkContext(conf)
 //创建SQLContext
    val sqlContext = new SQLContext(sc)
 //从指定的地址创建RDD
    val lineRdd = sc.textFile("F:\\test\\input\\wc.txt").map(f => {
      val fields = f.split("\t")
 //将RDD和case class关联
      Person(fields(0).toLong, fields(1), fields(2).toInt)
    })
 //导入隐式转换,如果不导入无法将RDD转换成DataFrame
 //将RDD转换成DataFrame
 import sqlContext.implicits._
    val personDF = lineRdd.toDF
 //注册表
    personDF.registerTempTable("t_person")
 //传入SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
 //显示
    df.show()
 //以json方式写入hdfs
 //df.write.json("hdfs://ns1:9000/wc")
    sc.stop()
  }
}

//定义样例类
case class Person(id: Long, userName: String, age: Int)
2.通过StructType直接指定Schema
代码语言:javascript
复制
package com.itunic.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

/**
  * Created by itunic.com on 2017/1/2.
  *  Spark SQL
  * 通过StructType直接指定Schema
  * by me:
  * 我本沉默是关注互联网以及分享IT相关工作经验的博客,
  * 主要涵盖了操作系统运维、计算机编程、项目开发以及系统架构等经验。
  * 博客宗旨:把最实用的经验,分享给最需要的你,
  * 希望每一位来访的朋友都能有所收获!
  *
  */
object SpecifyingSchema {
  def main(args: Array[String]): Unit = {
 //创建SparkConf()并设置App名称
    val conf = new SparkConf().setAppName("SpecifyingSchema").setMaster("local")
 //SQLContext要依赖SparkContext
    val sc = new SparkContext(conf)
 //创建SQLContext
    val sqlContext = new SQLContext(sc)
 //通过StructType直接指定每个字段的schema
    val schema = StructType(
      List(
        StructField("id", LongType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )

    val lineRdd = sc.textFile("F:\\test\\input\\wc.txt").map(f => {
      val fields = f.split("\t")
      Row(fields(0).toLong, fields(1), fields(2).toInt)
    })
 //将schema信息应用到lineRdd上
    val personDF = sqlContext.createDataFrame(lineRdd, schema)
    personDF.registerTempTable("t_person")
 //传入SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
 //显示
    df.show()
 //以json方式写入hdfs
 //df.write.json("hdfs://ns1:9000/wc")
    sc.stop()
  }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年01月02日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.通过反射推断Schema
  • 2.通过StructType直接指定Schema
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档