前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析

2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析

作者头像
Lansonli
发布2021-10-09 16:58:59
1.8K1
发布2021-10-09 16:58:59
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

案例-SogouQ日志分析

使用搜狗实验室提供【用户查询日志(SogouQ)】数据,使用Spark框架,将数据封装到RDD中进行业务数据处理分析。数据网址:http://www.sogou.com/labs/resource/q.php

 1)、数据介绍:搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。

 2)、数据格式

访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL

用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID

 3)、数据下载:分为三个数据集,大小不一样

迷你版(样例数据, 376KB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip

精简版(1天数据,63MB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.zip

完整版(1.9GB):http://www.sogou.com/labs/resource/ftp.php?dir=/Data/SogouQ/SogouQ.zip

业务需求

针对SougoQ用户查询日志数据中不同字段,不同业务进行统计分析:

使用SparkContext读取日志数据,封装到RDD数据集中,调用Transformation函数和Action函数处理分析,灵活掌握Scala语言编程。

准备工作

     在编程实现业务功能之前,首先考虑如何对【查询词】进行中文分词及将日志数据解析封装。

HanLP 中文分词

    使用比较流行好用中文分词:HanLP,面向生产环境的自然语言处理工具包,HanLP 是由一系列模型与算法组成的 Java 工具包,目标是普及自然语言处理在生产环境中的应用。

官方网站:http://www.hanlp.com/,添加Maven依赖

代码语言:javascript
复制
<dependency>

<groupId>com.hankcs</groupId>

<artifactId>hanlp</artifactId>

<version>portable-1.7.7</version>

</dependency>

演示范例:HanLP 入门案例,基本使用

代码语言:javascript
复制
package cn.itcast.core

import java.util

import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer

import scala.collection.JavaConverters._

/**
 * HanLP 入门案例,基本使用
 */
object HanLPTest {
    def main(args: Array[String]): Unit = {
        // 入门Demo
        val terms: util.List[Term] = HanLP.segment("杰克奥特曼全集视频")
        println(terms)
        println(terms.asScala.map(_.word.trim))
        
        // 标准分词
        val terms1: util.List[Term] = StandardTokenizer.segment("放假++端午++重阳")
        println(terms1)
        println(terms1.asScala.map(_.word.replaceAll("\\s+", "")))
        
        val words: Array[String] =
            """00:00:00 2982199073774412    [360安全卫士]   8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html"""
            .split("\\s+")
        println(words(2).replaceAll("\\[|\\]", ""))//将"["和"]"替换为空""
    }
    
}

​​​​​​​样例类 SogouRecord

将每行日志数据封装到CaseClass样例类SogouRecord中,方便后续处理:

代码语言:javascript
复制
/**
 * 用户搜索点击网页记录Record
 * @param queryTime  访问时间,格式为:HH:mm:ss
 * @param userId     用户ID
 * @param queryWords 查询词
 * @param resultRank 该URL在返回结果中的排名
 * @param clickRank  用户点击的顺序号
 * @param clickUrl   用户点击的URL
 */
case class SogouRecord(
                        queryTime: String, 
                        userId: String, 
                        queryWords: String, 
                        resultRank: Int, 
                        clickRank: Int, 
                        clickUrl: String 
                      )

​​​​​​​业务实现

先读取数据,封装到SougoRecord类中,再按照业务处理数据。

最后也可以将分析的结果存储到MySQL表中。

​​​​​​​读取数据

     构建SparkContext实例对象,读取本次SogouQ.sample数据,封装到SougoRecord中 。

代码语言:javascript
复制
object SogouQueryAnalysis {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // TODO: 1. 本地读取SogouQ用户查询日志数据
    val rawLogsRDD: RDD[String] = sc.textFile("data/input/SogouQ.sample")
    //println(s"Count = ${rawLogsRDD.count()}")

    // TODO: 2. 解析数据,封装到CaseClass样例类中
    val recordsRDD: RDD[SogouRecord] = rawLogsRDD
      // 过滤不合法数据,如null,分割后长度不等于6
      .filter(log => log != null && log.trim.split("\\s+").length == 6)
      // 对每个分区中数据进行解析,封装到SogouRecord
      .mapPartitions(iter => {
        iter.map(log => {
          val arr: Array[String] = log.trim.split("\\s+")
          SogouRecord(
            arr(0),
            arr(1),
            arr(2).replaceAll("\\[|\\]", ""),
            arr(3).toInt,
            arr(4).toInt,
            arr(5)
          )
        })
      })
    println(s"Count = ${recordsRDD.count()},\nFirst = ${recordsRDD.first()}")

    // 数据使用多次,进行缓存操作,使用count触发
    recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()

​​​​​​​搜索关键词统计

获取用户【查询词】,使用HanLP进行分词,按照单词分组聚合统计出现次数,类似WordCount程序,具体代码如下:

代码语言:javascript
复制
// =================== 3.1 搜索关键词统计 ===================
// a. 获取搜索词,进行中文分词
val wordsRDD: RDD[String] = recordsRDD.mapPartitions(iter => {
  iter.flatMap(record => {
    // 使用HanLP中文分词库进行分词
    val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)
    // 将Java中集合对转换为Scala中集合对象
    import scala.collection.JavaConverters._
    terms.asScala.map(_.word)
  })
})
println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")

// b. 统计搜索词出现次数,获取次数最多Top10
val top10SearchWords: Array[(Int, String)] = wordsRDD
  .map((_, 1)) // 每个单词出现一次
  .reduceByKey(_ + _) // 分组统计次数
  .map(_.swap)
  .sortByKey(ascending = false) // 词频降序排序
  .take(10) // 获取前10个搜索词
top10SearchWords.foreach(println)

运行结果如下:

​​​​​​​用户搜索点击统计

统计出每个用户每个搜索词点击网页的次数,可以作为搜索引擎搜索效果评价指标。先按照用户ID分组,再按照【查询词】分组,最后统计次数,求取最大次数、最小次数及平均次数。

代码语言:javascript
复制
// =================== 3.2 用户搜索点击次数统计 ===================
/*
    每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度
    先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数
 */
val clickCountRDD: RDD[((String, String), Int)] = recordsRDD
  .map(record => {
    // 获取用户ID和搜索词
    val key = (record.userId, record.queryWords)
    (key, 1)
  })
  // 按照用户ID和搜索词组合的Key分组聚合
  .reduceByKey(_ + _)

clickCountRDD
  .sortBy(_._2, ascending = false)
  .take(10).foreach(println)

println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")

程序运行结果如下:

​​​​​​​搜索时间段统计

按照【访问时间】字段获取【小时:分钟】,分组统计各个小时段用户查询搜索的数量,进一步观察用户喜欢在哪些时间段上网,使用搜狗引擎搜索,代码如下:

代码语言:javascript
复制
// =================== 3.3 搜索时间段统计 ===================
/*
    从搜索时间字段获取小时,统计个小时搜索次数
 */
val hourSearchRDD: RDD[(String, Int)] = recordsRDD
  // 提取小时和分钟
 .map(record => {
   // 03:12:50
   record.queryTime.substring(0, 5)
 })

  // 分组聚合
  .map((_, 1)) // 每个单词出现一次
  .reduceByKey(_ + _) // 分组统计次数
  .sortBy(_._2, ascending = false)
hourSearchRDD.foreach(println)

  程序运行结果如下:

​​​​​​​完整代码

业务实现完整代码SogouQueryAnalysis如下所示:

代码语言:javascript
复制
package cn.itcast.core

import java.util

import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 用户查询日志(SogouQ)分析,数据来源Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。
 *     1. 搜索关键词统计,使用HanLP中文分词
 *     2. 用户搜索次数统计
 *     3. 搜索时间段统计
 * 数据格式:
 * 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
 * 其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID
 */
object SogouQueryAnalysis {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // TODO: 1. 本地读取SogouQ用户查询日志数据
    val rawLogsRDD: RDD[String] = sc.textFile("data/input/SogouQ.sample")
    //val rawLogsRDD: RDD[String] = sc.textFile("D:/data/sogou/SogouQ.reduced")
    //println(s"Count = ${rawLogsRDD.count()}")

    // TODO: 2. 解析数据,封装到CaseClass样例类中
    val recordsRDD: RDD[SogouRecord] = rawLogsRDD
      // 过滤不合法数据,如null,分割后长度不等于6
      .filter(log => log != null && log.trim.split("\\s+").length == 6)
      // 对每个分区中数据进行解析,封装到SogouRecord
      .mapPartitions(iter => {
        iter.map(log => {
          val arr: Array[String] = log.trim.split("\\s+")
          SogouRecord(
            arr(0),
            arr(1),
            arr(2).replaceAll("\\[|\\]", ""),
            arr(3).toInt,
            arr(4).toInt,
            arr(5)
          )
        })
      })
    println("====解析数据===")
    println(s"Count = ${recordsRDD.count()},\nFirst = ${recordsRDD.first()}")

    // 数据使用多次,进行缓存操作,使用count触发
    recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()

    // TODO: 3. 依据需求统计分析
    /*
        1. 搜索关键词统计,使用HanLP中文分词
        2. 用户搜索次数统计
        3. 搜索时间段统计
     */
    println("====3.1 搜索关键词统计===")
    // =================== 3.1 搜索关键词统计 ===================
    // a. 获取搜索词,进行中文分词
    val wordsRDD: RDD[String] = recordsRDD.mapPartitions(iter => {
      iter.flatMap(record => {
        // 使用HanLP中文分词库进行分词
        val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)
        // 将Java中集合对转换为Scala中集合对象
        import scala.collection.JavaConverters._
        terms.asScala.map(_.word)
      })
    })
    //println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")

    // b. 统计搜索词出现次数,获取次数最多Top10
    val top10SearchWords: Array[(Int, String)] = wordsRDD
      .map((_, 1)) // 每个单词出现一次
      .reduceByKey(_ + _) // 分组统计次数
      .map(_.swap)
      .sortByKey(ascending = false) // 词频降序排序
      .take(10) // 获取前10个搜索词
    top10SearchWords.foreach(println)

    println("====3.2 用户搜索点击次数统计===")
    // =================== 3.2 用户搜索点击次数统计 ===================
    /*
        每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度
        先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数
     */
    val clickCountRDD: RDD[((String, String), Int)] = recordsRDD
      .map(record => {
        // 获取用户ID和搜索词
        val key = (record.userId, record.queryWords)
        (key, 1)
      })
      // 按照用户ID和搜索词组合的Key分组聚合
      .reduceByKey(_ + _)

    clickCountRDD
      .sortBy(_._2, ascending = false)
      .take(10).foreach(println)

    println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
    println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
    println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")

    println("====3.3 搜索时间段统计===")
    // =================== 3.3 搜索时间段统计 ===================
    /*
        从搜索时间字段获取小时,统计个小时搜索次数
     */
    val hourSearchRDD: RDD[(String, Int)] = recordsRDD
      // 提取小时和分钟
      .map(record => {
        // 03:12:50
        record.queryTime.substring(0, 5)
      })
      // 分组聚合
      .map((_, 1)) // 每个单词出现一次
      .reduceByKey(_ + _) // 分组统计次数
      .sortBy(_._2, ascending = false)
    hourSearchRDD.foreach(println)

    // 释放缓存数据
    recordsRDD.unpersist()

    // 应用结束,关闭资源
    sc.stop()
  }

  /**
   * 用户搜索点击网页记录Record
   *
   * @param queryTime  访问时间,格式为:HH:mm:ss
   * @param userId     用户ID
   * @param queryWords 查询词
   * @param resultRank 该URL在返回结果中的排名
   * @param clickRank  用户点击的顺序号
   * @param clickUrl   用户点击的URL
   */
  case class SogouRecord(
                          queryTime: String,
                          userId: String,
                          queryWords: String,
                          resultRank: Int,
                          clickRank: Int,
                          clickUrl: String
                        )

}

​​​​​​​

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-04-14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 案例-SogouQ日志分析
    • 业务需求
      • 准备工作
        • HanLP 中文分词
        • ​​​​​​​样例类 SogouRecord
      • ​​​​​​​业务实现
        • ​​​​​​​搜索关键词统计
        • ​​​​​​​用户搜索点击统计
        • ​​​​​​​搜索时间段统计
        • ​​​​​​​完整代码
    相关产品与服务
    NLP 服务
    NLP 服务(Natural Language Process,NLP)深度整合了腾讯内部的 NLP 技术,提供多项智能文本处理和文本生成能力,包括词法分析、相似词召回、词相似度、句子相似度、文本润色、句子纠错、文本补全、句子生成等。满足各行业的文本智能需求。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档