
Notes on Writing Spark in Scala


Learning takeaways

(1) Setting up the environment is the most troublesome part.

(2) Write the code yourself. Write it, write it, write it.

WordCount

package wordcount

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author CBeann
  * @create 2019-08-10 18:02
  */
object WordCount {

  def main(args: Array[String]): Unit = {
    //Create a SparkConf and set the app name
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    //Create the SparkContext
    val sc = new SparkContext(conf)
    //Use sc to build an RDD and run the transformations and actions
    val data = sc.textFile("C:\\Users\\Lenovo\\Desktop\\leetcode.txt")

    //Split into words, map each to (word, 1), and sum the counts per key into one partition
    val result = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1)

    //Print to the console
    result.collect().foreach(println)

    //Save the result to disk
    result.saveAsTextFile("F:\\temp\\aa")

    sc.stop()
    println("-----over-----")
  }

}
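If you only need the counts on the driver for a quick check, RDDs also offer countByValue, which collapses the map/reduceByKey pair into a single action. A minimal sketch against the same data RDD (note it returns a local Map, so it only suits small result sets):

//Split into words and count occurrences directly on the driver
val counts: scala.collection.Map[String, Long] = data.flatMap(_.split(" ")).countByValue()
counts.foreach(println)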

Sorting

Option 1: sort by one field
val result = data.sortBy(_._2, ascending = false)

Option 2: sort by a class that extends Ordered
val result = data.sortBy(x => Boy(x._1, x._2, x._3), ascending = false)
package mysort

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author CBeann
  * @create 2019-08-10 18:26
  */
object MysortDemo {

  def main(args: Array[String]): Unit = {
    //Create a SparkConf and set the app name
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    //Create the SparkContext
    val sc = new SparkContext(conf)
    //Use sc to build an RDD and run the transformations and actions
    val data = sc.makeRDD(List(("张三", 10, 14), ("张三", 9, 9), ("张三", 13, 15)))

    //Option 1: sort by one field
    //val result = data.sortBy(_._2, ascending = false)

    //Option 2: sort by a class that extends Ordered
    val result = data.sortBy(x => Boy(x._1, x._2, x._3), ascending = false)

    //Print to the console
    result.collect().foreach(println)


  }

}

case class Boy(name: String, faceValue: Int, age: Int) extends Ordered[Boy] {
  //Compare by faceValue first, then break ties by age
  override def compare(that: Boy): Int = {
    if (this.faceValue != that.faceValue) {
      this.faceValue - that.faceValue
    } else {
      this.age - that.age
    }
  }
}
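An alternative to extending Ordered is to supply an implicit Ordering, which keeps the case class free of comparison logic. A minimal sketch, assuming Boy is a plain case class with the same fields (no Ordered) and reusing the data RDD above:

//Build an Ordering[Boy] from the (faceValue, age) tuple; tuples compare left to right
implicit val boyOrdering: Ordering[Boy] = Ordering.by((b: Boy) => (b.faceValue, b.age))

//sortBy picks up the implicit Ordering for its key type
val result = data.sortBy(x => Boy(x._1, x._2, x._3), ascending = false)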

Custom Partitioning

Custom partitioner
package mypartition

import org.apache.spark.Partitioner

import scala.collection.mutable

/**
  * @author CBeann
  * @create 2019-08-10 18:36
  *         A custom partitioner extends Partitioner
  */
class MyPartitioner extends Partitioner {


  val map = new mutable.HashMap[String, Int]()
  map.put("Java", 0)
  map.put("Scala", 1)
  map.put("Go", 2)


  //Total number of partitions
  override def numPartitions: Int = map.size

  //Partitioning logic: route known keys to their slot, default to partition 0
  override def getPartition(key: Any): Int = {
    map.getOrElse(key.toString, 0)

  }
}
Test class
package mypartition

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author CBeann
  * @create 2019-08-10 18:59
  */
object PartitionDemo {

  def main(args: Array[String]): Unit = {
    //Create a SparkConf and set the app name
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    //Create the SparkContext
    val sc = new SparkContext(conf)

    val data = sc.makeRDD(List(("Java", 11), ("Java", 9), ("Scala", 13), ("Go", 11)))


    val result = data.partitionBy(new MyPartitioner)

    result.saveAsTextFile("F:\\temp\\aaa")

    println("--------------OVER------------")


  }

}
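To check where each record actually landed without opening the output files, mapPartitionsWithIndex can tag every element with its partition index. A small sketch over the result RDD above:

//Pair every element with the index of the partition that holds it
result.mapPartitionsWithIndex((idx, iter) => iter.map(kv => (idx, kv)))
  .collect()
  .foreach(println)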

SparkSQL

person.json

{  "name": "王小二",   "age": 15}
{  "name": "王小三",   "age": 25}
{  "name": "王小四",   "age": 35}

Test class

package sparksql

import org.apache.spark.sql.SparkSession

/**
  * @author CBeann
  * @create 2019-08-10 18:20
  */
object SparkSqlDemo {

  def main(args: Array[String]): Unit = {


    //Create a SparkSession and set the app name
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example").master("local[8]")
      //.config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.json("E:\\IntelliJ IDEA 2019.1.3Workspace\\ScalaSpark\\SparkDemo\\src\\main\\resources\\json\\person.json")

    // Displays the content of the DataFrame to stdout
    df.show()

    df.filter($"age" > 21).show()

    df.createOrReplaceTempView("persons")

    spark.sql("SELECT * FROM persons where age > 21").show()

    spark.stop()


    printf("-----over---------")


  }

}
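The untyped DataFrame can also be converted to a typed Dataset by defining a case class that matches the JSON schema (it must live outside main so Spark can derive an encoder). A minimal sketch, reusing df and the spark.implicits._ import from the demo above; note that spark.read.json infers integer fields as Long:

case class Person(name: String, age: Long)

//Typed view over the same data: filters now take plain Scala lambdas
val ds = df.as[Person]
ds.filter(_.age > 21).show()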

Spark Streaming

Stateless wordcount
package stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author CBeann
  * @create 2019-08-10 18:38
  */
object StreamDemo {

  def main(args: Array[String]): Unit = {

    //Create a SparkConf
    val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
    //Create the StreamingContext entry point with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))

    //Continuously read data from port 9999 on the given host
    val lines = ssc.socketTextStream("iZm5ea99qngm2v98asii1aZ", 9999)
    //val lines = ssc.receiverStream(new MyReceiver("iZm5ea99qngm2v98asii1aZ", 9999))

    //Process the data (wordcount)
    val result = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    result.print()


    //Start the streaming job
    ssc.start()
    //Block until the job is stopped
    ssc.awaitTermination()

    println("--------OVER-------------")
  }

}
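For local testing, the usual trick is to run nc -lk 9999 on the target host and type words into it. It can also be handy to bound the wait instead of blocking forever; a small sketch that would replace the awaitTermination() call above:

//Wait at most 60 seconds; if the job is still running, stop it gracefully
//(finish in-flight batches and also stop the underlying SparkContext)
if (!ssc.awaitTerminationOrTimeout(60000)) {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}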
Stateful wordcount

The key is the updateStateByKey method; you pass it a function with a fixed signature.

package stream.withstatus

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import stream.MyReceiver

/**
  * @author CBeann
  * @create 2019-08-10 19:24
  */
object UpdateStateByKeyTest {

  def main(args: Array[String]): Unit = {
    //Create a SparkConf
    val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
    //Create the StreamingContext entry point with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))

    //updateStateByKey needs a checkpoint directory to persist the state
    ssc.checkpoint("F:\\temp\\aaa")

    //Continuously read data from port 9999 on the given host
    val lines = ssc.socketTextStream("iZm5ea99qngm2v98asii1aZ", 9999)

    //Process the data (per-batch wordcount)
    val result = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)

    //The key step: fold each batch's counts into the running state
    val allresult = result.updateStateByKey(updateFunction)

    allresult.print()


    //Start the streaming job
    ssc.start()
    //Block until the job is stopped
    ssc.awaitTermination()

    println("--------OVER-------------")
  }

  //The parameter types are fixed by the API; the parameter names are not
  //currValues: all values for one key in the current batch's RDD
  //preValue: the previous state for that key, supplied by the framework
  def updateFunction(currValues: Seq[Int], preValue: Option[Int]): Option[Int] = {

    //Sum for the current batch
    val currValueSum = currValues.sum
    //Running total from earlier batches
    val oldValueSum = preValue.getOrElse(0)
    //New state: current sum plus the history
    Some(currValueSum + oldValueSum)

  }

}
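When only a recent window of counts matters rather than the whole history, reduceByKeyAndWindow is a lighter alternative to updateStateByKey. A minimal sketch over the same result stream (window of 30s, sliding every 10s; both must be multiples of the 5s batch interval):

//Count words over the last 30 seconds, recomputed every 10 seconds
val windowed = result.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowed.print()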
Custom receiver
package stream

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/**
  * @author CBeann
  * @create 2019-08-10 18:39
  */
class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  //Called when the receiver starts: spawn a worker thread that reads from the socket
  override def onStart(): Unit = {

    new Thread() {
      override def run(): Unit = {
        receive()
      }
    }.start()


  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))

      userInput = reader.readLine()
      while (!isStopped && userInput != null) {

        // Push the received line to Spark
        store(userInput)

        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }

  //Nothing to clean up: the receive() thread checks isStopped() and exits on its own
  override def onStop(): Unit = {}



}
Hook the receiver into the StreamingContext with receiverStream:

val lines = ssc.receiverStream(new MyReceiver("iZm5ea99qngm2v98asii1aZ", 9999))

pom.xml

 <properties>
        <mysql.version>6.0.5</mysql.version>
        <spring.version>4.3.6.RELEASE</spring.version>
        <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
        <log4j.version>1.2.17</log4j.version>
        <quartz.version>2.2.3</quartz.version>
        <slf4j.version>1.7.22</slf4j.version>
        <hibernate.version>5.2.6.Final</hibernate.version>
        <camel.version>2.18.2</camel.version>
        <config.version>1.10</config.version>
        <jackson.version>2.8.6</jackson.version>
        <servlet.version>3.0.1</servlet.version>
        <net.sf.json.version>2.4</net.sf.json.version>
        <activemq.version>5.14.3</activemq.version>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11.11</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>