前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark Streaming】Spark Day11:Spark Streaming 学习笔记

【Spark Streaming】Spark Day11:Spark Streaming 学习笔记

作者头像
Maynor
发布2021-12-06 20:11:31
1.1K0
发布2021-12-06 20:11:31
举报

Spark Day11:Spark Streaming

image-20210429064334280
image-20210429064334280

01-[了解]-昨日课程内容回顾

主要讲解:Spark Streaming 模块快速入门

代码语言:javascript
复制
1、Streaming 流式计算概述
	- Streaming 应用场景
		实时报表RealTime Report
		实时增量ETL
		实时预警和监控
		实时搜索推荐
		等等
	- 大数据架构:Lambda架构
		离线分析,实时计算
		分为三层:
			- 批处理层,BatchLayer
			- 速度层,SpeedLayer
			- 服务层,ServingLayer
	- 流式数据处理模式
		第一种模式:原生流处理native
			来一条数据,处理一条数据
		第二种模式:微批处理Mirco-Batch
			将流式数据划分小批次,每个小批次快速处理
	- SparkStreaming 计算思想
		将流式数据按照时间间隔BatchInterval划分为很多批次Batch,每批次数据当做RDD,进行处理分析
		DStream = Seq[RDD/Batch]

2、快速入门:词频统计WordCount
	- 需求:
		使用SparkStreaming对流式数据进行分析,从TCP Socket读取数据,对每批次数据进行词频统计,打印控制台,【注意,此处词频统计不是全局的,而是每批次的(局部)】
	- 官方案例
		run-example
	- SparkStreaming应用开发入口
		StreamingContext,流式上下文实例对象
		开发步骤:
			数据源DStream、数据处理和输出(调用DStream中函数)、启动流式应用start、等待终止await,最后关闭资源stop
	- 编程开发,类似RDD中词频统计,调用函数flatMap、map、redueByKey等
	- 流式应用原理
		- 运行程序时,首先创建StreamingContext对象,底层sparkContext
		- ssc.start,启动接收器Receivers,每个接收器以Task方式运行在Executor中
		- Receiver接收器开始从数据源接受数据,按照时间间隔BlockInterval划分数据时Block,默认200ms,将Block存储到Executor内存中,如果设置多副本,在其他Executor再进行存储,最后发送BlockReport给SSC
		- 当达到BatchINterval批次时间间隔时,产生一个Batch批次,将Block分配到该批次,底层将改配中数据当做RDD进行处理分析
		

3、数据结构:DStream = Seq[RDD]
	封装数据流,数据源源不断产生,按照时间间隔划分为很多批次Batch,DStream = Seq[RDD]
	函数:2种类型
		- 转换函数Transformation,类似RDD中转换函数
		- 输出函数Output
	2个重要函数,都是针对每批次RDD进行操作
		- 转换函数:tranform(rdd => rdd)
		- 输出函数:foreachRDD(rdd => Unit)
		修改词频统计代码

02-[了解]-今日课程内容提纲

主要讲解三个方面内容:集成Kafka,应用案例(状态、窗口)和偏移量管理

代码语言:javascript
复制
1、集成Kafka
	SparkStreaming实际项目中,基本上都是从Kafka消费数据进行实时处理
	- 集成时2套API
		由于Kafka Consumer API有2套,所以集成也有2套API
	- 编写代码
		如何从Kafka消费数据,必须掌握
	- 获取每批次数据偏移量信息
		offset

2、应用案例:百度搜索排行榜
	进行相关初始化操作
		- 工具类,创建StreamingContext对象和消费Kafka数据
		- 模拟数据生气生成器,实时产生用户搜索日志数据,发送到Kafka中
	- 实时ETL(无状态)
	- 累加统计(有状态)
	- 窗口统计

3、偏移量管理
	SparkStreaming一大败笔,需要用户管理从Kafka消费数据偏移量,了解知识点即可
image-20210430083314049
image-20210430083314049

03-[理解]-流式应用技术栈

​ 在实际项目中,无论使用Storm还是Spark Streaming与Flink,主要从Kafka实时消费数据进行处理分析,流式数据实时处理技术架构大致如下:

image-20210430083443433
image-20210430083443433
代码语言:javascript
复制
- 数据源Source
	分布式消息队列Kafka
		flume集成Kafka
		调用Producer API写入数据
		Canal实时间MySQL表数据同步到Kafka中,数据格式JSON字符串
		.....
		
- 应用程序运行
	目前企业中只要时流式应用程序,基本上都是运行在Hadoop YARN集群

- 数据终端
	将数据写入NoSQL数据库中,比如Redis、HBase、Kafka
	
Flume/SDK/Kafka Producer API -> KafKa  —> SparkStreaming/Flink/Storm  -> Hadoop YARN -> Redis -> UI
image-20210430084133876
image-20210430084133876

04-[理解]-Kafka回顾及集成Kafka两套API

Apache Kafka: 最原始功能【消息队列】,缓冲数据,具有发布订阅功能(类似微信公众号)。

image-20210430084420445
image-20210430084420445

Kafka 框架架构图如下所示:

image-20210430084442009
image-20210430084442009
代码语言:javascript
复制
1、服务:Broker,每台机器启动服务
	一个Kafka集群,至少3台机器
2、依赖Zookeeper
	配置信息存储在ZK中
3、Producer生产者
	向Kafka中写入数据
4、Consumer 消费者
	从Kafka中消费数据,订阅数据

5、数据如何存储和管理
	使用Topic主题,管理不同类型数据,划分为多个分区partition,采用副本机制
		leader 副本:读写数据,1
		follower 副本:同步数据,保证数据可靠性,1或多个

​ Spark Streaming与Kafka集成,有两套API,原因在于Kafka Consumer API有两套,从Kafka 0.9版本开始出现New Consumer API,方便用户使用,从Kafka Topic中消费数据,到0.10版本稳定。

image-20210430085030228
image-20210430085030228
代码语言:javascript
复制
目前,企业中基本上都是使用Kafka New Consumer API消费Kafka中数据。
	- 核心类:KafkaConsumer、ConsumerRecorder

05-[掌握]-New Consumer API方式集成编程

使用Kafka 0.10.+提供新版本Consumer API集成Streaming,实时消费Topic数据,进行处理。

  • 添加相关Maven依赖:
代码语言:javascript
复制
<dependency>
    <groupId>org.apache.sparkgroupId>
    <artifactId>spark-streaming-kafka-0-10_2.11artifactId>
    <version>2.4.5version>
dependency>

目前企业中基本都使用New Consumer API集成,优势如下:

  • 第一、类似 Old Consumer API中Direct方式
image-20210430090721527
image-20210430090721527
  • 第二、简单并行度1:1
image-20210430090753969
image-20210430090753969

工具类KafkaUtilscreateDirectStream函数API使用说明(函数声明):

image-20210430090825875
image-20210430090825875
代码语言:javascript
复制
官方文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html

首先启动Kafka服务,创建Topic:wc-topic

代码语言:javascript
复制
[root@node1 ~]# zookeeper-daemon.sh start 

[root@node1 ~]# kafka-daemon.sh start 

[root@node1 ~]# jps
2945 Kafka

# 使用KafkaTools创建Topic,设置1个副本和3个分区


kafka-console-producer.sh --topic wc-topic --broker-list node1.itcast.cn:9092
image-20210430091841563
image-20210430091841563

具体实现代码,其中需要创建位置策略对象和消费策略对象

代码语言:javascript
复制
package cn.itcast.spark.kafka

import java.util

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, ConsumerStrategy, KafkaUtils, LocationStrategies, LocationStrategy}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Streaming通过Kafka New Consumer消费者API获取数据
 */
object _01StreamingSourceKafka {
	
	def main(args: Array[String]): Unit = {
		
		// 1. 构建StreamingContext实例对象,传递时间间隔BatchInterval
		val ssc: StreamingContext = {
			// a. 创建SparkConf对象,设置应用基本信息
			val sparkConf = new SparkConf()
    			.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
    			.setMaster("local[3]")
				// 设置数据输出文件系统的算法版本为2
				.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
			// b. 创建实例对象,设置BatchInterval
			new StreamingContext(sparkConf, Seconds(5))
		}
		
		// 2. 定义数据源,获取流式数据,封装到DStream中
		// TODO: 从Kafka消费数据,采用New Consumer API方式
		/*
		  def createDirectStream[K, V](
		      ssc: StreamingContext,
		      locationStrategy: LocationStrategy,
		      consumerStrategy: ConsumerStrategy[K, V]
		   ): InputDStream[ConsumerRecord[K, V]]
		 */
		// a. 位置策略对象
		val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
		// b. 消费策略
		val kafkaParams: Map[String, Object] = Map[String, Object](
			"bootstrap.servers" -> "node1.itcast.cn:9092",
			"key.deserializer" -> classOf[StringDeserializer],
			"value.deserializer" -> classOf[StringDeserializer],
			"group.id" -> "gui-1001",
			"auto.offset.reset" -> "latest",
			"enable.auto.commit" -> (false: java.lang.Boolean)
		)
		val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
			Array("wc-topic"), //
			kafkaParams //
		)
		// c. 采用New Consumer API获取Kafka Topic中数据
		val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
			ssc, //
			locationStrategy, //
			consumerStrategy //
		)
		
		// 仅仅获取Kafka Topic中Value数据:Message消息
		val inputDStream: DStream[String] = kafkaDStream.map(record => record.value())
		
		// 3. 依据业务需求,调用DStream中转换函数(类似RDD中转换函数)
		/*
			def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
		 */
		// 此处rdd就是DStream中每批次RDD数据
		val resultDStream: DStream[(String, Int)] = inputDStream.transform{ rdd =>
			rdd
				.filter(line => null != line && line.trim.length > 0)
				.flatMap(line => line.trim.split("\\s+"))
				.map(word => (word, 1))
				.reduceByKey((tmp, item) => tmp + item)
		}
		
		// 4. 定义数据终端,将每批次结果数据进行输出
		/*
			def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
		 */
		resultDStream.foreachRDD((rdd, time) => {
			//val xx: Time = time
			val format: FastDateFormat = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
			println("-------------------------------------------")
			println(s"Time: ${format.format(time.milliseconds)}")
			println("-------------------------------------------")
			// 判断每批次结果RDD是否有数据,如果有数据,再进行输出
			if(!rdd.isEmpty()){
				rdd.coalesce(1).foreachPartition(iter => iter.foreach(println))
			}
		})
		
		// 5. 启动流式应用,等待终止
		ssc.start()
		ssc.awaitTermination()
		ssc.stop(stopSparkContext = true, stopGracefully = true)
	}
	
}

06-[理解]-集成Kafka时获取消费偏移量信息

​ 当 SparkStreaming 集 成 Kafka 时 , 无 论 是 Old Consumer API 中 Direct 方 式 还 是 NewConsumer API方式获取的数据,每批次的数据封装在KafkaRDD中,其中包含每条数据的元数据信息。

image-20210430093032041
image-20210430093032041

​ 当流式应用程序运行时,在WEB UI监控界面中,可以看到每批次消费数据的偏移量范围,能否在程序中获取数据呢?? 官方文档:http://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#obtaining-offsets

image-20210430093230238
image-20210430093230238

获取偏移量信息代码如下:

image-20210430093248283
image-20210430093248283

修改前面代码,获取消费Kafka数据时,每个批次中各个分区数据偏移量范围:

代码语言:javascript
复制
package cn.itcast.spark.kafka

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Streaming通过Kafka New Consumer消费者API获取数据,获取每批次处理数据偏移量OFFSET
 */
object _02StreamingKafkaOffset {
	
	def main(args: Array[String]): Unit = {
		
		// 1. 构建StreamingContext实例对象,传递时间间隔BatchInterval
		val ssc: StreamingContext = {
			// a. 创建SparkConf对象,设置应用基本信息
			val sparkConf = new SparkConf()
    			.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
    			.setMaster("local[3]")
				// 设置数据输出文件系统的算法版本为2
				.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
			// b. 创建实例对象,设置BatchInterval
			new StreamingContext(sparkConf, Seconds(5))
		}
		
		// 2. 定义数据源,获取流式数据,封装到DStream中
		// TODO: 从Kafka消费数据,采用New Consumer API方式
		/*
		def createDirectStream[K, V](
		      ssc: StreamingContext,
		      locationStrategy: LocationStrategy,
		      consumerStrategy: ConsumerStrategy[K, V]
		    ): InputDStream[ConsumerRecord[K, V]]
		 */
		// step1. 表示消费Kafka中Topic数据时,位置策略
		val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
		// step2. 表示消费Kafka中topic数据时,消费策略,封装消费配置信息
		/*
	        def Subscribe[K, V](
		      topics: Iterable[jl.String],
		      kafkaParams: collection.Map[String, Object]
		    ): ConsumerStrategy[K, V]
		 */
		val kafkaParams: collection.Map[String, Object] = Map(
			"bootstrap.servers" -> "node1.itcast.cn:9092", //
			"key.deserializer" -> classOf[StringDeserializer],
			"value.deserializer" -> classOf[StringDeserializer],
			"group.id" -> "groop_id_1001",
			"auto.offset.reset" -> "latest",
			"enable.auto.commit" -> (false: java.lang.Boolean)
		)
		val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe (
			Array("wc-topic"), kafkaParams
		)
		// step3. 使用Kafka New Consumer API消费数据
		val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
			ssc, locationStrategy, consumerStrategy
		)
		
		// TODO: 其一、定义数组,用于存储偏移量
		var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] // 每个Kafka分区数据偏移量信息封装在OffsetRange对象中
		
		// 3. 依据业务需求,调用DStream中转换函数(类似RDD中转换函数)
		/*
			def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
		 */
		// 此处rdd就是DStream中每批次RDD数据
		val resultDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
			// TODO: 此时直接针对获取KafkaDStream进行转换操作,rdd属于KafkaRDD,包含相关偏移量信息
			// TODO: 其二、转换KafkaRDD为HasOffsetRanges类型对象,获取偏移量范围
			offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
			
			rdd
				.map(record => record.value())
				.filter(line => null != line && line.trim.length > 0)
				.flatMap(line => line.trim.split("\\s+"))
				.map(word => (word, 1))
				.reduceByKey((tmp, item) => tmp + item)
		}
		
		// 4. 定义数据终端,将每批次结果数据进行输出
		/*
			def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
		 */
		resultDStream.foreachRDD((rdd, time) => {
			//val xx: Time = time
			val format: FastDateFormat = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
			println("-------------------------------------------")
			println(s"Time: ${format.format(time.milliseconds)}")
			println("-------------------------------------------")
			// 判断每批次结果RDD是否有数据,如果有数据,再进行输出
			if(!rdd.isEmpty()){
				rdd.coalesce(1).foreachPartition(iter => iter.foreach(println))
			}
			
			// TODO: 其三、当当前批次数据处理完成以后,打印当前批次中数据偏移量信息
			offsetRanges.foreach{offsetRange =>
				println(s"topic: ${offsetRange.topic}    partition: ${offsetRange.partition}    offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}")
			}
		})
		
		// 5. 启动流式应用,等待终止
		ssc.start()
		ssc.awaitTermination()
		ssc.stop(stopSparkContext = true, stopGracefully = true)
	}
	
}

07-[了解]-应用案例之业务场景和需求说明

仿【百度搜索风云榜】对用户使用百度搜索时日志进行分析:【百度搜索日志实时分析】,主要业务需求如下三个方面:

image-20210430094431336
image-20210430094431336
代码语言:javascript
复制
业务一:搜索日志数据存储HDFS,实时对日志数据进行ETL提取转换,存储HDFS文件系统;

业务二:百度热搜排行榜Top10,累加统计所有用户搜索词次数,获取Top10搜索词及次数;

业务三:近期时间内热搜Top10,统计最近一段时间范围(比如,最近半个小时或最近2个小时)内用户搜索词次数,获取Top10搜索词及次数;

开发Maven Project中目录结构如下所示:

image-20210430094632342
image-20210430094632342

08-[掌握]-应用案例之初始化环境和工具类

​ 编程实现业务之前,首先编写程序模拟产生用户使用百度搜索产生日志数据和创建工具StreamingContextUtils提供StreamingContext对象与从Kafka接收数据方法。

  • 启动Kafka Broker服务,创建Topic【search-log-topic】,命令如下所示:
image-20210430094909566
image-20210430094909566
  • 模拟日志数据

模拟用户搜索日志数据,字段信息封装到CaseClass样例类【SearchLog】类,代码如下:

代码语言:javascript
复制
package cn.itcast.spark.app.mock

/**
 * 用户百度搜索时日志数据封装样例类CaseClass
 * 
 *
 * @param sessionId 会话ID
 * @param ip        IP地址
 * @param datetime  搜索日期时间
 * @param keyword   搜索关键词
 */
case class SearchLog(
	                    sessionId: String, //
	                    ip: String, //
	                    datetime: String, //
	                    keyword: String //
                    ) {
	override def toString: String = s"$sessionId,$ip,$datetime,$keyword"
}

模拟产生搜索日志数据类【MockSearchLogs】具体代码如下:

代码语言:javascript
复制
package cn.itcast.spark.app.mock

import java.util.{Properties, UUID}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

/**
 * 模拟产生用户使用百度搜索引擎时,搜索查询日志数据,包含字段为:
 *      uid, ip, search_datetime, search_keyword
 */
object MockSearchLogs {
    
    def main(args: Array[String]): Unit = {
    
        // 搜索关键词,直接到百度热搜榜获取即可
        val keywords: Array[String] = Array(
            "吴尊友提醒五一不参加大型聚集聚会", "称孩子没死就得购物导游被处罚", "刷视频刷出的双胞胎姐妹系同卵双生",
            "云公民受审认罪 涉嫌受贿超4.6亿", "印度男子下跪求警察别拿走氧气瓶", "广电总局:支持查处阴阳合同等问题",
            "75位一线艺人注销200家关联公司", "空间站天和核心舱发射成功", "中国海军舰艇警告驱离美舰",
            "印度德里将狗用火葬场改为人用", "公安部派出工作组赴广西", "美一男子遭警察跪压5分钟死亡",
            "华尔街传奇基金经理跳楼身亡", "阿波罗11号宇航员柯林斯去世", "刘嘉玲向窦骁何超莲道歉"
        )
        
        // 发送Kafka Topic
        val props = new Properties()
        props.put("bootstrap.servers", "node1.itcast.cn:9092")
        props.put("acks", "1")
        props.put("retries", "3")
        props.put("key.serializer", classOf[StringSerializer].getName)
        props.put("value.serializer", classOf[StringSerializer].getName)
        val producer = new KafkaProducer[String, String](props)
        
        val random: Random = new Random()
        while (true){
            // 随机产生一条搜索查询日志
            val searchLog: SearchLog = SearchLog(
                getUserId(), //
                getRandomIp(), //
                getCurrentDateTime(), //
                keywords(random.nextInt(keywords.length)) //
            )
            println(searchLog.toString)
            Thread.sleep(100 + random.nextInt(100))
            
            val record = new ProducerRecord[String, String]("search-log-topic", searchLog.toString)
            producer.send(record)
        }
        // 关闭连接
        producer.close()
    }
    
    /**
     * 随机生成用户SessionId
     */
    def getUserId(): String = {
        val uuid: String = UUID.randomUUID().toString
        uuid.replaceAll("-", "").substring(16)
    }
    
    /**
     * 获取当前日期时间,格式为yyyyMMddHHmmssSSS
     */
    def getCurrentDateTime(): String = {
        val format =  FastDateFormat.getInstance("yyyyMMddHHmmssSSS")
        val nowDateTime: Long = System.currentTimeMillis()
        format.format(nowDateTime)
    }
    
    /**
     * 获取随机IP地址
     */
    def getRandomIp(): String = {
        // ip范围
        val range: Array[(Int, Int)] = Array(
            (607649792,608174079), //36.56.0.0-36.63.255.255
            (1038614528,1039007743), //61.232.0.0-61.237.255.255
            (1783627776,1784676351), //106.80.0.0-106.95.255.255
            (2035023872,2035154943), //121.76.0.0-121.77.255.255
            (2078801920,2079064063), //123.232.0.0-123.235.255.255
            (-1950089216,-1948778497),//139.196.0.0-139.215.255.255
            (-1425539072,-1425014785),//171.8.0.0-171.15.255.255
            (-1236271104,-1235419137),//182.80.0.0-182.92.255.255
            (-770113536,-768606209),//210.25.0.0-210.47.255.255
            (-569376768,-564133889) //222.16.0.0-222.95.255.255
        )
        // 随机数:IP地址范围下标
        val random = new Random()
        val index = random.nextInt(10)
        val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
        //println(s"ipNumber = ${ipNumber}")
        
        // 转换Int类型IP地址为IPv4格式
        number2IpString(ipNumber)
    }
    
    /**
     * 将Int类型IPv4地址转换为字符串类型
     */
    def number2IpString(ip: Int): String = {
        val buffer: Array[Int] = new Array[Int](4)
        buffer(0) = (ip >> 24) & 0xff
        buffer(1) = (ip >> 16) & 0xff
        buffer(2) = (ip >> 8) & 0xff
        buffer(3) = ip & 0xff
        // 返回IPv4地址
        buffer.mkString(".")
    }
    
}
  • 所有SparkStreaming应用都需要构建StreamingContext实例对象,并且从采用New KafkaConsumer API消费Kafka数据,编写工具类【StreamingContextUtils】,提供两个方法:
image-20210430095446723
image-20210430095446723
代码语言:javascript
复制
package cn.itcast.spark.app

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 工具类提供:构建流式应用上下文StreamingContext实例对象和从Kafka Topic消费数据
 */
object StreamingContextUtils {
	
	/**
	 * 获取StreamingContext实例,传递批处理时间间隔
	 * @param batchInterval 批处理时间间隔,单位为秒
	 */
	def getStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {
		// i. 创建SparkConf对象,设置应用配置信息
		val sparkConf = new SparkConf()
			.setAppName(clazz.getSimpleName.stripSuffix("$"))
			.setMaster("local[3]")
			// 设置Kryo序列化
			.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
			.registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))
			// 设置保存文件数据时,算法版本:2
			.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
		// ii.创建流式上下文对象, 传递SparkConf对象和时间间隔
		val context = new StreamingContext(sparkConf, Seconds(batchInterval))
		// iii. 返回
		context
	}
	
	/**
	 * 从指定的Kafka Topic中消费数据,默认从最新偏移量(largest)开始消费
	 * @param ssc StreamingContext实例对象
	 * @param topicName 消费Kafka中Topic名称
	 */
	def consumerKafka(ssc: StreamingContext, topicName: String): DStream[ConsumerRecord[String, String]] = {
		// i.位置策略
		val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
		// ii.读取哪些Topic数据
		val topics = Array(topicName)
		// iii.消费Kafka 数据配置参数
		val kafkaParams = Map[String, Object](
			"bootstrap.servers" -> "node1.itcast.cn:9092",
			"key.deserializer" -> classOf[StringDeserializer],
			"value.deserializer" -> classOf[StringDeserializer],
			"group.id" -> "gui_0001",
			"auto.offset.reset" -> "latest",
			"enable.auto.commit" -> (false: java.lang.Boolean)
		)
		// iv.消费数据策略
		val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
			topics, kafkaParams
		)
		
		// v.采用新消费者API获取数据,类似于Direct方式
		val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
			ssc, locationStrategy, consumerStrategy
		)
		// vi.返回DStream
		kafkaDStream
	}
	
}

09-[掌握]-应用案例之实时数据ETL存储

​ 实时从Kafka Topic消费数据,提取ip地址字段,调用【ip2Region】库解析为省份和城市,存储到HDFS文件中,设置批处理时间间隔BatchInterval为10秒。 此需求,属于流式应用中【无状态Stateless】应用场景,使用transformforeachRDD函数即可

代码语言:javascript
复制
package cn.itcast.spark.app.etl

import cn.itcast.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.lionsoul.ip2region.{DbConfig, DbSearcher}

/**
 * 实时消费Kafka Topic数据,经过ETL(过滤、转换)后,保存至HDFS文件系统中,BatchInterval为:10s
 */
object _03StreamingETLHdfs {
	
	def main(args: Array[String]): Unit = {
	
		// 1. 创建StreamingContext实例对象
		val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 10)
		
		// 2. 从Kafka消费数据,采用New Consumer API
		val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic")
		
		// TODO:3. 对获取数据,进行ETL转换,将IP地址转换为省份和城市
		val etlDStream: DStream[String] = kafkaDStream.transform { rdd =>
			val etlRDD: RDD[String] = rdd
				// 过滤数据
				.filter(record => null != record.value() && record.value().trim.split(",").length == 4)
				// 针对每个分区操作,获取每条数据中ip地址,转换为省份和城市
				.mapPartitions { iter =>
					// a. 创建DbSearch对象
					val dbSearcher = new DbSearcher(new DbConfig(), "dataset/ip2region.db")
					// b. 对分区中数据的IP值进行转换解析
					iter.map { record =>
						// 获取Message信息Value值
						val message: String = record.value()
						// 获取IP地址值
						val ipValue: String = message.split(",")(1)
						// 解析IP地址
						val region: String = dbSearcher.btreeSearch(ipValue).getRegion
						val Array(_, _, province, city, _) = region.split("\\|")
						// 拼接字符串
						s"${message},${province},${city}"
					}
				}
			// 返回转换后RDD
			etlRDD
		}
		etlDStream
		
		// 4. 保存数据至HDFS文件系统
		etlDStream.foreachRDD((rdd, batchTime) => {
			if(!rdd.isEmpty()){
				rdd.coalesce(1).saveAsTextFile(s"datas/streaming/search-logs-${batchTime}")
			}
		})
		
		// 启动流式应用,等待终止结束
		ssc.start()
		ssc.awaitTermination()
		ssc.stop(stopSparkContext = true, stopGracefully = true)
	}
	
}

运行模拟日志数据程序和ETL应用程序,查看实时数据ETL后保存文件,截图如下:

image-20210430103032486
image-20210430103032486

10-[掌握]-应用案例之updateStateByKey 函数

实 时 累 加 统 计 用 户 各 个 搜 索 词 出 现 的 次 数 , 在 SparkStreaming 中 提 供 函 数【updateStateByKey】实现累加统计,Spark 1.6提供【mapWithState】函数状态统计,性能更好,实际应用中也推荐使用。

image-20210430103315171
image-20210430103315171

将每批次数据状态,按照Key与以前状态,使用定义函数【updateFunc】进行更新,示意图如下:

image-20210430103411201
image-20210430103411201

针对搜索词词频统计WordCount,状态更新逻辑示意图如下:

image-20210430103609276
image-20210430103609276
代码语言:javascript
复制
使用updatStateByKey状态更新函数,要点如下:
	- 第一点、依据Key更新状态
		Key就是关键字段,针对应用来说,Key就是搜索词
	- 第二点、更新原则
		step1、计算当前批次中,Key的状态
		step2、获取Key以前状态
		step3、合并当前批次状态和以前状态
针对此应用来说,
	Key搜索词,对应状态State,数据类型:Int,要么Long

编程实现,累加实时统计,使用updateStateByKey函数

代码语言:javascript
复制
package cn.itcast.spark.app.state

import cn.itcast.spark.app.StreamingContextUtils
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

/**
 * 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
 */
object _04StreamingUpdateState {
	
	def main(args: Array[String]): Unit = {
		
		// 1. 创建StreamingContext实例对象
		val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 10)
		// TODO: 设置检查点目录
       	ssc.checkpoint("datas/streaming/ckpt-1001")
		
		// 2. 从Kafka消费数据,采用New Consumer API
		val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic")
		
		// 3. TODO: step1. 对当前批次数据进行聚合统计
		val batchReduceDStream: DStream[(String, Int)] = kafkaDStream.transform{rdd =>
			rdd
				// 获取Message信息
    			.map(record => record.value())
    			.filter(msg => null != msg && msg.trim.split(",").length == 4)
				// 提取搜索词,表示出现一次
    			.map(msg => msg.trim.split(",")(3) -> 1)
				// TODO: 优化,对当前批次中数据进行一次聚合
    			.reduceByKey(_ + _)
		}
		
		
		// 3. TODO: step2. 将当前批次聚合结果与以前状态数据进行聚合操作(状态更新)
		/*
		  def updateStateByKey[S: ClassTag](
		      updateFunc: (Seq[V], Option[S]) => Option[S]
		    ): DStream[(K, S)]
		    - Seq[V]表示当前批次中Key对应的value值得集合
		        如果对当前批次中数据按照Key进行聚合以后,此时,只有一个值
		        V类型:Int
		    - Option[S]):表示Key的以前状态,如果以前没有出现过该Key,状态就是None
		        S类型:Int
		 */
		val stateDStream: DStream[(String, Int)] = batchReduceDStream.updateStateByKey(
			(values: Seq[Int], state: Option[Int]) => {
				// a. 获取Key的以前状态
				val previousState: Int = state.getOrElse(0)
				// b. 获取当前批次中Key的状态
				val currentState: Int = values.sum
				// c. 合并状态呢
				val latestState: Int = previousState + currentState
				// 返回最新状态
				Some(latestState)
			}
		)
		
		// 4. 将每批次结果数据进行输出
		stateDStream.foreachRDD((rdd, time) => {
			val format: FastDateFormat = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
			println("-------------------------------------------")
			println(s"Time: ${format.format(time.milliseconds)}")
			println("-------------------------------------------")
			// 判断每批次结果RDD是否有数据,如果有数据,再进行输出
			if(!rdd.isEmpty()){
				rdd.coalesce(1).foreachPartition(iter => iter.foreach(println))
			}
		})
		
		// 启动流式应用,等待终止结束
		ssc.start()
		ssc.awaitTermination()
		ssc.stop(stopSparkContext = true, stopGracefully = true)
	}
	
}

11-[掌握]-应用案例之mapWithState 函数

​ Spark 1.6提供新的状态更新函数【mapWithState】,mapWithState函数也会统计全局的key的状态,但是如果没有数据输入,便不会返回之前的key的状态,只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。

image-20210430105619161
image-20210430105619161

这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高;

image-20210430105719324
image-20210430105719324

需要构建StateSpec对象,对状态State进行封装,可以进行相关操作,类的声明定义如下:

image-20210430105813420
image-20210430105813420

状态函数【mapWithState】参数相关说明:

image-20210430110010118
image-20210430110010118

修改前面案例代码,使用mapWithState函数更新状态,

代码语言:javascript
复制
package cn.itcast.spark.app.state

import cn.itcast.spark.app.StreamingContextUtils
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.{State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

/**
 * 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
 */
object _05StreamingMapWithState {
	
	def main(args: Array[String]): Unit = {
		
		// 1. 创建StreamingContext实例对象
		val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 5)
		// TODO: 设置检查点目录
		ssc.checkpoint("datas/streaming-ckpt-999999")
		
		// 2. 从Kafka消费数据,采用New Consumer API
		val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic")
		
		// 3. TODO: step1. 对当前批次数据进行聚合统计
		val batchReduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
			rdd
				.filter(record => null != record && record.value().trim.split(",").length == 4)
				.map{record =>
					// 获取Kafka Topic每条数据Message
					val msg: String = record.value()
					// 获取搜索关键词
					val searchWord: String = msg.trim.split(",").last
					// 返回二元组
					searchWord -> 1
				}
				// 按照搜索词分组,聚合统计各个搜索词出现次数
    			.reduceByKey(_ + _)  // 此处属于性能优化
		}
		
		// 3. TODO: step2. 将当前批次聚合结果与以前状态数据进行聚合操作(状态更新)
		/*
		  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
		      spec: StateSpec[K, V, StateType, MappedType]
		    ): MapWithStateDStream[K, V, StateType, MappedType]
		 */
		// 构建stateSpec对象
		/*
		def function[KeyType, ValueType, StateType, MappedType](
		      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
		 ): StateSpec[KeyType, ValueType, StateType, MappedType]
		 */
		val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
			(key: String, option: Option[Int], state: State[Int]) => {
				// a. 获取当前Key的之
				val currentState: Int = option.getOrElse(0)
				// b. 获取以前状态
				val previousState: Int = state.getOption().getOrElse(0)
				// c. 合并状态
				val latestState: Int = currentState + previousState
				// d. 更新状态
				state.update(latestState)
				// e. 返回key和状态的之,封装到二元组
				key -> latestState
			}
		)
		// 按照Key进行状态更新统计
		val stateDStream: DStream[(String, Int)] = batchReduceDStream.mapWithState(spec)
		
		// 4. 将每批次结果数据进行输出
		stateDStream.foreachRDD((rdd, time) => {
			val format: FastDateFormat = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
			println("-------------------------------------------")
			println(s"Time: ${format.format(time.milliseconds)}")
			println("-------------------------------------------")
			// 判断每批次结果RDD是否有数据,如果有数据,再进行输出
			if(!rdd.isEmpty()){
				rdd.coalesce(1).foreachPartition(iter => iter.foreach(println))
			}
		})
		
		// 启动流式应用,等待终止结束
		ssc.start()
		ssc.awaitTermination()
		ssc.stop(stopSparkContext = true, stopGracefully = true)
	}
	
}

12-[掌握]-应用案例之实时窗口统计window

SparkStreaming中提供一些列窗口函数,方便对窗口数据进行分析,文档: http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#window-operations

在实际项目中,很多时候需求:每隔一段时间统计最近数据状态,并不是对所有数据进行统计,称为趋势统计或者窗口统计,SparkStreaming中提供相关函数实现功能,业务逻辑如下:

image-20210430113610323
image-20210430113610323

窗口函数【window】声明如下,包含两个参数:窗口大小(WindowInterval,每次统计数据范围)和滑动大小(每隔多久统计一次),都必须是批处理时间间隔BatchInterval整数倍。

image-20210430113627598
image-20210430113627598
代码语言:javascript
复制
package cn.itcast.spark.app.window

import cn.itcast.spark.app.StreamingContextUtils
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

/**
 * 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数
	 * 批处理时间间隔:BatchInterval = 2s
	 * 窗口大小间隔:WindowInterval = 4s
	 * 滑动大小间隔:SliderInterval = 2s
 */
object _06StreamingWindow {
	
	def main(args: Array[String]): Unit = {
		
		// 1. 创建StreamingContext实例对象
		val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 2)
		// TODO: 设置检查点目录
		ssc.checkpoint(s"datas/spark/ckpt-${System.nanoTime()}")
		
		// 2. 从Kafka消费数据,采用New Consumer API
		val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic")
		
		
		// TODO: 设置窗口:大小为4秒,滑动为2秒
		/*
		def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
		 */
		val windowDStream: DStream[ConsumerRecord[String, String]] = kafkaDStream.window(
			Seconds(4), // 窗口大小
			Seconds(2) // 滑动大小
		)
		
		// 3. 对窗口中数据进行聚合统计
		val resultDStream: DStream[(String, Int)] = windowDStream.transform{rdd =>
			// 此处rdd就是窗口中RDD数据
			rdd
				// 获取Message信息
				.map(record => record.value())
				.filter(msg => null != msg && msg.trim.split(",").length == 4)
				// 提取搜索词,表示出现一次
				.map(msg => msg.trim.split(",").last -> 1)
				// TODO: 对当前窗口中数据进行一次聚合
				.reduceByKey(_ + _)
		}
		
		// 4. 将每批次结果数据进行输出
		resultDStream.foreachRDD((rdd, time) => {
			val format: FastDateFormat = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
			println("-------------------------------------------")
			println(s"Time: ${format.format(time.milliseconds)}")
			println("-------------------------------------------")
			// 判断每批次结果RDD是否有数据,如果有数据,再进行输出
			if(!rdd.isEmpty()){
				rdd.coalesce(1).foreachPartition(iter => iter.foreach(println))
			}
		})
		
		// 启动流式应用,等待终止结束
		ssc.start()
		ssc.awaitTermination()
		ssc.stop(stopSparkContext = true, stopGracefully = true)
	}
	
}

SparkStreaming中同时提供将窗口Window设置与聚合reduceByKey合在一起的函数,为了更加方便编程。

image-20210430114518510
image-20210430114518510

修改上述代码,将聚合函数和窗口window何在一起编写:

代码语言:javascript
复制
package cn.itcast.spark.app.window

import cn.itcast.spark.app.StreamingContextUtils
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

/**
 * 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数
	 * 批处理时间间隔:BatchInterval = 2s
	 * 窗口大小间隔:WindowInterval = 4s
	 * 滑动大小间隔:SliderInterval = 2s
 */
object _07StreamingReduceWindow {
	
	def main(args: Array[String]): Unit = {
		
		// 1. 创建StreamingContext实例对象
		val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 2)
		// TODO: 设置检查点目录
		ssc.checkpoint(s"datas/spark/ckpt-${System.nanoTime()}")
		
		// 2. 从Kafka消费数据,采用New Consumer API
		val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic")
		
		// 3. TODO: 对批次数据进行转换
		val etlDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
			rdd
				.filter(record => null != record && record.value().trim.split(",").length == 4)
				.map{record =>
					// 获取Kafka Topic每条数据Message
					val msg: String = record.value()
					// 获取搜索关键词
					val searchWord: String = msg.trim.split(",").last
					// 返回二元组
					searchWord -> 1
				}
			
		}
		
		// TODO: 设置窗口:大小为4秒,滑动为2秒,并对窗口中数据聚合统计
		/*
		  def reduceByKeyAndWindow(
		      reduceFunc: (V, V) => V,
		      windowDuration: Duration,
		      slideDuration: Duration
		    ): DStream[(K, V)]
		 */
		val resultDStream: DStream[(String, Int)] = etlDStream.reduceByKeyAndWindow(
			(v1: Int, v2: Int) => v1 + v2, // 对窗口中数据,按照Key分组后,对Value之进行聚合操作函数
			Seconds(4), //窗口大小
			Seconds(2) // 滑动大小
		)
		
		// 4. 将每批次结果数据进行输出
		resultDStream.foreachRDD((rdd, time) => {
			val format: FastDateFormat = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
			println("-------------------------------------------")
			println(s"Time: ${format.format(time.milliseconds)}")
			println("-------------------------------------------")
			// 判断每批次结果RDD是否有数据,如果有数据,再进行输出
			if(!rdd.isEmpty()){
				rdd.coalesce(1).foreachPartition(iter => iter.foreach(println))
			}
		})
		
		// 启动流式应用,等待终止结束
		ssc.start()
		ssc.awaitTermination()
		ssc.stop(stopSparkContext = true, stopGracefully = true)
	}
	
}

附录一、创建Maven模块

1)、Maven 工程结构

image-20210429065124285
image-20210429065124285

2)、POM 文件内容

​ Maven 工程POM文件中内容(依赖包):

代码语言:javascript
复制
            aliyun
            http://maven.aliyun.com/nexus/content/groups/public/
        
        
            cloudera
            https://repository.cloudera.com/artifactory/cloudera-repos/
        
        
            jboss
            http://repository.jboss.com/nexus/content/groups/public
        
    

    
        2.11.12
        2.11
        2.4.5
        2.6.0-cdh5.16.2
        1.2.0-cdh5.16.2
        2.0.0
        8.0.19
    

    

        
        
            org.scala-lang
            scala-library
            ${scala.version}
        
        
        
            org.apache.spark
            spark-core_${scala.binary.version}
            ${spark.version}
        
        
        
            org.apache.spark
            spark-sql_${scala.binary.version}
            ${spark.version}
        
        
        
            org.apache.spark
            spark-streaming_${scala.binary.version}
            ${spark.version}
        
        
        
            org.apache.spark
            spark-streaming-kafka-0-8_${scala.binary.version}
            ${spark.version}
        
        
        
            org.apache.spark
            spark-streaming-kafka-0-10_${scala.binary.version}
            ${spark.version}
        
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        
        
        
            org.apache.hbase
            hbase-server
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-hadoop2-compat
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-client
            ${hbase.version}
        
        
        
            org.apache.kafka
            kafka-clients
            2.0.0
        
        
        
            org.lionsoul
            ip2region
            1.7.2
        
        
        
            mysql
            mysql-connector-java
            ${mysql.version}
        
        
            c3p0
            c3p0
            0.9.1.2
        
    

    
        target/classes
        target/test-classes
        
            
                ${project.basedir}/src/main/resources
            
        
        
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.0
                
                    1.8
                    1.8
                    UTF-8
                
            
            
                net.alchim31.maven
                scala-maven-plugin
                3.2.0
                
                    
                        
                            compile
                            testCompile
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-11-28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark Day11:Spark Streaming
    • 01-[了解]-昨日课程内容回顾
      • 02-[了解]-今日课程内容提纲
        • 03-[理解]-流式应用技术栈
          • 04-[理解]-Kafka回顾及集成Kafka两套API
            • 05-[掌握]-New Consumer API方式集成编程
              • 06-[理解]-集成Kafka时获取消费偏移量信息
                • 07-[了解]-应用案例之业务场景和需求说明
                  • 08-[掌握]-应用案例之初始化环境和工具类
                    • 09-[掌握]-应用案例之实时数据ETL存储
                      • 10-[掌握]-应用案例之updateStateByKey 函数
                        • 11-[掌握]-应用案例之mapWithState 函数
                          • 12-[掌握]-应用案例之实时窗口统计window
                            • 附录一、创建Maven模块
                              • 1)、Maven 工程结构
                              • 2)、POM 文件内容
                          相关产品与服务
                          云数据库 Redis
                          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档