
Spark Case Library v1.0

Maynor · Published 2021-12-07 09:03:51

Spark Case Library

Case 1: Implementing Word Count with Spark RDD

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>

<dependencies>
    <!-- Scala standard library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Implementation code

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkWordCount {

    def main(args: Array[String]): Unit = {
        // TODO: create the SparkContext instance; first build a SparkConf with the basic application settings
        val sc: SparkContext = {
            // first, build the SparkConf object and set the application name and master
            val sparkConf: SparkConf = new SparkConf()
                .setAppName("SparkWordCount")
                .setMaster("local[2]")
            // second, create the SparkContext instance, passing in sparkConf
            new SparkContext(sparkConf)
        }

        // TODO: Step 1. read the file with sc.textFile, wrapping the data in an RDD
        val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")

        // TODO: Step 2. transform the RDD with the higher-order functions flatMap, map and reduceByKey
        val resultRDD: RDD[(String, Int)] = inputRDD
            // split each line into words on whitespace
            .flatMap(line => line.split("\\s+"))
            // map each word to a tuple marking one occurrence
            .map(word => word -> 1)
            // group by word and sum the counts within each group
            .reduceByKey((tmp, item) => tmp + item)
        // TODO: Step 3. print the final result to the console
        resultRDD.foreach(tuple => println(tuple))
        // the application is finished; release resources
        sc.stop()
    }
}
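
One caveat: foreach(println) runs on the executors, so in cluster mode the output lands in executor logs rather than the driver console. A minimal driver-side alternative, assuming the result is small enough to collect:

// collect the (small) result to the driver and print it there;
// only safe when the result fits in driver memory
resultRDD.collect().foreach(println)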

Case 2: WordCount Program, Taking the Top 3 Words by Descending Frequency

pom.xml: identical to Case 1.

Implementation code

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkTopKey {

    def main(args: Array[String]): Unit = {
        // TODO: create the SparkContext instance; first build a SparkConf with the basic application settings
        val sc: SparkContext = {
            // first, build the SparkConf object and set the application name and master
            val sparkConf: SparkConf = new SparkConf()
                .setAppName("SparkTopKey")
                .setMaster("local[2]")
            // second, create the SparkContext instance, passing in sparkConf
            new SparkContext(sparkConf)
        }
        // TODO: Step 1. read the file with sc.textFile, wrapping the data in an RDD
        val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
        // TODO: Step 2. transform the RDD with the higher-order functions flatMap, map and reduceByKey
        val resultRDD: RDD[(String, Int)] = inputRDD
            // split each line into words on whitespace
            .flatMap(line => line.split("\\s+"))
            // map each word to a tuple marking one occurrence
            .map(word => word -> 1)
            // group by word and sum the counts within each group
            .reduceByKey((tmp, item) => tmp + item)
        resultRDD
            // sort by count in descending order
            .sortBy(tuple => tuple._2, ascending = false)
            // take the top 3 and print them
            .take(3)
            .foreach(tuple => println(tuple))
        // the application is finished; release resources
        sc.stop()
    }

}
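
An equivalent shortcut: RDD's built-in top takes the n largest elements under a supplied Ordering, which avoids the explicit sortBy-then-take. A minimal sketch:

// top(n) fetches the n largest elements by the given Ordering,
// skipping the full sort before take(3)
resultRDD.top(3)(Ordering.by(_._2)).foreach(println)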

Case 3: Building an RDD from a Scala Seq via Parallelization and Running Word Count

pom.xml: identical to Case 1.

Implementation code

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object _01SparkParallelizeTest {

    def main(args: Array[String]): Unit = {

        val sc: SparkContext = {
            // SparkConf object
            val sparkConf = new SparkConf()
                // _01SparkParallelizeTest$  ->(.stripSuffix("$"))   ->  _01SparkParallelizeTest
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
            // sc instance
            SparkContext.getOrCreate(sparkConf)
        }
        // TODO: 1. store the data in a Scala Seq
        val linesSeq: Seq[String] = Seq(
            "hadoop scala hive spark scala sql sql",
            "hadoop scala spark hdfs hive spark",
            "spark hdfs spark hdfs scala hive spark"
        )
        // TODO: 2. parallelize the collection
        val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
        // TODO: 3. word count
        val resultRDD = inputRDD
            .flatMap(line => line.split("\\s+"))
            .map(word => (word, 1))
            .reduceByKey((tmp, item) => tmp + item)
        // TODO: 4. print the result
        resultRDD.foreach(println)
        // the application is finished; release resources
        sc.stop()
    }
}
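
To check how numSlices = 2 actually distributed the Seq, mapPartitionsWithIndex can tag every element with its partition index. An illustrative sketch:

// tag each element with the index of the partition it landed in
inputRDD
    .mapPartitionsWithIndex { (idx, iter) => iter.map(line => s"partition $idx: $line") }
    .foreach(println)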

Case 4: Reading Small Files with the wholeTextFiles() Method

pom.xml: identical to Case 1.

Implementation code

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object _02SparkWholeTextFileTest {

    def main(args: Array[String]): Unit = {
        val sc: SparkContext = {
            // SparkConf object
            val sparkConf = new SparkConf()
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
            // sc instance
            SparkContext.getOrCreate(sparkConf)
        }

        /*
          def wholeTextFiles(
              path: String,
              minPartitions: Int = defaultMinPartitions
          ): RDD[(String, String)]
          Key:   the path of each small file
          Value: the content of each small file
         */
        val inputRDD: RDD[(String, String)] = sc.wholeTextFiles("datas/ratings100", minPartitions = 2)

        println(s"number of RDD partitions = ${inputRDD.getNumPartitions}")

        inputRDD.take(2).foreach(tuple => println(tuple))

        // the application is finished; release resources
        sc.stop()

    }
}
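
Each element of inputRDD is a (path, content) pair, so running the usual word count over these small files only requires splitting the file bodies back into lines. A minimal sketch under that assumption:

// drop the file path, split each file body into lines, then into words
val wordsRDD: RDD[String] = inputRDD
    .flatMap { case (_, content) => content.split("\\r?\\n") }
    .flatMap(line => line.trim.split("\\s+"))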

Case 5: RDD Cache Functions: Caching Data in Memory or on Disk, and Releasing the Cache

pom.xml: identical to Case 1.

Implementation code

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object _05SparkCacheTest {

    def main(args: Array[String]): Unit = {
        // create the application entry point, a SparkContext instance
        val sc: SparkContext = {
            // 1.a create the SparkConf object and set the application's configuration
            val sparkConf: SparkConf = new SparkConf()
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
            // 1.b pass the SparkConf object and build the SparkContext instance
            new SparkContext(sparkConf)
        }
        // read the text file
        val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
        // cache the data in memory (persist is lazy)
        inputRDD.persist()
        // trigger the caching with an action
        inputRDD.count()
        // release the cache
        inputRDD.unpersist()
        // cache again, this time choosing an explicit storage level
        inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
        // the application is finished; release resources
        sc.stop()
    }
}
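
For reference, cache() is simply persist(StorageLevel.MEMORY_ONLY), and the level attached to an RDD can be inspected. A small sketch (note that a storage level can only be assigned once per RDD unless it is unpersisted first):

// inspect the storage level currently attached to the RDD
println(inputRDD.getStorageLevel.description)  // e.g. "Disk Memory Deserialized 1x Replicated"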

Case 6: Setting a Checkpoint for RDD Data

pom.xml: identical to Case 1.

Implementation code

import org.apache.spark.{SparkConf, SparkContext}

object _06SparkCkptTest {

    def main(args: Array[String]): Unit = {
        // create the application entry point, a SparkContext instance
        val sc: SparkContext = {
            // 1.a create the SparkConf object and set the application's configuration
            val sparkConf: SparkConf = new SparkConf()
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
            // 1.b pass the SparkConf object and build the SparkContext instance
            new SparkContext(sparkConf)
        }

        // TODO: set the checkpoint directory where the RDD data will be saved
        sc.setCheckpointDir("datas/ckpt/")

        // read the file
        val datasRDD = sc.textFile("datas/wordcount.data")

        // TODO: call checkpoint to back up the RDD; an RDD action must trigger it
        datasRDD.checkpoint()
        datasRDD.count()

        // TODO: run count again; this time the data is read from the checkpoint
        println(datasRDD.count())

        // the application is finished; release resources
        sc.stop()
    }
}
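
Checkpointing recomputes the RDD from the beginning when the triggering job runs, so caching it first saves one pass; the checkpoint state can also be inspected afterwards. A small sketch of both:

// cache before checkpoint so the data is not computed twice
datasRDD.cache()
datasRDD.checkpoint()
datasRDD.count()
println(datasRDD.isCheckpointed)     // true once the checkpoint has been written
println(datasRDD.getCheckpointFile)  // Some(<path under datas/ckpt/>)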

Case 7: Broadcast Variables and Accumulators

A WordCount program, written in Scala on Spark, that filters out punctuation tokens and counts how many of them occur:

a. Filter out the punctuation tokens, using a broadcast variable. b. Count how often punctuation tokens occur, using an accumulator.

Implementation code

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object _05SparkSharedVariableTest {

    def main(args: Array[String]): Unit = {
        // 1. In a Spark application the entry point is SparkContext; it must be instantiated to load data and schedule the job
        val sc: SparkContext = {
            // create the SparkConf object with application settings such as name and master
            val sparkConf: SparkConf = new SparkConf()
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
            // build the SparkContext instance, passing the SparkConf
            new SparkContext(sparkConf)
        }
        // 2. Step 1. read the file from the local FS with sc.textFile, wrapping the data in an RDD
        val inputRDD: RDD[String] = sc.textFile("datas/filter/datas.input", minPartitions = 2)

        // TODO: dictionary data; any token in it is filtered out: the special characters live in a List
        val list: List[String] = List(",", ".", "!", "#", "$", "%")
        // TODO: broadcast the dictionary data
        val broadcastList: Broadcast[List[String]] = sc.broadcast(list)

        // TODO: define the counter
        val accumulator: LongAccumulator = sc.longAccumulator("number_accu")

        // 3. Step 2. transform the RDD with the higher-order functions flatMap, map and reduceByKey
        val resultRDD: RDD[(String, Int)] = inputRDD
            // filter out empty lines
            .filter(line => null != line && line.trim.length > 0)
            // split into words
            .flatMap(line => line.trim.split("\\s+"))
            // TODO: filter out non-word tokens
            .filter{word =>
                // read the broadcast value
                val wordsList: List[String] = broadcastList.value
                // check whether this token is a non-word character
                val flag: Boolean = wordsList.contains(word)
                if(flag){
                    // it is a non-word token: add 1 to the accumulator
                    accumulator.add(1L)
                }
                // keep the token only if it is a real word
                ! flag
            }
            // group by word and aggregate
            .map(word => (word, 1))
            .reduceByKey(_ + _)
        // 4. Step 3. save the final result to HDFS or print it to the console
        resultRDD.foreach(println)
        // reading the accumulator value is only reliable after an RDD action has run
        println("Accumulator: " + accumulator.value)
        // 5. when the application finishes, release resources
        sc.stop()
    }

}
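
Once the job is done with the dictionary, the broadcast variable can be released. A minimal cleanup sketch using the standard Broadcast API:

// remove the cached copies from the executors; Spark re-broadcasts if the value is used again
broadcastList.unpersist()
// or release it for good; the variable must not be used after destroy()
// broadcastList.destroy()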

Case 8: Saving RDD Data to a MySQL Table, Basic Pattern

a. Reduce the number of partitions of the result data.
b. Process each partition on its own; when inserting a partition into the database, create one Connection for it.

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hbase.version>1.2.0-cdh5.16.2</hbase.version>
    <mysql.version>8.0.19</mysql.version>
</properties>

<dependencies>
    <!-- Scala standard library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <!-- HBase -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-hadoop2-compat</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>

    <!-- HanLP -->
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.7.7</version>
    </dependency>

</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Implementation code:

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object _04SparkWriteMySQL {

    def main(args: Array[String]): Unit = {
        // 1. In a Spark application the entry point is SparkContext; it must be instantiated to load data and schedule the job
        val sc: SparkContext = {
            // create the SparkConf object with application settings such as name and master
            val sparkConf: SparkConf = new SparkConf()
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
            // build the SparkContext instance, passing the SparkConf
            new SparkContext(sparkConf)
        }

        // 2. Step 1. read the file from the local FS with sc.textFile, wrapping the data in an RDD
        val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")

        // 3. Step 2. transform the RDD with the higher-order functions flatMap, map and reduceByKey
        val resultRDD: RDD[(String, Int)] = inputRDD
            // TODO: filter out empty lines
            .filter(line => null != line && line.trim.length > 0)
            // a. split each line on the separator
            .flatMap(line => line.trim.split("\\s+"))
            // b. map each word to a tuple marking one occurrence
            .map(word => (word, 1))
            .reduceByKey((temp, item) => temp + item)
        // TODO: save resultRDD to the MySQL table
        resultRDD
            // reduce the number of RDD partitions
            .coalesce(1)
            .foreachPartition{iter =>
                // val xx: Iterator[(String, Int)] = iter
                // directly call the method that saves one partition to MySQL
                saveToMySQL(iter)
            }
        // 5. when the application finishes, release resources
        sc.stop()
    }
    /**
     * Saves the data of one RDD partition to a MySQL table
     */
    def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
        // step1. load the driver class
        Class.forName("com.mysql.cj.jdbc.Driver")

        // declare the variables
        var conn: Connection = null
        var pstmt: PreparedStatement = null

        try{
            // step2. open the connection
            conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
                "root",
                "123456"
            )
            pstmt = conn.prepareStatement("INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")
            // step3. insert the data
            iter.foreach{case (word, count) =>
                pstmt.setString(1, word)
                pstmt.setInt(2, count)
                pstmt.execute()
            }
        }catch {
            case e: Exception => e.printStackTrace()
        }finally {
            // step4. close the connection
            if(null != pstmt) pstmt.close()
            if(null != conn) conn.close()
        }
    }
}
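
The original does not define the target table. A plausible one-time setup sketch, assuming word is the primary key (the REPLACE INTO in Case 9 depends on there being one) and that the column types are guesses:

import java.sql.DriverManager

// hypothetical one-time setup; the schema below is an assumption, not from the original
val conn = DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/?serverTimezone=UTC", "root", "123456")
try {
    val stmt = conn.createStatement()
    stmt.execute("CREATE DATABASE IF NOT EXISTS db_test")
    stmt.execute(
        """CREATE TABLE IF NOT EXISTS db_test.tb_wordcount (
          |  word  VARCHAR(100) PRIMARY KEY,
          |  count INT NOT NULL
          |)""".stripMargin)
} finally {
    conn.close()
}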

Case 9: Saving RDD Data to a MySQL Table, Advanced Pattern

Requirements:
a. Reduce the number of partitions of the result data.
b. Process each partition on its own, creating one Connection per partition when inserting.
c. Insert each partition in batches: addBatch, then executeBatch.
d. Be transactional: commit manually, then restore the connection's original commit mode.
e. Handle rows whose primary key already exists: update the row if it exists, insert it otherwise.

pom.xml: identical to Case 8.

Implementation code:

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object _04SparkWriteMySQLV3 {

    def main(args: Array[String]): Unit = {
        // 1. In a Spark application the entry point is SparkContext; it must be instantiated to load data and schedule the job
        val sc: SparkContext = {
            // create the SparkConf object with application settings such as name and master
            val sparkConf: SparkConf = new SparkConf()
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
            // build the SparkContext instance, passing the SparkConf
            new SparkContext(sparkConf)
        }

        // 2. Step 1. read the file from the local FS with sc.textFile, wrapping the data in an RDD
        val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")

        // 3. Step 2. transform the RDD with the higher-order functions flatMap, map and reduceByKey
        val resultRDD: RDD[(String, Int)] = inputRDD
            // TODO: filter out empty lines
            .filter(line => null != line && line.trim.length > 0)
            // a. split each line on the separator
            .flatMap(line => line.trim.split("\\s+"))
            // b. map each word to a tuple marking one occurrence
            .map(word => (word, 1))
            .reduceByKey((temp, item) => temp + item)

        // TODO: save resultRDD to the MySQL table
        resultRDD.coalesce(1).foreachPartition(saveToMySQL)
        // 4. when the application finishes, release resources
        sc.stop()
    }

    /**
     * Saves the data of one RDD partition to a MySQL table
     */
    def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
        // step1. load the driver class
        Class.forName("com.mysql.cj.jdbc.Driver")

        // declare the variables
        var conn: Connection = null
        var pstmt: PreparedStatement = null

        try{
            // step2. open the connection
            conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
                "root",
                "123456"
            )
            // REPLACE INTO overwrites the row when the primary key already exists
            pstmt = conn.prepareStatement("REPLACE INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")

            // TODO: transactional: either the whole partition is saved, or none of it
            val autoCommit: Boolean = conn.getAutoCommit // remember the connection's default commit mode
            conn.setAutoCommit(false)
            // step3. insert the data
            iter.foreach{case (word, count) =>
                pstmt.setString(1, word)
                pstmt.setInt(2, count)
                // TODO: add to the current batch
                pstmt.addBatch()
            }
            // TODO: execute the whole batch
            pstmt.executeBatch()
            conn.commit() // commit manually, performing the batched insert
            // restore the connection's original commit mode
            conn.setAutoCommit(autoCommit)
        }catch {
            case e: Exception => e.printStackTrace()
        }finally {
            // step4. close the connection
            if(null != pstmt) pstmt.close()
            if(null != conn) conn.close()
        }
    }

}
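
REPLACE INTO deletes and re-inserts a conflicting row. When other columns must survive, MySQL's upsert syntax is the milder option; a sketch of the statement the same code could prepare instead:

// INSERT ... ON DUPLICATE KEY UPDATE only touches the listed columns
pstmt = conn.prepareStatement(
    "INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?) " +
    "ON DUPLICATE KEY UPDATE `count` = VALUES(`count`)")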

Case 10: Reading Data from an HBase Table into an RDD

pom.xml: identical to Case 8.

Implementation code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object _03SparkReadHBase {

    def main(args: Array[String]): Unit = {
        // 1. In a Spark application the entry point is SparkContext; it must be instantiated to load data and schedule the job
        val sc: SparkContext = {
            // create the SparkConf object with application settings such as name and master
            val sparkConf: SparkConf = new SparkConf()
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
                // TODO: use Kryo serialization
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // TODO: register the types to be serialized
                .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
            // build the SparkContext instance, passing the SparkConf
            new SparkContext(sparkConf)
        }

        // TODO: read from the HBase table via the RDD method newAPIHadoopRDD
        val conf: Configuration = HBaseConfiguration.create()
        // ZooKeeper connection properties
        conf.set("hbase.zookeeper.quorum", "node1")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("zookeeper.znode.parent", "/hbase")
        // name of the HBase table to read from
        conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
        val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
            conf,
            classOf[TableInputFormat],
            classOf[ImmutableBytesWritable],
            classOf[Result]
        )
        // print a sample of the HBase table
        hbaseRDD
            .take(6)
            .foreach{case (rowKey, result) =>
                result.rawCells().foreach{cell =>
                    println(s"RowKey = ${Bytes.toString(result.getRow)}")
                    println(s"\t${Bytes.toString(CellUtil.cloneFamily(cell))}:" +
                        s"${Bytes.toString(CellUtil.cloneQualifier(cell))} = " +
                        s"${Bytes.toString(CellUtil.cloneValue(cell))}")
                }
            }
        // 5. when the application finishes, release resources
        sc.stop()
    }

}
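
Beyond printing, the raw Result objects are usually mapped into typed records. A sketch assuming the info:count layout that Case 11 writes:

// turn each (rowKey, Result) pair into a typed (word, count) tuple
val wordCountRDD: RDD[(String, Long)] = hbaseRDD.map { case (_, result) =>
    val word  = Bytes.toString(result.getRow)
    val count = Bytes.toString(
        result.getValue(Bytes.toBytes("info"), Bytes.toBytes("count"))).toLong
    (word, count)
}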

Case 11: Saving RDD Data to an HBase Table

pom.xml: identical to Case 8.

Implementation code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object _02SparkWriteHBase {

    def main(args: Array[String]): Unit = {
        // 1. In a Spark application the entry point is SparkContext; it must be instantiated to load data and schedule the job
        val sc: SparkContext = {
            // create the SparkConf object with application settings such as name and master
            val sparkConf: SparkConf = new SparkConf()
                .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
                .setMaster("local[2]")
            // build the SparkContext instance, passing the SparkConf
            new SparkContext(sparkConf)
        }
        // 2. Step 1. read the file from the local FS with sc.textFile, wrapping the data in an RDD
        val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")

        // 3. Step 2. transform the RDD with the higher-order functions flatMap, map and reduceByKey
        val resultRDD: RDD[(String, Int)] = inputRDD
            // filter out empty lines
            .filter(line => null != line && line.trim.length > 0)
            // a. split each line on the separator
            .flatMap(line => line.trim.split("\\s+"))
            // b. map each word to a tuple marking one occurrence
            .map(word => (word, 1))
            .reduceByKey((temp, item) => temp + item)
        // TODO: step 1. convert the RDD to RDD[(RowKey, Put)]
        /*
            * Design of the HBase table:
                * table name: htb_wordcount
                * RowKey: word
                * column family: info
                * column name: count
            create 'htb_wordcount', 'info'
         */
        val putsRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.map{case (word, count) =>
            // first, build the RowKey object
            val rowKey: ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(word))
            // second, build the Put object
            val put: Put = new Put(rowKey.get())
            // set the column value
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count + ""))
            // third, return the (RowKey, Put) tuple
            rowKey -> put
        }

        // TODO: step2. save the data with the RDD method saveAsNewAPIHadoopFile
        val conf: Configuration = HBaseConfiguration.create()
        // ZooKeeper connection properties
        conf.set("hbase.zookeeper.quorum", "node1")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("zookeeper.znode.parent", "/hbase")
        // name of the HBase table the data is saved to
        conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
        putsRDD.saveAsNewAPIHadoopFile(
            "datas/hbase/htb_wordcount/",
            classOf[ImmutableBytesWritable],
            classOf[Put],
            classOf[TableOutputFormat[ImmutableBytesWritable]],
            conf
        )
        // 5. when the application finishes, release resources
        sc.stop()
    }
}
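
Worth noting: TableOutputFormat writes to HBase, so the directory passed to saveAsNewAPIHadoopFile is effectively a dummy. The same write can go through saveAsNewAPIHadoopDataset, with the output classes carried in a Hadoop Job configuration; a sketch using the conf and putsRDD from above:

import org.apache.hadoop.mapreduce.Job

// carry the output format and key/value classes in the job configuration instead of a path
val job = Job.getInstance(conf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
putsRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)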