spark2的SparkSession思考与总结2:SparkSession有哪些函数及作用是什么

问题导读 1.spark SparkSession包含哪些函数? 2.创建DataFrame有哪些函数? 3.创建DataSet有哪些函数? 上一篇spark2:SparkSession思考与总结1 http://www.aboutyun.com/forum.php?mod=viewthread&tid=23381 版本:spark2我们在学习的过程中,很多都是注重实战,这没有错的,但是如果在刚开始入门就能够了解这些函数,在遇到新的问题,可以找到方向去解决问题。比如我们常用的创建DateFrame和DataTable方式就那么一种或则两种,如果更多那就看不懂了。在比如想测试下程序的性能,这时候如果自己写,那就太麻烦了,可以使用spark提供的Time函数。这就是知识全面的一个好处。 SparkSession是一个比较重要的类,它的功能的实现,肯定包含比较多的函数,这里介绍下它包含哪些函数。 builder函数 public static SparkSession.Builder builder() 创建 SparkSession.Builder,初始化SparkSession. setActiveSession函数 public static void setActiveSession(SparkSession session) 当SparkSession.GetOrCreate()被调用,SparkSession发生变化,将会返回一个线程和它的子线程。这将会确定给定的线程接受带有隔离会话的SparkSession,而不是全局的context。 clearActiveSession函数 public static void clearActiveSession() 清除当前线程的Active SparkSession。然后调用GetOrCreate将会返回第一次创建的context代替本地线程重写 setDefaultSession函数 public static void setDefaultSession(SparkSession session) 设置默认的SparkSession,返回builder clearDefaultSession函数 public static void clearDefaultSession() 清除默认的SparkSession返回的builder getActiveSession函数 public static scala.Option<SparkSession> getActiveSession() 由builder,返回当前线程的Active SparkSession getDefaultSession函数 public static scala.Option<SparkSession> getDefaultSession() 由builder,返回默认的SparkSession sparkContext函数 public SparkContext sparkContext() version函数 public String version() 返回运行应用程序的spark版本 sharedState函数 public org.apache.spark.sql.internal.SharedState sharedState() 通过sessions共享状态,包括SparkContext, cached 数据, listener, 和catalog. 这是内部spark,接口稳定性没有保证 sessionState函数 public org.apache.spark.sql.internal.SessionState sessionState() 通过session隔离状态,包括:SQL 配置, 临时表, registered 功能, 和 其它可接受的 SQLConf. 这是内部spark,接口稳定性没有保证 sqlContext函数 public SQLContext sqlContext() session封装以 SQLContext的形式,为了向后兼容。 conf函数 public RuntimeConfig conf() 运行spark 配置接口 通过这个接口用户可以设置和获取与spark sql相关的所有Spark 和Hadoop配置.当获取config值, listenerManager函数 public ExecutionListenerManager listenerManager() 用于注册自定义QueryExecutionListeners的接口,用于侦听执行指标。 experimental函数 public ExperimentalMethods experimental() collection函数,被认为是experimental,可以用于查询高级功能的查询计划程序。 udf函数 public UDFRegistration udf() collection 函数,用于用户自定义函数 例子: Scala版本:

[Scala] 纯文本查看 复制代码

?

sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)

Java版本:

[Java] 纯文本查看 复制代码

?

sparkSession.udf().register("myUDF",
      (Integer arg1, String arg2) -> arg2 + arg1,
      DataTypes.StringType);

streams函数 public StreamingQueryManager streams() 返回StreamingQueryManager ,允许管理所有的StreamingQuerys newSession函数 public SparkSession newSession() 启动一个独立的 SQL 配置, temporary 表, registered 功能新的session,但共享底层的SparkContext 和缓存数据. emptyDataFrame函数 public Dataset<Row> emptyDataFrame() 返回一个空没有行和列的DataFrame emptyDataset函数 public <T> Dataset<T> emptyDataset(Encoder<T> evidence$1) 创建一个T类型的空的Dataset createDataFrame函数 public <A extends scala.Product> Dataset<Row> createDataFrame(RDD<A> rdd,scala.reflect.api.TypeTags.TypeTag<A> evidence$2) 从rdd创建DateFrame public Dataset<Row> createDataFrame(RDD<Row> rowRDD, StructType schema) 从RDD包含的行给定的schema,创建DataFrame。需要确保每行的RDD结构匹配提供的schema,否则将会运行异常。例如:

[Scala] 纯文本查看 复制代码

?

import org.apache.spark.sql._
  import org.apache.spark.sql.types._
  val sparkSession = new org.apache.spark.sql.SparkSession(sc)
 
  val schema =
    StructType(
      StructField("name", StringType, false) ::
      StructField("age", IntegerType, true) :: Nil)
 
  val people =
    sc.textFile("examples/src/main/resources/people.txt").map(
      _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
  val dataFrame = sparkSession.createDataFrame(people, schema)
  dataFrame.printSchema
  // root
  // |-- name: string (nullable = false)
  // |-- age: integer (nullable = true)
 
  dataFrame.createOrReplaceTempView("people")
  sparkSession.sql("select name from people").collect.foreach(println)

public Dataset<Row> createDataFrame(JavaRDD<Row> rowRDD,StructType schema) 创建DataFrame从包含schema的行的RDD。确保RDD提供的每行结构匹配提供的schema,否则运行异常 public Dataset<Row> createDataFrame(java.util.List<Row> rows,StructType schema) 创建DataFrame从包含行的schema的java.util.List public Dataset<Row> createDataFrame(RDD<?> rdd,Class<?> beanClass) 应用schema到Java Beans的RDD 警告:由于Java Bean中的字段没有保证的顺序,因此SELECT *查询将以未定义的顺序返回列。 public Dataset<Row> createDataFrame(JavaRDD<?> rdd, Class<?> beanClass) 应用schema到Java Beans的RDD 警告:由于Java Bean中的字段没有保证的顺序,因此SELECT *查询将以未定义的顺序返回列。 public Dataset<Row> createDataFrame(java.util.List<?> data,Class<?> beanClass) 应用schema到Java Bean list 警告:由于Java Bean中的字段没有保证的顺序,因此SELECT *查询将以未定义的顺序返回列。 baseRelationToDataFrame函数 public Dataset<Row> baseRelationToDataFrame(BaseRelation baseRelation) 转换创建的BaseRelation,为外部数据源到DataFrame createDataset函数 public <T> Dataset<T> createDataset(scala.collection.Seq<T> data,Encoder<T> evidence$4) 从本地给定类型的数据Seq创建DataSet。这个方法需要encoder (将T类型的JVM对象转换为内部Spark SQL表示形式)。这通常是通过从sparksession implicits自动创建。或则可以通过调用 Encoders上的静态方法来显式创建。 例子:

[Scala] 纯文本查看 复制代码

?

import spark.implicits._
  case class Person(name: String, age: Long)
  val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19))
  val ds = spark.createDataset(data)
 
  ds.show()
  // +-------+---+
  // |   name|age|
  // +-------+---+
  // |Michael| 29|
  // |   Andy| 30|
  // | Justin| 19|
  // +-------+---+

public <T> Dataset<T> createDataset(RDD<T> data,Encoder<T> evidence$5) 创建DataSet从给定类型的RDD。这个方法需要encoder (将T类型的JVM对象转换为内部Spark SQL表示形式)。通常自动创建通过SparkSession的implicits 或则可以通过调用 Encoders上的静态方法来显式创建。 public <T> Dataset<T> createDataset(java.util.List<T> data,Encoder<T> evidence$6) 创建 Dataset,对于T类型的java.util.List。这个方法需要encoder (将T类型的JVM对象转换为内部Spark SQL表示形式), 或则可以通过调用 Encoders上的静态方法来显式创建。 Java例子

[Java] 纯文本查看 复制代码

?

List<String> data = Arrays.asList("hello", "world");
    Dataset<String> ds = spark.createDataset(data, Encoders.STRING());

range函数 public Dataset<Long> range(long end)使用名为id的单个LongType列创建一个Dataset,包含元素的范围从0到结束(不包括),步长值为1。 public Dataset<Long> range(long start,long end) 使用名为id的单个LongType列创建一个Dataset,包含元素的范围从start到结束(不包括),步长值为1。 public Dataset<Long> range(long start, long end, long step) 使用名为id的单个LongType列创建一个Dataset,包含元素的范围从start到结束(不包括),步长值为step。 public Dataset<Long> range(long start,long end,long step,int numPartitions) 使用名为id的单个LongType列创建一个Dataset,包含元素的范围从start到结束(不包括),步长值为step,指定partition 的数目 catalog函数 public Catalog catalog() 用户可以通过它 create, drop, alter 或则query 底层数据库, 表, 函数等. table函数 public Dataset<Row> table(String tableName)返回指定的table/view作为DataFrame tableName是可以合格或则不合格的名称。如果在数据库中指定,它在数据库中会识别。否则它会尝试找到一个临时view ,匹配到当前数据库的table/view,全局的临时的数据库view也是有效的。 sql函数 public Dataset<Row> sql(String sqlText) 使用spark执行sql查询,作为DataFrame返回结果。用来sql parsing,可以用spark.sql.dialect来配置 read函数 public DataFrameReader read() 返回一个DataFrameReader,可以用来读取非流数据作为一个DataFrame

[Scala] 纯文本查看 复制代码

?

sparkSession.read.parquet("/path/to/file.parquet")
sparkSession.read.schema(schema).json("/path/to/file.json")

readStream函数 public DataStreamReader readStream() 返回一个DataFrameReader,可以用来读取流数据作为一个DataFrame

[Scala] 纯文本查看 复制代码

?

sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
 sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")

time函数 public <T> T time(scala.Function0<T> f) 执行一些代码块并打印输出执行该块所花费的时间。 这仅在Scala中可用,主要用于交互式测试和调试。 这个函数还是比较有用的,很多地方都能用到 implicits函数 public SparkSession.implicits$ implicits() 嵌套Scala对象访问 stop函数 public void stop() 停止SparkContext close函数 public void close() 与stop类似

原文发布于微信公众号 - about云(wwwaboutyuncom)

原文发表时间:2017-11-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏牛肉圆粉不加葱

[Spark源码剖析] DAGScheduler划分stage划分stage源码剖析

在DAGScheduler内部通过post一个JobSubmitted事件来触发Job的提交

10330
来自专栏岑玉海

Spark源码系列(九)Spark SQL初体验之解析过程详解

好久没更新博客了,之前学了一些R语言和机器学习的内容,做了一些笔记,之后也会放到博客上面来给大家共享。一个月前就打算更新Spark Sql的内容了,因为一些别的...

51750
来自专栏拭心的安卓进阶之路

Android 进阶6:两种序列化方式 Serializable 和 Parcelable

什么是序列化 我们总是说着或者听说着“序列化”,它的定义是什么呢? 序列化 (Serialization)将对象的状态信息转换为可以存储或传输的形式的过程。在...

33350
来自专栏个人分享

Spark常用函数(源码阅读六)

  源码层面整理下我们常用的操作RDD数据处理与分析的函数,从而能更好的应用于工作中。

12920
来自专栏扎心了老铁

java使用spark/spark-sql处理schema数据

1、spark是什么? Spark是基于内存计算的大数据并行计算框架。 1.1 Spark基于内存计算 相比于MapReduce基于IO计算,提高了在大数据环境...

37050
来自专栏大数据智能实战

Spark 2.0 DataFrame map操作中Unable to find encoder for type stored in a Dataset.问题的分析与解决

随着新版本的spark已经逐渐稳定,最近拟将原有框架升级到spark 2.0。还是比较兴奋的,特别是SQL的速度真的快了许多。。 然而,在其中一个操作时却卡住了...

53590
来自专栏恰童鞋骚年

Hadoop学习笔记—7.计数器与自定义计数器

  在上图所示中,计数器有19个,分为四个组:File Output Format Counters、FileSystemCounters、File Input...

12320
来自专栏Spark生态圈

[spark] Shuffle Write解析 (Sort Based Shuffle)

从 Spark 2.0 开始移除了Hash Based Shuffle,想要了解可参考Shuffle 过程,本文将讲解 Sort Based Shuffle。

16320
来自专栏Spark生态圈

[spark] Task执行流程

在文章TaskScheduler 任务提交与调度源码解析 中介绍了Task在executor上的逻辑分配,调用TaskSchedulerImpl的resourc...

21310
来自专栏听雨堂

从MapX到MapXtreme2004[10]-根据zoom值修改显示范围

        原来在Mapx中只需要修改zoom值即可,现在也是一样。虽然map对象有setview方法,但似乎不太好用,因为需要coordsys。     ...

19370

扫码关注云+社区

领取腾讯云代金券