摘要
本文将介绍通过Hadoop MapReduce实现离线统计网站每日PV的思路及代码。
利用网站的kpi数据来分析出网站潜在的价值,那么了解网站的PV、UV、IP的状况,是一项必不可少的任务。本文将介绍通过Hadoop Mapreduce实现离线统计网站每日pv的思路及代码。
pv是指页面的浏览量或点击量(Page View),用户每访问一次或刷新一下即被计算一次。
对网站以往的访问数据进行日PV、月PV、年PV统计。
对于访问量大的网站来说,普通程序计算实现成本非常大。我们可以利用Hadoop来实现分布式计算,将固有的数据量分散到多台机器进行计算,无疑加快了计算速度,也降低了宕机的风险。
在map阶段将数据清洗,并进行排序、分组。在reduce阶段完成统计。非常简单。
下面具体看实现代码:
KPI.java 用于封装日志信息及排序
package com.itunic.mr.kpi;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.hadoop.io.WritableComparable;
/**
 * Bean that parses one nginx access-log line into its useful fields.
 * Also serves as the MapReduce key: serialized via Writable and ordered
 * by day (descending) then request URL (ascending).
 *
 * @author itunic
 */
public class KPI implements WritableComparable<KPI> {
    private String remote_addr;// client IP address
    private String remote_user;// remote user name; "-" when absent
    private String time_local;// request time with timezone, e.g. 18/Sep/2013:06:49:18
    private String request;// requested URL (query string stripped by parser)
    private String status;// HTTP response status code
    private String body_bytes_sent;// size of the response body sent to the client
    private String http_referer;// referring page
    private String http_user_agent;// client browser / user-agent information
    private boolean valid = true;// whether this record passed validation

    // URL suffixes that count toward the PV statistics.
    static Set<String> pages = null;
    static {
        pages = new HashSet<String>();
        pages.add(".php");
        /*pages.add(".jsp");
        pages.add(".png");*/
    }

    /**
     * Parses a raw log line into a KPI bean. Lines that are too short,
     * have a non-numeric status, or a status >= 400 are marked invalid.
     *
     * @param line one raw nginx access-log line
     * @return a KPI bean; check {@link #isValid()} before use
     */
    private static KPI parser(String line) {
        KPI kpi = new KPI();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            kpi.setRemote_addr(arr[0]);
            kpi.setRemote_user(arr[1]);
            // arr[3] is "[dd/MMM/yyyy:HH:mm:ss"; drop the leading '['.
            kpi.setTime_local(arr[3].substring(1));
            // Strip the query string so identical pages aggregate together.
            String url = arr[6].indexOf("?") != -1 ? arr[6].substring(0, arr[6].indexOf("?")) : arr[6];
            kpi.setRequest(url);
            kpi.setStatus(arr[8]);
            kpi.setBody_bytes_sent(arr[9]);
            kpi.setHttp_referer(arr[10]);
            if (arr.length > 12) {
                kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                kpi.setHttp_user_agent(arr[11]);
            }
            try {
                // Client/server errors (4xx/5xx) do not count as page views.
                if (Integer.parseInt(kpi.getStatus()) >= 400) {
                    kpi.setValid(false);
                }
            } catch (NumberFormatException e) {
                // Malformed status field: treat the whole line as invalid
                // instead of crashing the mapper.
                kpi.setValid(false);
            }
        } else {
            kpi.setValid(false);
        }
        return kpi;
    }

    /**
     * Parses a log line and keeps it only when the request URL matches one
     * of the configured page suffixes in {@link #pages}.
     *
     * @param line one raw nginx access-log line
     * @return a KPI bean; valid only when the URL is a counted page type
     */
    public static KPI filterPVs(String line) {
        KPI kpi = parser(line);
        kpi.setValid(false);
        for (String page : pages) {
            if (kpi.getRequest() != null) {
                if (kpi.getRequest().contains(page)) {
                    kpi.setValid(true);
                    break;
                }
            }
        }
        return kpi;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return time_local;
    }

    /**
     * @return the record time as a Date, or null when time_local cannot be
     *         parsed (callers must handle null)
     */
    public Date getTime_local_Date() {
        // SimpleDateFormat is not thread-safe, so a fresh instance is
        // created per call rather than shared statically.
        SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        try {
            return df.parse(this.time_local);
        } catch (ParseException e) {
            return null;
        }
    }

    /**
     * @return the record day formatted as yyyy-MM-dd
     * @throws ParseException when time_local cannot be parsed
     */
    public String getTime_local_day() throws ParseException {
        Date d = this.getTime_local_Date();
        if (d == null) {
            // Previously this fell through to df.format(null) and threw an
            // undeclared NullPointerException; surface the declared
            // ParseException instead.
            throw new ParseException("unparseable time_local: " + this.time_local, 0);
        }
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        return df.format(d);
    }

    /**
     * @return the record hour formatted as yyyyMMddHH
     * @throws ParseException when time_local cannot be parsed
     */
    public String getTime_local_Date_hour() throws ParseException {
        Date d = this.getTime_local_Date();
        if (d == null) {
            throw new ParseException("unparseable time_local: " + this.time_local, 0);
        }
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
        return df.format(d);
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    // NOTE: http_user_agent and valid are intentionally not serialized;
    // write() and readFields() must stay field-for-field symmetric.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getRemote_addr());
        out.writeUTF(this.getRemote_user());
        out.writeUTF(this.getTime_local());
        out.writeUTF(this.getRequest());
        out.writeUTF(this.getStatus());
        out.writeUTF(this.getBody_bytes_sent());
        out.writeUTF(this.getHttp_referer());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.setRemote_addr(in.readUTF());
        this.setRemote_user(in.readUTF());
        this.setTime_local(in.readUTF());
        this.setRequest(in.readUTF());
        this.setStatus(in.readUTF());
        this.setBody_bytes_sent(in.readUTF());
        this.setHttp_referer(in.readUTF());
    }

    /**
     * Sort order: day descending, then request URL ascending.
     * If either day cannot be parsed, ordering falls back to the URL alone.
     */
    @Override
    public int compareTo(KPI o) {
        int i;
        try {
            i = this.getTime_local_day().compareTo(o.getTime_local_day());
            if (i != 0) {
                // Negated to get descending dates (newest day first).
                return -i;
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return this.getRequest().compareTo(o.getRequest());
    }
}
PageViewGroup.java 用于将日志信息分组
package com.itunic.mr.kpi;
import java.text.ParseException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Grouping comparator: PV is aggregated per day, so reducer groups are
 * formed by (day, request URL), matching the sort order of
 * {@link KPI#compareTo(KPI)}. The grouping order MUST stay consistent
 * with the key sort order, or records belonging to different groups may
 * be merged in the reducer.
 *
 * @author itunic
 */
public class PageViewGroup extends WritableComparator {
    public PageViewGroup() {
        // true: instantiate KPI keys so compare() can deserialize them.
        super(KPI.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KPI kpi1 = (KPI) a;
        KPI kpi2 = (KPI) b;
        try {
            // Same day? Then distinguish by URL; different days sort newest first.
            int i = kpi1.getTime_local_day().compareTo(kpi2.getTime_local_day());
            if (i != 0) {
                return -i;
            }
        } catch (ParseException e) {
            // Previously an unparseable date returned 0, collapsing ALL keys
            // into a single reducer group while the sort order still
            // distinguished them by URL. Fall through to the URL comparison
            // so grouping stays consistent with KPI.compareTo.
        }
        return kpi1.getRequest().compareTo(kpi2.getRequest());
    }
}
PageView.java Mapreduce计算类
package com.itunic.mr.kpi;
import java.io.IOException;
import java.text.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PageView {

    /**
     * Offline daily PV statistics with MapReduce.
     *
     * Mapper: validates each log line via {@link KPI#filterPVs(String)} and
     * emits (KPI key, 1) for every counted page view.
     *
     * @author itunic
     */
    public static class PageVisitsMapper extends Mapper<LongWritable, Text, KPI, LongWritable> {
        // Reused across map() calls instead of allocating one LongWritable
        // per input record (standard Hadoop idiom).
        private static final LongWritable ONE = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Validate the line and keep only counted page types.
            KPI kpi = KPI.filterPVs(value.toString());
            if (kpi.isValid()) {
                context.write(kpi, ONE);
            }
        }
    }

    /**
     * Reducer: sums the counts of one (day, URL) group and writes
     * "day \t url \t count".
     */
    public static class PageVisitsReducer extends Reducer<KPI, LongWritable, Text, Text> {
        @Override
        protected void reduce(KPI key, Iterable<LongWritable> value, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            // Accumulate the hit count for this (day, URL) group.
            for (LongWritable l : value) {
                count += l.get();
            }
            String out = key.getRequest() + "\t" + count;
            try {
                context.write(new Text(key.getTime_local_day()), new Text(out));
            } catch (ParseException e) {
                // An unparseable day means the group key is corrupt; skip the
                // row rather than failing the whole job.
                e.printStackTrace();
            }
        }
    }

    /**
     * Job driver.
     *
     * @param args optional: args[0] = input path, args[1] = output path;
     *             the original hard-coded local paths remain the defaults.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(PageView.class);
        job.setMapperClass(PageVisitsMapper.class);
        job.setReducerClass(PageVisitsReducer.class);
        job.setMapOutputKeyClass(KPI.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Group map output by (day, URL) so each reduce call sees one page's hits.
        job.setGroupingComparatorClass(PageViewGroup.class);
        // Paths may be supplied on the command line; defaults preserve the
        // original behavior for local testing.
        String input = args.length > 0 ? args[0] : "F:\\test\\input\\access.log.fensi";
        String output = args.length > 1 ? args[1] : "F:\\test\\output9";
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        int i = job.waitForCompletion(true) ? 0 : 1;
        System.exit(i);
    }
}
源数据实例:nginx log
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
运行分析部分结果如下:
2013-09-19 /tag/hadoop/page/3//images/stories/3xp.php 2
2013-09-19 /tag/rhadoop//images/stories/3xp.php 2
2013-09-19 /ucp.php 1
2013-09-19 /wp-admin/admin-ajax.php 120
2013-09-19 /wp-admin/async-upload.php 3
2013-09-19 /wp-admin/edit-comments.php 1
2013-09-19 /wp-admin/post.php 4
2013-09-19 /wp-cron.php 24
2013-09-19 /wp-login.php 2
2013-09-19 /xmlrpc.php 3
2013-09-18 //components/com_jnews/includes/openflashchart/php-ofc-library/ofc_upload_image.php 1
2013-09-18 //images/stories/3xp.php 6
2013-09-18 //images/stories/70cpx.php 2
2013-09-18 //images/stories/70pet.php 2
2013-09-18 //images/stories/cr0t.php 1
2013-09-18 /admin.php 2
2013-09-18 /administrator/index.php 2
2013-09-18 /batch.manage.php 21
2013-09-18 /category/hadoop-action/page/4//images/stories/3xp.php 2
2013-09-18 /images/stories/cr0t.php 1
2013-09-18 /index.php 10
2013-09-18 /index.php/blog/14 1
2013-09-18 /index.php/quiz/1 1
2013-09-18 /index.php/quiz/2 1
点击下载测试数据及分析结果数据