首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(四十七):Structured Streaming Sink 输出


​​​​​​​

Sink 输出

在StructuredStreaming中定义好Result DataFrame/Dataset后,调用writeStream()返回DataStreamWriter对象,设置查询Query输出相关属性,启动流式应用运行,相关属性如下:

文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#starting-streaming-queries

​​​​​​​输出模式

"Output"是用来定义写入外部存储器的内容,输出可以被定义为不同模式:

  •  追加模式(Append mode),默认模式,其中只有自从上一次触发以来,添加到 Result Table 的新行将会是outputted to the sink。只有添加到Result Table的行将永远不会改变那些查询才支持这一点。这种模式保证每行只能输出一次(假设 fault-tolerant sink )。例如,只有select, where, map, flatMap, filter, join等查询支持 Append mode 。只输出那些将来永远不可能再更新的数据,然后数据从内存移除 。没有聚合的时候,append和update一致;有聚合的时候,一定要有水印,才能使用。
  •  完全模式(Complete mode),每次触发后,整个Result Table将被输出到sink,aggregation queries(聚合查询)支持。全部输出,必须有聚合。
  •  更新模式(Update mode),只有 Result Table rows 自上次触发后更新将被输出到 sink。与Complete模式不同,因为该模式只输出自上次触发器以来已经改变的行。如果查询不包含聚合,那么等同于Append模式。只输出更新数据(更新和新增)。

注意,不同查询Query,支持对应的输出模式,如下表所示:

​​​​​​​触发间隔-了解

触发器Trigger决定了多久执行一次查询并输出结果

当不设置时,默认只要有新数据,就立即执行查询Query,再进行输出。

目前来说,支持三种触发间隔设置:

其中Trigger.Processing表示每隔多少时间触发执行一次,此时流式处理依然属于微批处理;从Spark 2.3以后,支持Continue Processing流式处理,设置触发间隔为Trigger.Continuous但不成熟,使用默认的尽可能快的执行即可。

官网代码示例如下:

代码语言:javascript
复制
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)

df.writeStream

  .format("console")

  .start()

// ProcessingTime trigger with two-seconds micro-batch interval

df.writeStream

  .format("console")

  .trigger(Trigger.ProcessingTime("2 seconds"))

  .start()

// One-time trigger

df.writeStream

  .format("console")

  .trigger(Trigger.Once())

  .start()

// Continuous trigger with one-second checkpointing interval

df.writeStream

  .format("console")

  .trigger(Trigger.Continuous("1 second"))

  .start()

​​​​​​​查询名称

    可以给每个查询Query设置名称Name,必须是唯一的,直接调用DataFrameWriter中queryName方法即可,实际生产开发建议设置名称,API说明如下:

​​​​​​​检查点位置

     在Structured Streaming中使用Checkpoint 检查点进行故障恢复。如果实时应用发生故障或关机,可以恢复之前的查询的进度和状态,并从停止的地方继续执行,使用Checkpoint和预写日志WAL完成。使用检查点位置配置查询,那么查询将所有进度信息(即每个触发器中处理的偏移范围)和运行聚合(例如词频统计wordcount)保存到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,两种方式设置Checkpoint Location位置:

1.DataStreamWrite设置

streamDF.writeStream.option("checkpointLocation", "path")

2.SparkConf设置

sparkConf.set("spark.sql.streaming.checkpointLocation", "path")

修改上述词频统计案例程序,设置输出模式、查询名称、触发间隔及检查点位置,演示代码如下:

代码语言:javascript
复制
package cn.itcast.structedstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * Author itcast
 */
object SinkDemo {
  def main(args: Array[String]): Unit = {
    //1.准备环境
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    //2.source
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()
    val ds: Dataset[String] = df.as[String]

    //3.operation
    val result: Dataset[Row] = ds.flatMap(_.split(" "))
      .groupBy("value")
      .count()
      .orderBy($"count".desc)

    //4.output
    result.writeStream
      .outputMode(OutputMode.Complete())
      .trigger(Trigger.ProcessingTime(0))
      .format("memory")
      .queryName("t_words")
      .option("numRows", "10")
      .option("truncate", "false")
      .option("checkpointLocation", "./ckp"+System.currentTimeMillis())
      .start() //开启查询
    //.awaitTermination()//等待程序结束,注意该行后面的代码没有机会执行,所以如果要在后面继续写代码,需要把改行注掉

    while (true) {
      Thread.sleep(2000)
      println(System.currentTimeMillis())
      spark.sql("select * from t_words").show()
    }
  }
}

运行流式应用,查看Checkpoint Location,包含以下几个目录:

各个子目录及文件含义说明:

 第一、偏移量目录【offsets】:记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据,在处理数据前将其写入此日志记录。此日志中的第 N 条记录表示当前正在已处理,第 N-1 个条目指示哪些偏移已处理完成。

第二、提交记录目录【commits】:记录已完成的批次,重启任务检查完成的批次与 offsets 批次记录比对,确定接下来运行的批次;

 第三、元数据文件【metadata】:metadata 与整个查询关联的元数据,目前仅保留当前job id

 第四、数据源目录【sources】:sources 目录为数据源(Source)时各个批次读取详情

 第五、数据接收端目录【sinks】:sinks 目录为数据接收端(Sink)时批次的写出详情

 第六、记录状态目录【state】:当有状态操作时,如累加聚合、去重、最大最小等场景,这个目录会被用来记录这些状态数据,根据配置周期性地生成.snapshot文件用于记录状态。

下一篇
举报
领券