首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(四十五):Structured Streaming Sources 输入源


Sources 输入源

从Spark 2.0至Spark 2.4版本,目前支持数据源有4种,其中Kafka 数据源使用作为广泛,其他数据源主要用于开发测试程序。

文档http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#input-sources

     可以认为Structured Streaming = SparkStreaming + SparkSQL,对流式数据处理使用SparkSQL数据结构,应用入口为SparkSession,对比SparkSQL与SparkStreaming编程:

 Spark Streaming:将流式数据按照时间间隔(BatchInterval)划分为很多Batch,每批次数据封装在RDD中,底层RDD数据,构建StreamingContext实时消费数据;

 Structured Streaming属于SparkSQL模块中一部分,对流式数据处理,构建SparkSession对象,指定读取Stream数据和保存Streamn数据,具体语法格式:

静态数据

读取spark.read

保存ds/df.write

流式数据

读取spark.readStream

保存ds/df.writeStrem

Socket数据源-入门案例

需求

http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example

实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台Console。

  • Socket 数据源

从Socket中读取UTF8文本数据。一般用于测试,使用nc -lk 端口号向Socket监听的端口发送数据,用于测试使用,有两个参数必须指定:

1.host

2.port

  • Console 接收器

     将结果数据打印到控制台或者标准输出,通常用于测试或Bedug使用,三种输出模式OutputMode(Append、Update、Complete)都支持,两个参数可设置:

1.numRows,打印多少条数据,默认为20条;

2.truncate,如果某列值字符串太长是否截取,默认为true,截取字符串;

编程实现

完整案例代码如下:

代码语言:javascript
复制
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
 */
object StructuredWordCount {
  def main(args: Array[String]): Unit = {
      //TODO: 0. 环境
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "2") // 设置Shuffle分区数目
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // TODO: 1. 从TCP Socket 读取数据
    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()
    //注意:返回的df不是普通的分布式表,而是实时流数据对应的分布式的无界表!
    //df.show()//注意:该写法是离线的写法,会报错,所以应使用实时的写法:Queries with streaming sources must be executed with writeStream.start();
    inputStreamDF.printSchema()

    // TODO: 2. 业务分析:词频统计WordCount
    val resultStreamDF: DataFrame = inputStreamDF
      .as[String]
      .filter(StringUtils.isNotBlank(_))
      .flatMap(_.trim.split("\\s+"))
      .groupBy($"value")
      .count()
    //.orderBy($"count".desc)
    resultStreamDF.printSchema()

    // TODO: 3. 设置Streaming应用输出及启动
    val query: StreamingQuery = resultStreamDF.writeStream
      //- append:默认的追加模式,将新的数据输出!只支持简单查询,如果涉及的聚合就不支持了
      //- complete:完整模式,将完整的数据输出,支持聚合和排序
      //- update:更新模式,将有变化的数据输出,支持聚合但不支持排序,如果没有聚合就和append一样
      //.outputMode(OutputMode.Append())
      //.outputMode(OutputMode.Complete())
      .outputMode(OutputMode.Update())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      // 流式应用,需要启动start
      .start()
    // 流式查询等待流式应用终止
    query.awaitTermination()
    // 等待所有任务运行完成才停止运行
    query.stop()
  }
}

​​​​​​​文件数据源-了解

将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet

​​​​​​​需求

监听某一个目录,读取csv格式数据,统计年龄小于25岁的人群的爱好排行榜。

测试数据

代码语言:javascript
复制
jack1;23;running

jack2;23;running

jack3;23;running

bob1;20;swimming

bob2;20;swimming

tom1;28;football

tom2;28;football

tom3;28;football

tom4;28;football

​​​​​​​代码实现

代码语言:javascript
复制
package cn.itcast.structedstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * 使用Structured Streaming从目录中读取文件数据:统计年龄小于25岁的人群的爱好排行榜
 */
object StructuredFileSource {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // TODO: 从文件系统,监控目录,读取CSV格式数据
    // 数据格式:
    // jack;23;running
    val csvSchema: StructType = new StructType()
      .add("name", StringType, nullable = true)
      .add("age", IntegerType, nullable = true)
      .add("hobby", StringType, nullable = true)

    val inputStreamDF: DataFrame = spark.readStream
      .option("sep", ";")
      .option("header", "false")
      // 指定schema信息
      .schema(csvSchema)
      .csv("data/input/persons")

    // 依据业务需求,分析数据:统计年龄小于25岁的人群的爱好排行榜
    val resultStreamDF: Dataset[Row] = inputStreamDF
      .filter($"age" < 25)
      .groupBy($"hobby")
      .count()
      .orderBy($"count".desc)

    // 设置Streaming应用输出及启动
    val query: StreamingQuery = resultStreamDF.writeStream
      //- append:默认的追加模式,将新的数据输出!只支持简单查询,如果涉及的聚合就不支持了
      //- complete:完整模式,将完整的数据输出,支持聚合和排序
      //- update:更新模式,将有变化的数据输出,支持聚合但不支持排序,如果没有聚合就和append一样
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}

​​​​​​​Rate source-了解

以每秒指定的行数生成数据,每个输出行包含2个字段:timestamp和value

其中timestamp是一个Timestamp含有信息分配的时间类型,并且value是Long(包含消息的计数从0开始作为第一行)类型。此源用于测试和基准测试,可选参数如下:

演示范例代码如下:

代码语言:javascript
复制
package cn.itcast.structedstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 数据源:Rate Source,以每秒指定的行数生成数据,每个输出行包含一个timestamp和value。
 */
object StructuredRateSource {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // TODO:从Rate数据源实时消费数据
    val rateStreamDF: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10") // 每秒生成数据条数
      .option("rampUpTime", "0s") // 每条数据生成间隔时间
      .option("numPartitions", "2") // 分区数目
      .load()
    rateStreamDF.printSchema()
    //root
    // |-- timestamp: timestamp (nullable = true)
    // |-- value: long (nullable = true)

    // 3. 设置Streaming应用输出及启动
    val query: StreamingQuery = rateStreamDF.writeStream
      //- append:默认的追加模式,将新的数据输出!只支持简单查询,如果涉及的聚合就不支持了
      //- complete:完整模式,将完整的数据输出,支持聚合和排序
      //- update:更新模式,将有变化的数据输出,支持聚合但不支持排序,如果没有聚合就和append一样
      .outputMode(OutputMode.Append())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}
下一篇
举报
领券