利用Hadoop Mapreduce实现pv统计分析

摘 要

本文将介绍通过Hadoop Mapreduce实现离线统计网站每日pv的思路及代码。

前言

利用网站的kpi数据来分析出网站潜在的价值,那么了解网站的PV、UV、IP的状况,是一项必不可少的任务。本文将介绍通过Hadoop Mapreduce实现离线统计网站每日pv的思路及代码。

什么是PV

pv是指页面的浏览量或点击量(Page View),用户每访问一次或刷新一下即被计算一次。

需求

对网站以往的访问数据进行日pv、月PV、年PV统计。

技术选型

对于访问量大的网站来说,普通程序计算实现成本非常大。我们可以利用Hadoop来实现分布式计算,将固有的数据量分散到多台机器进行计算,无疑加快了计算速度,也降低了宕机的风险。

实现思路

在map阶段将数据清洗,并进行排序、分组。在reduce阶段完成统计。非常简单。

下面具体看实现代码:

KPI.java 用于封装日志信息及排序

package com.itunic.mr.kpi;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

import org.apache.hadoop.io.WritableComparable;
/**
 * 封装一个实体类,将nginx的日志分解成有用的信息。
 * @author itunic
 *
 */
public class KPI implements WritableComparable<KPI> {
 private String remote_addr;// 来访ip
 private String remote_user;// 来访用户名称,忽略属性“-”
 private String time_local;// 记录时间与时区
 private String request;// 访问页面
 private String status;// 返回状态
 private String body_bytes_sent;// 返回客户端内容主体大小
 private String http_referer;// 来访页面
 private String http_user_agent;// 客户浏览器的相关信息

 private boolean valid = true;// 检验数据是否合法
 //设置需要统计的页面类型
 static Set<String> pages = null;
 static {
        pages = new HashSet<String>();
        pages.add(".php");
 /*pages.add(".jsp");
        pages.add(".png");*/
    }

 /**
     * 将传过来的数据封装成bean
     * 
     * @param line
     * @return
     */
 private static KPI parser(String line) {
        KPI kpi = new KPI();
        String[] arr = line.split(" ");
 if (arr.length > 11) {
            kpi.setRemote_addr(arr[0]);
            kpi.setRemote_user(arr[1]);
            kpi.setTime_local(arr[3].substring(1));
            String url = arr[6].indexOf("?") != -1 ? arr[6].substring(0, arr[6].indexOf("?")) : arr[6];
            kpi.setRequest(url);
            kpi.setStatus(arr[8]);
            kpi.setBody_bytes_sent(arr[9]);
            kpi.setHttp_referer(arr[10]);

 if (arr.length > 12) {
                kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                kpi.setHttp_user_agent(arr[11]);
            }

 if (Integer.parseInt(kpi.getStatus()) >= 400) {
                kpi.setValid(false);
            }
        } else {
            kpi.setValid(false);
        }
 return kpi;
    }

 /**
     * 提取需要的url
     * 
     * @param line
     * @return
     */
 public static KPI filterPVs(String line) {
        KPI kpi = parser(line);
        kpi.setValid(false);
 for (String page : pages) {
 if (kpi.getRequest() != null) {
 if (kpi.getRequest().contains(page)) {
                    kpi.setValid(true);
 break;
                }
            }
        }
 return kpi;
    }

 public String getRemote_addr() {
 return remote_addr;
    }

 public void setRemote_addr(String remote_addr) {
 this.remote_addr = remote_addr;
    }

 public String getRemote_user() {
 return remote_user;
    }

 public void setRemote_user(String remote_user) {
 this.remote_user = remote_user;
    }

 public String getTime_local() {
 return time_local;
    }

 public Date getTime_local_Date() {
        SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
 try {
 return df.parse(this.time_local);
        } catch (ParseException e) {
 return null;
        }

    }

 public String getTime_local_day() throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
 return df.format(this.getTime_local_Date());
    }

 public String getTime_local_Date_hour() throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
 return df.format(this.getTime_local_Date());
    }

 public void setTime_local(String time_local) {
 this.time_local = time_local;
    }

 public String getRequest() {
 return request;
    }

 public void setRequest(String request) {
 this.request = request;
    }

 public String getStatus() {
 return status;
    }

 public void setStatus(String status) {
 this.status = status;
    }

 public String getBody_bytes_sent() {
 return body_bytes_sent;
    }

 public void setBody_bytes_sent(String body_bytes_sent) {
 this.body_bytes_sent = body_bytes_sent;
    }

 public String getHttp_referer() {
 return http_referer;
    }


 public void setHttp_referer(String http_referer) {
 this.http_referer = http_referer;
    }

 public String getHttp_user_agent() {
 return http_user_agent;
    }

 public void setHttp_user_agent(String http_user_agent) {
 this.http_user_agent = http_user_agent;
    }

 public boolean isValid() {
 return valid;
    }

 public void setValid(boolean valid) {
 this.valid = valid;
    }


 @Override
 public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getRemote_addr());
        out.writeUTF(this.getRemote_user());
        out.writeUTF(this.getTime_local());
        out.writeUTF(this.getRequest());
        out.writeUTF(this.getStatus());
        out.writeUTF(this.getBody_bytes_sent());
        out.writeUTF(this.getHttp_referer());

    }

 @Override
 public void readFields(DataInput in) throws IOException {
 this.setRemote_addr(in.readUTF());
 this.setRemote_user(in.readUTF());
 this.setTime_local(in.readUTF());
 this.setRequest(in.readUTF());
 this.setStatus(in.readUTF());
 this.setBody_bytes_sent(in.readUTF());
 this.setHttp_referer(in.readUTF());

    }

 /**
     * 排序,按照日期降序、url升序处理。
     */
 @Override
 public int compareTo(KPI o) {
 int i;
 try {
            i = this.getTime_local_day().compareTo(o.getTime_local_day());

 if (i != 0) {
 return -i;
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
 return this.getRequest().compareTo(o.getRequest());
    }
}

PageViewGroup.java 用于将日志信息分组

package com.itunic.mr.kpi;

import java.text.ParseException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 分组,
 * 由于需要按照每日的维度统计pv,必须将日期分组。
 * @author itunic
 *
 */
public class PageViewGroup extends WritableComparator {
 public PageViewGroup() {
 super(KPI.class, true);
    }

 @SuppressWarnings("rawtypes")
 @Override
 public int compare(WritableComparable a, WritableComparable b) {
        KPI kpi1 = (KPI) a;
        KPI kpi2 = (KPI) b;
 try {
 /**
             * 判断当前bean与传入的bean的日期是否一致。如果一致则需要判断是否为同一个url
             */
 int i = kpi1.getTime_local_day().compareTo(kpi2.getTime_local_day());
 if (i != 0) {
 return -i;
            }
 return kpi1.getRequest().compareTo(kpi2.getRequest());
        } catch (ParseException e) {
 // TODO Auto-generated catch block
            e.printStackTrace();
        }
 return 0;
    }
}

PageView.java Mapreduce计算类

package com.itunic.mr.kpi;

import java.io.IOException;
import java.text.ParseException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageView {

 /**
     * 利用mapreduce 离线统计每天的pv
     * 
     * @author itunic
     *
     */
 public static class PageVisitsMapper extends Mapper<LongWritable, Text, KPI, LongWritable> {
 @Override
 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 // 校验每一行URL的合法性
            KPI kpi = KPI.filterPVs(value.toString());
 if (kpi.isValid()) {
 // 利用mapper特性输出给reducer
                context.write(kpi, new LongWritable(1));
            }
        }
    }

 public static class PageVisitsReducer extends Reducer<KPI, LongWritable, Text, Text> {
 @Override
 protected void reduce(KPI key, Iterable<LongWritable> value, Context context)
 throws IOException, InterruptedException {
 long count = 0;
 // 将相同的分组相同的页面循环叠加
 for (LongWritable l : value) {
                count += l.get();
            }
            String out = key.getRequest() + "\t" + count;
 try {
                context.write(new Text(key.getTime_local_day()), new Text(out));
            } catch (ParseException e) {
 // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

 public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(PageView.class);
        job.setMapperClass(PageVisitsMapper.class);
        job.setReducerClass(PageVisitsReducer.class);
        job.setMapOutputKeyClass(KPI.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setGroupingComparatorClass(PageViewGroup.class);
        FileInputFormat.setInputPaths(job, new Path("F:\\test\\input\\access.log.fensi"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\test\\output9"));

 int i = job.waitForCompletion(true) ? 0 : 1;
        System.exit(i);
    }
}
源数据实例:nginx log
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
运行分析部分结果如下:
2013-09-19  /tag/hadoop/page/3//images/stories/3xp.php  2
2013-09-19  /tag/rhadoop//images/stories/3xp.php    2
2013-09-19  /ucp.php    1
2013-09-19  /wp-admin/admin-ajax.php    120
2013-09-19  /wp-admin/async-upload.php  3
2013-09-19  /wp-admin/edit-comments.php 1
2013-09-19  /wp-admin/post.php  4
2013-09-19  /wp-cron.php    24
2013-09-19  /wp-login.php   2
2013-09-19  /xmlrpc.php 3
2013-09-18  //components/com_jnews/includes/openflashchart/php-ofc-library/ofc_upload_image.php 1
2013-09-18  //images/stories/3xp.php    6
2013-09-18  //images/stories/70cpx.php  2
2013-09-18  //images/stories/70pet.php  2
2013-09-18  //images/stories/cr0t.php   1
2013-09-18  /admin.php  2
2013-09-18  /administrator/index.php    2
2013-09-18  /batch.manage.php   21
2013-09-18  /category/hadoop-action/page/4//images/stories/3xp.php  2
2013-09-18  /images/stories/cr0t.php    1
2013-09-18  /index.php  10
2013-09-18  /index.php/blog/14  1
2013-09-18  /index.php/quiz/1   1
2013-09-18  /index.php/quiz/2   1

点击下载测试数据及分析结果数据

下载测试数据

相关

利用Apache Spark实现pv统计分析

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏SpringBoot

第三节,Springboot@Value和@ConfigurationProperties比较

这里可以看出@value 名称必须跟配置文件一致,但是@ConfigurationProperties支持松散绑定,意思就是可以用"_","-"代表大写

923
来自专栏函数式编程语言及工具

SDP(12): MongoDB-Engine - Streaming

   在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个M...

42210
来自专栏函数式编程语言及工具

FunDA(15)- 示范:任务并行运算 - user task parallel execution

    FunDA的并行运算施用就是对用户自定义函数的并行运算。原理上就是把一个输入流截分成多个输入流并行地输入到一个自定义函数的多个运行实例。这些函数运行实例...

1759
来自专栏函数式编程语言及工具

SDP(2):ScalikeJDBC-Connection Pool Configuration

  scalikeJDBC可以通过配置文件来设置连接池及全局系统参数。对配置文件的解析是通过TypesafeConfig工具库实现的。默认加载classpath...

2934
来自专栏积累沉淀

研究MapReduce源码之实现自定义LineRecordReader完成多行读取文件内容

TextInputFormat是Hadoop默认的数据输入格式,但是它只能一行一行的读记录,如果要读取多行怎么办? 很简单 自己写一个输入格式,然后写一个对...

1749
来自专栏大内老A

我的WCF之旅(13):创建基于MSMQ的Responsive Service

一、One-way MEP V.S. Responsible Service 我们知道MSMQ天生就具有异步的特性,它只能以One-way的MEP(Messag...

2036
来自专栏Jed的技术阶梯

Kafka 中使用 Avro 序列化框架(二):使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化

使用传统的 avro API 自定义序列化类和反序列化类比较麻烦,需要根据 schema 生成实体类,需要调用 avro 的 API 实现 对象到 byte[]...

1034
来自专栏后端之路

SpringBoot之条件注解

背景 之前写过关于Spring和Maven的profile的区别 maven profile VS spring profile 我们可以通过上述的profil...

2895
来自专栏码匠的流水账

聊聊spring cloud gateway的LoadBalancerClientFilter

本文主要研究一下spring cloud gateway的LoadBalancerClientFilter

501
来自专栏大大的微笑

spring boot加载复杂的yml文件获取不到值的问题

今天使用spring boot读取yml文件,这种多层嵌套的竟然无法读取到(value注解spring.redis.pool.max.wait),即便加上全名...

33910

扫码关注云+社区