利用Spark通过nginx日志离线统计网站每日pv

摘 要

本文将介绍通过Apache Spark实现离线统计网站每日pv的思路及代码。

前言

在此之前,利用mapreduce实现了一版通过nginx日志离线分析网站每日pv,感兴趣的可以去看一下。本文实现思路与之前mapreduce的思路一致。可以很好的比较mapreduce和Spark的写法。在个人看来,Spark写起来更加优美简洁,有一种四两拨千斤的感觉。

想了解实现思路的,可以看一下利用Mapreduce实现的文章,详细思路已经阐述。

点击查看->利用HadoopMareduce实现pv统计分析

本文与Hadoop Mapreduce采用的数据集为同一个,为标准的nginx日志文件。在上文中已经提供了下载附件。感兴趣的可以去下载。

代码实现

package com.itunic.rdd

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
  * Created by c on 2017/1/11.
  * 通过nginx日志统计每日pv,并按照日期和pv排序
  * by me:
  * 我本沉默是关注互联网以及分享IT相关工作经验的博客,
  * 主要涵盖了操作系统运维、计算机编程、项目开发以及系统架构等经验。
  * 博客宗旨:把最实用的经验,分享给最需要的你,
  * 希望每一位来访的朋友都能有所收获!
  *
  */
object NginxLogPV {

 /**
    * 设置需要统计的页面
    */
  val pages = new mutable.HashSet[String]()
  pages += ".php"

 /**
    * 封装KPI实体类
    *
    * @param line
    * @return KPI
    */
  def parser(line: String): KPI = {
 //
    val fields = line.split(" ")
    val remote_addr = fields(0)
    val time_local = fields(3).substring(1)
    val request = fields(6)
    val status = fields(8)
    var valid = true
 if (fields.length <= 11) {
      valid = false
    } else {
      valid = if (status.toInt >= 400) false else true
    }

    val url = if (request.indexOf("?") != -1) request.substring(0, request.indexOf("?")) else request
    KPI(remote_addr, time_local, url, status, valid)
  }

 /**
    * 过滤无效数据
    *
    * @param line
    * @return
    */
  def filterPVs(line: String): KPI = {
    val kpi: KPI = parser(line)

 /**
      * 过滤需要统计的URL
      */
    kpi.valid = false
 for (page <- pages) {
 if (kpi.request != null) {
 if (kpi.request.contains(page)) {
          kpi.valid = true
        }
      }
    }
 return kpi;
  }

 /**
    * 将nginx日志时间转换为常规日期
    *
    * @param time_local
    * @return
    */
  def getTime_local_Date(time_local: String): Date = {
    val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US)
    df.parse(time_local)
  }

 /**
    * 日期格式化
    *
    * @param time_local
    * @return
    */
  def getTime_local_day(time_local: String): String = {
    val df = new SimpleDateFormat("yyyy-MM-dd");
    df.format(getTime_local_Date(time_local));
  }

  def main(args: Array[String]): Unit = {
 // StreamingExamples.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("NginxLogPV").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("F:\\test\\input\\access.log").map(x => {
 /**
        * 封装并过滤数据
        */
      filterPVs(x)
    }).filter(x => {
 /**
        * 过滤有效数据
        */
      x.valid
    }).map(x => {
 /**
        * 封装 key-value数据
        */
      ((getTime_local_day(x.time_local), x.request), 1)
    }).reduceByKey(_ + _) //聚合
 /**
      * 二次排序
      */
    val rdd6 = rdd.sortBy(x => PVSort(x._1._1, x._2))

 /**
      * 格式化数据并输出到磁盘
      */
    rdd6.map(x => {
      x._1._1 + "\t" + x._1._2 + "\t" + x._2
    }).saveAsTextFile("F:\\test\\input\\wc231")
 // println(rdd5.collect().toBuffer)

    sc.stop()
  }

}

/**
  * 自定义排序,日期升序,点击量降序
  *
  * @param date
  * @param count
  */
case class PVSort(date: String, count: Int) extends Ordered[PVSort] with Serializable {
  override def compare(that: PVSort): Int = {
    val i = this.date.compareTo(that.date)
 if (i == 0) {
 return -this.count.compareTo(that.count)
    } else {
 return i
    }
  }
}

/**
  * kpi样例类
  *
  * @param remote_addr
  * @param time_local
  * @param request
  * @param status
  * @param valid
  */
case class KPI(
                remote_addr: String, //来访ip
                time_local: String, //来访时间
                request: String, //受访页面
                status: String, //状态
                var valid: Boolean = true //判断是否合法
              ) extends Serializable

nginx 日志示例

50.116.27.194 - - [18/Sep/2013:07:11:29 +0000] "POST /wp-cron.php?doing_wp_cron=1379488288.8893849849700927734375 HTTP/1.0" 200 0 "-" "WordPress/3.6; http://itunic.com"

统计结果示例

2013-09-18  /wp-admin/admin-ajax.php    200
2013-09-18  /wp-cron.php    73
2013-09-18  /batch.manage.php   21
2013-09-18  /index.php  10
2013-09-18  /tag/waitoutputthreads/index.php    10
2013-09-19  /wp-admin/admin-ajax.php    120
2013-09-19  /wp-cron.php    24
2013-09-19  /index.php  13
2013-09-19  /register.php   9
2013-09-19  /wp-admin/post.php  4
2013-09-19  /wp-admin/async-upload.php  3

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Kevin-ZhangCG

[ Java面试题 ]框架篇二

1034
来自专栏java初学

java — 设计模式

41112
来自专栏java工会

Java Web技术经验总结

1375
来自专栏菩提树下的杨过

puremvc框架之Command

在前一篇 puremvc框架之hello world! 里,已经对这个框架有了一个大概的认识,不过在消息的处理上,有一个不太适合的地方: 为了完成响应消息,Te...

1787
来自专栏nnngu

02 整合IDEA+Maven+SSM框架的高并发的商品秒杀项目之Service层

项目源代码:https://github.com/nnngu/nguSeckill ---- 首先在编写Service层代码前,我们应该首先要知道这一层到底是...

6469
来自专栏程序猿DD

Spring Cloud实战小贴士:Zuul统一异常处理(三)【Dalston版】

本篇作为《Spring Cloud微服务实战》一书关于Spring Cloud Zuul网关在Dalston版本对异常处理的补充。没有看过本书的读书也不要紧,可...

2129
来自专栏木东居士的专栏

漫谈并发编程:Future模型(Java、Clojure、Scala多语言角度分析)

2213
来自专栏KK的小酒馆

Android设计模式一

模式定义 定义一个操作中的算法的骨架(稳定),而将一些步骤延迟(变化)到子类中。Template Method使子类可以不改变(复用)一个算法的结构即可重定义...

952
来自专栏KK的小酒馆

Android设计模式二

在组件构建过程中,某些接口之间直接的依赖常常会带来很多问题,甚至根本无法实现。采用添加一层间接(稳定)接口,来隔离本来互相紧密关联的接口是一种常见的解决方案。

702
来自专栏developerHaoz 的安卓之旅

Android Volley 源码解析(一),网络请求的执行流程

花了好几天,重新研究了 Volley 的源码实现,比起之前又有了一番新的体会,啃源码真的是一件让人纠结的事情,阅读优秀的源码,特别是难度相对较大的源码,一旦陷入...

1014

扫码关注云+社区