Hadoop MapReduce / MR 是一个软件计算框架,可以轻松地编写应用程序,以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多达TB数据集) 。
MapReduce框架由一个主资源管理器,一个集群节点一个工作器NodeManager和每个应用程序MRAppMaster组成(请参阅YARN体系结构指南)。应用程序通过适当的接口和/或抽象类的实现来指定输入/输出位置和供应图,并减少功能。这些以及其他作业参数构成作业配置。然后,Hadoop 作业客户端将作业(jar /可执行文件等)和配置提交给ResourceManager,然后由ResourceManager负责将软件/配置分发给工作人员,安排任务并对其进行监视,为工作提供状态和诊断信息,客户。
计算节点和存储节点是相同的,也就是说,MapReduce框架和Hadoop分布式文件系统(HDFS)在同一组节点上运行。此配置使框架可以在已经存在数据的节点上有效地调度任务,从而在整个群集中产生很高的聚合带宽。
尽管Hadoop框架是用Java实现的,但MapReduce应用程序不必用Java编写。 Hadoop Streaming是一个实用程序,它允许用户使用任何可执行程序(例如Shell实用程序)作为映射器和/或reducer创建和运行作业。 MapReduce官方文档
输入(格式化k,v)数据集 -> map映射成一个中间数据集(k,v) -> reduce
相同的key为一组,调用一次reduce方法,方法内迭代这一组数据, 进行计算(sum count max min)
宏观角度
MapReduce 作业通常将输入数据集拆分为独立的块,这些任务由Map Task以完全并行的方式进行处理。框架对map的输出进行排序,然后将其输入到reduce任务。通常,作业的输入和输出都存储在文件系统中。该框架负责安排任务,监视任务并重新执行失败的任务。
注意 : 一个切片对应一个Map 切片以一条记录为单位调用一次Map map数量由切片决定的 , reduce 数据由人来决定的 左边矩形 Map Task( 块/block ) , 小矩形 map方法 右边矩形 Reduce Task( 分区/partition ) , 小矩形 Reduce 方法
微观角度
理解运行原理角色模型:
架构图
弊端:
ps:
Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。
Hadoop YARN
1.YARN:解耦资源与计算
2.MR :
3. Client:
Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。 Hadoop 2.0新引入的资源管理系统,直接从MRv1演化而来的;
核心思想: 将MRv1中JobTracker的资源管理和任务调度两个功能分开,分别由ResourceManager和ApplicationMaster进程实现 ResourceManager: 负责整个集群的资源管理和调度 ApplicationMaster: 负责应用程序相关的事务,比如任务调度、任务监控和容错等 YARN的引入,使得多个计算框架可运行在一个集群中 ,每个应用程序对应一个ApplicationMaster 目前多个计算框架可以运行在YARN上,比如MapReduce、Spark、Storm等 YARN官方文档
MapReduce On YARN:MRv2
将MapReduce作业直接运行在YARN上,而不是由JobTracker和TaskTracker构建的MRv1系统中
详细步骤
四个节点node1,2,3,4分别对应不同的虚拟机 node1最重要, 用于管理集群节点
# 0. node1启动上次配置好的zk,hdfs服务
zkServer.sh start #zk连接需要关闭防火墙
start-dfs.sh
# 永久关闭防火墙
chkconfig iptables off
# 1. 修改配置文件 (在hadoop的/etc/hadoop/目录下修改mapred-site.xml ,以及 yarn-site.xml )
# 本来是不需要在node1配置, 但是由于node1用于管理脚本 ,及时不使用这些配置也应该在node1.
[root@node1 ~]# cd /opt/chy/hadoop/etc/hadoop/
[root@node1 hadoop]# cp mapred-site.xml.template mapred-site.xml
[root@node1 hadoop]# vim mapred-site.xml
-------------------------------------------------- mapred-site.xml-----------------------------------------------------------------
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
-------------------------------------------------- mapred-site.xml-----------------------------------------------------------------
[root@node1 hadoop]# vim yarn-site.xml
-------------------------------------------------- yarn-site.xml -----------------------------------------------------------------
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster1</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>node3</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>node4</value>
</property>
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>node2:2181,node3:2181,node4:2181</value>
</property>
-------------------------------------------------- yarn-site.xml -----------------------------------------------------------------
# 2.分发到其他节点
[root@node1 hadoop]# scp mapred-site.xml yarn-site.xml node2:`pwd`
mapred-site.xml 100% 856 0.8KB/s 00:00
yarn-site.xml 100% 1372 1.3KB/s 00:00
[root@node1 hadoop]# scp mapred-site.xml yarn-site.xml node3:`pwd`
mapred-site.xml 100% 856 0.8KB/s 00:00
yarn-site.xml 100% 1372 1.3KB/s 00:00
[root@node1 hadoop]# scp mapred-site.xml yarn-site.xml node4:`pwd`
mapred-site.xml 100% 856 0.8KB/s 00:00
yarn-site.xml 100% 1372 1.3KB/s 00:00
# 3. 根据上图要求在node3,node4中启动资源管理器,并通过jps查看是否启动成功
yarn-daemon.sh start resourcemanager
[root@node3/4 ~]# jps
2285 ResourceManager
# 4. 浏览器访问这两个资源管理器(图1)
http://node3:8088/
http://node4:8088/
# 5. 利用hadoop下的MapReduce实例jar,进行简单操作(计算文本所占大小)
# 进入相关jar所在目录
cd /opt/chy/hadoop/share/hadoop/mapreduce/
# 执行wordcount 命令
hadoop jar hadoop-mapreduce-examples-2.6.5.jar wordcount /user/root/test-h.txt /data/wc/output
# 获取dhfs下的结果文件夹
hdfs dfs -get /data/wc/output
# 查看(图3, 可以看到出现的每个单词/数字都被统计了)
cat output/cat part-r-00000
图1
图2
图3
启动顺序与初始化流程
# 至此所有环境部署完毕
# 以后启动顺序
##启动zk ,
##启动hdfs ,
##启动resourcemanager
yarn-daemon.sh start resourcemanager
##启动datanode
start-yarn.sh
# 重新搭建环境顺序
启动hdfs ,
格式化hdfs.
删除/var/chy/hadoop/ha目录
# 要进行初始化, 首先应该启动JN(node1,node2,node3)
hadoop-daemon.sh start journalnode
# node1
hdfs namenode -format
# node1启动namenode(主)
hadoop-daemon.sh start namenode
# node2启动备用namenode
hdfs namenode -bootstrapStandby
# node1格式化ZK
hdfs zkfc -formatZK
# 启动HDFS服务
start-dfs.sh
# 访问主NN和备NN的图形化界面
http://node1:50070/
http://node2:50070/
# 启动RM
#启动resourcemanager
yarn-daemon.sh start resourcemanager
# hdfs创建目录
hdfs dfs -mkdir -p /user/root
#dhfs 上传
hdfs dfs -put testhadoop.txt /user/root
cd /opt/chy/hadoop/share/hadoop/mapreduce/
hadoop jar hadoop-mapreduce-examples-2.6.5.jar wordcount /user/root/testhadoop.txt .txt /data/wc/output
可以根据提示写代码, 列如Job job = Job.getInstance();,可以点击Job查看案例代码来书写
package ah.szxy.hadoop.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 我的第一个MapReduce程序-计算单词数量
* @author chy
*
*/
public class MyWordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//读取配置文件
Configuration c=new Configuration(true);
// 创建job对象
Job job = Job.getInstance();
job.setJarByClass(MyWordCount.class);
// 设置job的name
job.setJobName("myjob");
//设置输入路径
Path input =new Path("/user/root/testhadoop.txt");
FileInputFormat.addInputPath(job, input);
// 设置输出路径
Path output=new Path("/data/wc/output2");
// 判断是否存在,存在则删除
if( output.getFileSystem(c).exists(output)) {
output.getFileSystem(c).delete(output, true);
}
//导包: org.apache.hadoop.mapreduce.lib
FileOutputFormat.setOutputPath(job, output);
job.setMapperClass(MyMapper.class);
//序列化
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setReducerClass(MyReducer.class);
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
}
}
package ah.szxy.hadoop.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<Object, Text, Text, IntWritable>{
//对int类型数据进行包装 ,以支持序列化和反序列化
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
* key: 偏移量
*/
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 被读取的文本 hello csdn 1-10 0000 ,StringTokenizer:切割字符
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
/**
* Reducer
* Text, IntWritable: Map方法计算的文本 ,以及数量的初步统计
* Text, IntWritable: 输出类型,文本,单词出现的次数
*
* @author chy
*
*/
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
/**
* Reduce原语 :相同的key为一组,调用一次reduce方法,方法内迭代这一组数据, 进行计算(sum count max min)
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
http://node3:8088
,查看MapReduce程序执行状况 ( 图3 )
图1
图2
图3
源码分析目的
1.MapInput
总结 读hdfs ->输入(run 方法拉取)-> 初始化(open.seek方法)-> key和value的赋值 输入(白话): 从hdfs拿流, 拿偏移量的位置 ,凑成一行然后输出
2.MapOutput
输出环节 :
总结 :
大数据瓶颈->IO shuffle :磁盘IO,网络IO ,所有事情都必须考虑IO
map比较器 : 排序(大于等于小于) reduce比较器 : 分组:(是, 不是)
再次回顾: 面试问如何理解shuffle?(在MapReduce环境下) 1.shuffle就是在reduce启动后 ,在map中拉回属于自己的数据的过程(动作角度) 2.如果面试官说不对,就说是从map输出完数据之后, 算出属于的分区号 /reduce开始 ,一直到拉取并计算数据这一整个过程(逻辑角度) shuffle :动作 :拷贝 ; 逻辑: 数据规划,整理,拉取
需求 : 找出每个月温度最高的两天
文本内容
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
思路:
每年
每个月
最高
2天
1天多条记录
进一步思考
年月分组
温度升序
key中要包含时间和温度呀!
MR原语:相同的key分到一组
通过GroupCompartor设置分组规则
自定义数据类型Weather
包含时间
包含温度
自定义排序比较规则
自定义分组比较
年月相同被视为相同的key
那么reduce迭代时,相同年月的记录有可能是同一天的
reduce中需要判断是否同一天
注意OOM
数据量很大
全量数据可以切分成最少按一个月份的数据量进行判断
这种业务场景可以设置多个reduce
通过实现partition
代码实现: 1.客户端
思路(其他类都是在此基础上创建): //1. 读取配置文件,获取任务实例,设置任务的jar名称 //2. 指定map类, 设置map输出key的类 ,设置map输出值的类 //3. 分区设置 : 指定分区类. 设置排序比较器类 //4. 设置分组比较器类 //5. 指定reduce类 //6.设置文件输入路径 ,需要自己添加(重点) //7.设置文件输出路径 ,需要自己设置判断是否存在(重点) //8.设置Reduce数量 //9.等待任务完成
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyTQ {
public static void main(String[] args) throws Exception {
//1. 读取配置文件,获取任务实例,设置任务的jar名称
Configuration conf=new Configuration(true);
Job job = Job.getInstance(conf);
job.setJarByClass(MyTQ.class);
//-----------------------map环节---------------------------------
//2. 指定map类, 设置map输出key的类 ,设置map输出值的类
job.setMapperClass(TqMapper.class);
job.setOutputKeyClass(TqOutputKey.class);
job.setOutputValueClass(IntWritable.class);
//3. 分区设置 : 指定分区类. 设置排序比较器类
job.setPartitionerClass(TqPartitioner.class);
job.setSortComparatorClass(TqSortComparator.class);
//--------------------------------------------------------
//-----------------------reduce环节---------------------------------
//4. 设置分组比较器类
job.setGroupingComparatorClass(TqGroupCompartor.class);
//5. 指定reduce类
job.setReducerClass(TqReduce.class);
//--------------------------------------------------------
//6.设置文件输入路径(重点)
Path path=new Path("/data/tq/input");
FileInputFormat.addInputPath(job, path);
//7.设置文件输出路径,判断是否存在(重点)
Path output=new Path("/data/tq/output");
if (output.getFileSystem(conf).exists(output)) {
output.getFileSystem(conf).delete(output, true);//递归删除
}
FileOutputFormat.setOutputPath(job, output);
//8.设置Reduce数量
job.setNumReduceTasks(2);
//9.等待任务完成
job.waitForCompletion(true);
}
}
2.map类
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 指定的map方法类
*
* Mapper<LongWritable, Text, TqOutputKey, IntWritable>
* 输入格式化类的方法LineRecordReader类型就是LongWritable,值为Text(TextInputFormat)
* 输出类型和客户端定义的一致
* @author chy
*
*/
public class TqMapper extends Mapper<LongWritable, Text, TqOutputKey, IntWritable> {
//1.创建map的key,value属性
TqOutputKey mkey=new TqOutputKey();
IntWritable mval=new IntWritable();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, TqOutputKey, IntWritable>.Context context)
throws IOException, InterruptedException {
//1949-10-01 14:21:02 34c
//1949-10-01+空格+14:21:02+制表符+34c
try {
//2. 将数据通过 "/t"分割(strs[0]年月日时间 .strs[1]: 34c ) ,获取年月日
String[] strs=StringUtils.split(value.toString(),'\t');
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
Date date=sdf.parse(strs[0]);
//将日期传到Calendar中,我们可以通过Calendar.获取指定的年月日等信息
Calendar cal=Calendar.getInstance();
cal.setTime(date);
//3.设置map的key,value属性
mkey.setYear(cal.get(Calendar.YEAR));
mkey.setMounth(cal.get(Calendar.MONTH)+1);//MONTH取值为0-11
mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
//从34c中获取34(去除字符c)
int wd=Integer.parseInt(strs[1].substring(0,strs[1].length()-1));//substring, index从0开始,包前不包后
mkey.setWd(wd);
mval.set(wd);
//4.输出
context.write(mkey, mval);
} catch (ParseException e) {
e.printStackTrace();
}
}
}
3.设置map输出时key的类型
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
* 设置map输出时key的类型
*
* 实现可写接口WritableComparable(类型:自身)
* 可以根据WritableComparable中的提示书写下面代码
* @author chy
*
*/
public class TqOutputKey implements WritableComparable<TqOutputKey>{
//1.定义输入的key的属性值,实现取值赋值方法
private int year;
private int mounth;
private int day;
private int wd;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getMounth() {
return mounth;
}
public void setMounth(int mounth) {
this.mounth = mounth;
}
public int getDay() {
return day;
}
public void setDay(int day) {
this.day = day;
}
public int getWd() {
return wd;
}
public void setWd(int wd) {
this.wd = wd;
}
@Override
public void write(DataOutput out) throws IOException {
//2. 设置序列化的相关属性
out.writeInt(this.year);
out.writeInt(this.mounth);
out.writeInt(this.day);
out.writeInt(this.wd);
}
@Override
public void readFields(DataInput in) throws IOException {
//3. 设置反序列化的相关属性
this.year = in.readInt();
this.mounth=in.readInt();
this.day=in.readInt();
this.wd=in.readInt();
}
@Override
public int compareTo(TqOutputKey that) {
//4. 设置比较器
// 约定俗成 : 日期正序
int c1=Integer.compare(this.year, that.getYear());//compareTo : 通过ascll码进行比较,结果为他们的差值,0代表相等
if(c1==0) {
int c2=Integer.compare(this.mounth, that.getMounth());
if (c2==0) {
int c3=Integer.compare(this.day, that.getDay());
return c3;
}
return c2;
}
return c1;
}
}
4.分区设置
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 设置分区
*
* @author chy
*
*/
public class TqPartitioner extends Partitioner<TqOutputKey, IntWritable>{
@Override
public int getPartition(TqOutputKey key, IntWritable value, int numPartitions) {
//数据倾斜解决逻辑 :
//1. 数据抽样 ,分析倾斜程度 ,将少的数据组放到一个reduce任务,大的单独放入一个reduce任务
//2. 数据做路由 ,有的数据能够进入分区 ,作为一个数据集 .剩下的放入其他分区作为另一个数据集
//return key.hashCode() % numPartitions; //;逻辑上没有效果,但是必须要有这一步
return key.getYear() % numPartitions; //以年为单位进行分区
}
}
5.排序比较器类
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* 排序比较器类
* RawComparator ->WritableComparator
*
* WritableComparator是一个类 ,区别WritableComparable<TqOutputKey>,是一个接口
* @author chy
*
*/
public class TqSortComparator extends WritableComparator{
//1.重写排序比较器的构造方法
public TqSortComparator() {
super(TqOutputKey.class,true);//引用父类的构造器 .true: 创建实例
}
//2. 重写比较方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
TqOutputKey t1=(TqOutputKey) a;
TqOutputKey t2=(TqOutputKey) b;
//年月+温度(倒序)
int c1=Integer.compare(t1.getYear(), t2.getYear());
if (c1==0) {
int c2=Integer.compare(t1.getMounth(), t2.getMounth());
if (c2==0) {
int c3= -Integer.compare(t1.getWd(), t2.getWd());
return c3;
}
return c2;
}
return c1;
}
}
6.分组比较器类
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* 分组比较器
*
* 编写方式大致同排序比较器
* @author chy
*
*/
public class TqGroupCompartor extends WritableComparator{
// 1.重写构造器
public TqGroupCompartor(){
super(TqOutputKey.class,true);
}
//2.重写比较方法 ,比排序比机器少一个维度
@Override
public int compare(WritableComparable a, WritableComparable b) {
TqOutputKey t1=(TqOutputKey) a;
TqOutputKey t2=(TqOutputKey) b;
//年月
int c1=Integer.compare(t1.getYear(), t2.getYear());
if (c1==0) {
int c2=Integer.compare(t1.getMounth(), t2.getMounth());
return c2;
}
return c1;
}
}
7.reduce类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Reduce方法
*
* 继承Reducer
* @author chy
*
*/
public class TqReduce extends Reducer<TqOutputKey, IntWritable, Text, IntWritable>{
//1.创建reduce的key,value属性
Text rkey=new Text();
IntWritable rval=new IntWritable();
//2.重写reduce方法
@Override
protected void reduce(TqOutputKey key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
//mr原语 : 相同的key为一组 ,调用一个reduce方法 ,方法内迭代这一组数据
//2019 11 01 20 20
//2019 11 11 11 11
//...
int flag=0;
int day=0;
// 遍历数据 ,为reduce的key,value属性赋值,取温度最高的;两天
for (IntWritable v : values) {
//温度第一高的
if (flag==0) {
//key: 2019-11-11:11 value:11
rkey.set(key.getYear()+"-"+key.getMounth()+"-"+key.getDay()+":"+key.getWd());
rval.set(key.getWd());
flag++;
day=key.getDay();
context.write(rkey, rval);//将键值输出
}
//温度第二高的
if (flag!=0 && day!=key.getDay()) {
//key: 2019-11-11:11 value:11
rkey.set(key.getYear()+"-"+key.getMounth()+"-"+key.getDay()+":"+key.getWd());
rval.set(key.getWd());
context.write(rkey, rval);//将键值输出
break;
}
}
}
}
hadoop jar MyTQ.jar ah.szxy.hadoop.mr.tq.MyTQ
命令运行MapReduce程序时 ,发现程序卡住不动原因是NodeManager没有在hdfs启动时被同时启动 ,通过图形化界面可以查看到
解决方案 : 在node1使用start-yarn.sh
启动所有 NM即可
解决方案 :定位到map类中相关代码 .Calendar.MONTH的取值为0-11,所以应该在结果上+1
//3.设置map的key,value属性
mkey.setYear(cal.get(Calendar.YEAR));
//mkey.setMounth(cal.get(Calendar.MONTH));//MONTH的取值为0-11,所以应该在结果上+1
mkey.setMounth(cal.get(Calendar.MONTH)+1)
mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
原因是在分区设置(代码块4)中的return key.hashCode() % numPartitions; //;逻辑上没有效果,但是必须要有这一步
map是在map方法分区外实现的(代码块2),所以输出的只有一个key ,因此上方代码的值时固定的,会导致结果只在一个分区输出
解决方法: return key.getYear() % numPartitions; //以年为单位进行分区
需求: 推荐好友的好友 共同好友越多 ,建立好友关系的成功率越高
文本内容
tom hello hadoop cat
world hadoop hello hive
cat tom hive
mr hive hello
hive cat hadoop world hello mr
hadoop tom hive world
hello tom world hive mr
思考:
思路:
推荐者与被推荐者一定有一个或多个相同的好友
全局去寻找好友列表中两两关系
去除直接好友
统计两两关系出现次数
API:
map:按好友列表输出两两关系
reduce:sum两两关系
再设计一个MR
生成详细报表
reduce:来自于map
原语:“相同”的key为一组,调用一次reduce方法,方法内迭代这组数据
map端会做排序:sortcomparator
数据以多大的宽度为一组:groupingcomparator
bj,hd
bj,hd
sh,sj
bj,dx
!分组比较强依赖排序
reduce底层迭代原理:nextkeyissame
代码实现
1.客户端类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 客户端类
*
* @author chy
*
*/
public class MyFoF {
public static void main(String[] args) throws Exception {
//1. 读取配置文件,获取任务实例,设置任务的jar名称
Configuration conf=new Configuration(true);
Job job=Job.getInstance(conf);
job.setJarByClass(MyFoF.class);
//2. 指定map类, 设置map输出key的类 ,设置map输出值的类
job.setMapperClass(FofMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//3. 分区设置 : 指定分区类. 设置排序比较器类
//4. 设置分组比较器类
//5. 指定reduce类
job.setReducerClass(FofReduce.class);
//6.设置文件输入路径(重点)
Path path=new Path("/data/fof/input");
FileInputFormat.addInputPath(job, path);//org.apache.hadoop.mapreduce.lib
//7.设置文件输出路径,判断是否存在(重点)
Path outputDir=new Path("/data/fof/output");
if (outputDir.getFileSystem(conf).exists(outputDir)) {
outputDir.getFileSystem(conf).delete(outputDir, true);
}
FileOutputFormat.setOutputPath(job, outputDir);
//8.设置Reduce数量
//9.等待任务完成
job.waitForCompletion(true);
}
}
2.map类
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* map类
*
* 继承Mapper接口<LongWritable, Text, Text, IntWritable>
* @author chy
*
*/
public class FofMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//0.创建map的key与value
Text mkey=new Text();
IntWritable mval=new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//tom hello hadoop cat
//1.分割字符串
String[] strs=StringUtils.split(value.toString(),' ' );
//2.遍历设置key,value
for (int i = 1; i < strs.length; i++) {
//strs[0]+strs[i] :将每一行第一个字符串与第i(>1)个字符串组合,并且按照ascll码从小到大排序
mkey.set(getFof(strs[0], strs[1]) );
mval.set(0);
//循环输出
context.write(mkey, mval);
//外层循环拼接: 直接关系, 内层循环拼接: 间接关系
for (int j = i+1; j < strs.length; j++) {
mkey.set(getFof(strs[i], strs[j]) );
mval.set(1);//1代表间接关系
context.write(mkey, mval);
}
}
}
//作用: 按照ascll码从小到大排序
//保证读取数据时,永远是一个顺序(防止a:b 与b:a 同时出现)
public static String getFof(String s1, String s2) {
if (s1.compareTo(s2)<0) {
return s1+":"+s2;
}
return s2+":"+s1;
}
}
3,reduce类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* reduce类
*
* 继承Reducer<Text, IntWritable, Text, IntWritable>
* @author chy
*
*/
public class FofReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
//hello:hadoop 0
//hello:hadoop 1
//hello:hadoop 0
//1. 设置reduce的value, key可以复用
IntWritable rval=new IntWritable();
//2.重写reduce方法
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
// 迭代求和 ,输出真正存在间接关系的数据
int flag=0;
int sum=0;
for (IntWritable v : values) {
if (v.get()==0) {
flag=1;
}
sum+=v.get();
}
if (flag==0) {
rval.set(sum);
context.write(key, rval);
}
}
}
要求尽量掌握思维以及编码, 最大限度的将这些知识融入到自己的知识体系中
1.什么是pagerank
PageRank是Google提出的算法,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。 是Google创始人拉里·佩奇和谢尔盖·布林于1997年创造的 PageRank实现了将链接价值概念作为排名因素。
2.计算环境
沿用之前的环境: Hadoop-2.6.5 四台主机node1,2,3,4 两台NN的HA 两台RM的HA 离线计算框架MapReduce
3.PageRank算法原理
d:阻尼系数 M(i):指向i的页面集合 L(j):页面的出链数 PR(pj):j页面的PR值 n:所有页面数
4,pageRank计算/PR值的计算
pr值的计算
第一次
访问统计 | pr值 |
---|---|
A: 收到C的访问 1/2 | 1/2 |
B: 收到A,C,D的访问 A:1/2 C:1/2 D:1/2 | 3/2 |
C: 收到B,D的访问 B:1 D:1/2 | 3/2 |
D : 收到A的访问 1/2 | 1/2 |
第二次( 利用访问统计值与第一次pr值相乘 )
访问统计 | pr值 |
---|---|
A: 收到C的访问 1/2 | 1/2x3/2=3/4 |
B: 收到A,C,D的访问 A:1/2 C:1/2 D:1/2 | 1/2x1/2+1/2x3/2+1/21/2=4/5 |
C: 收到B,D的访问 B:1 D:1/2 | 3/2+1/2x1/2=7/4 |
D : 收到A的访问 1/2 | 1/2x1/2=1/4 |
往复如此,我们可以看到访问量高的(B)不一定pr值也高 ,在筛选出一定访问量的同时也对质量作出筛选 这样得出来的pr值高的网页( C )既保证了数量有保证了质量
上面案例都是打成jar ,然后在虚拟机上通过 hadoop jar 命令运行
内容数据 xx.txt(名称随意)
A B D
B C
C A B
D B C
关系图
代码实现
1.客户端类.map类,reduce类
/**
* 客户端类
*
*/
public class RunJob {
public static enum Mycounter {
my
}
public static void main(String[] args) {
Configuration conf = new Configuration(true);
conf.set("mapreduce.app-submission.corss-paltform", "true");//开启Windows系统对客户端程序支持
//如果分布式运行,必须打jar包
//且,client在集群外非hadoop jar 这种方式启动,client中必须配置jar的位置
conf.set("mapreduce.framework.name", "local");
//这个配置,只属于,切换分布式到本地单进程模拟运行的配置
//这种方式不是分布式,所以不用打jar包
double d = 0.001;//如果想要增加精度.可以将小数点后移,增加运算次数
int i = 0;
while (true) {//pr迭代运算
i++;
try {
conf.setInt("runCount", i);
FileSystem fs = FileSystem.get(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(RunJob.class);
job.setJobName("pr" + i);
job.setMapperClass(PageRankMapper.class);
job.setReducerClass(PageRankReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//使用了新的输入格式化类 : 会使用制表符去切数据.制表符前面是key,后面是value
job.setInputFormatClass(KeyValueTextInputFormat.class);
Path inputPath = new Path("/data/pagerank/input/");
if (i > 1) {
inputPath = new Path("/data/pagerank/output/pr" + (i - 1));
}
FileInputFormat.addInputPath(job, inputPath);
Path outpath = new Path("/data/pagerank/output/pr" + i);
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
boolean f = job.waitForCompletion(true);
if (f) {
System.out.println("success.");
long sum = job.getCounters().findCounter(Mycounter.my).getValue();
System.out.println(sum);
//过程中将页面的差值放大到1000倍,四个页面,所以要在这里除以4000
double avgd = sum / 4000.0;
//如果平均值小于0.0000001,跳出迭代
if (avgd < d) {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* map类
*
*/
static class PageRankMapper extends Mapper<Text, Text, Text, Text> {
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
int runCount = context.getConfiguration().getInt("runCount", 1);
//A B D
//A B D 0.3
//K:A
//V:B D
//K:A
//V:0.3 B D
String page = key.toString();
Node node = null;
if (runCount == 1) {//V:B D
node = Node.fromMR("1.0" , value.toString());
} else {//V:0.3 B D
node = Node.fromMR(value.toString());
}
// A 1.0 B D 传递老的pr值和对应的页面关系
context.write(new Text(page), new Text(node.toString()));
if (node.containsAdjacentNodes()) {
double outValue = node.getPageRank() / node.getAdjacentNodeNames().length;
for (int i = 0; i < node.getAdjacentNodeNames().length; i++) {
String outPage = node.getAdjacentNodeNames()[i];
// B:0.5
// D:0.5 页面A投给谁,谁作为key,val是票面值,票面值为:A的pr值除以超链接数量
context.write(new Text(outPage), new Text(outValue + ""));
}
}
}
}
/**
* Reduce类
*
*/
static class PageRankReducer extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text key, Iterable<Text> iterable, Context context)
throws IOException, InterruptedException {
//相同的key为一组
//key:页面名称比如B
//包含两类数据
//B:1.0 C //页面对应关系及老的pr值
//B:0.5 //投票值
//B:0.5
double sum = 0.0;
Node sourceNode = null;
for (Text i : iterable) {
Node node = Node.fromMR(i.toString());
if (node.containsAdjacentNodes()) {
sourceNode = node;
} else {
sum = sum + node.getPageRank();
}
}
// 4为页面总数
double newPR = (0.15 / 4.0) + (0.85 * sum);
System.out.println("*********** new pageRank value is " + newPR);
// 把新的pr值和计算之前的pr比较
double d = newPR - sourceNode.getPageRank();
int j = (int) (d * 1000.0);//放大1000倍
j = Math.abs(j);
System.out.println(j + "___________");
context.getCounter(Mycounter.my).increment(j);
sourceNode.setPageRank(newPR);
context.write(key, new Text(sourceNode.toString()));
}
}
}
2.数据模型类
mport java.io.IOException;
import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
/**
* 数据模型
*
*作用:
*1.将页面投票节点放入adjacentNodeNames (包装属性信息)
*2.作投票对象的封装
*/
public class Node {
private double pageRank = 1.0;
private String[] adjacentNodeNames;
public static final char fieldSeparator = '\t';
public double getPageRank() {
return pageRank;
}
public Node setPageRank(double pageRank) {
this.pageRank = pageRank;
return this;
}
public String[] getAdjacentNodeNames() {
return adjacentNodeNames;
}
public Node setAdjacentNodeNames(String[] adjacentNodeNames) {
this.adjacentNodeNames = adjacentNodeNames;
return this;
}
public boolean containsAdjacentNodes() {
return adjacentNodeNames != null && adjacentNodeNames.length > 0;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(pageRank);
if (getAdjacentNodeNames() != null) {
sb.append(fieldSeparator).append(
StringUtils.join(getAdjacentNodeNames(), fieldSeparator));
}
return sb.toString();
}
// value =1.0 B D
public static Node fromMR(String value) throws IOException {
String[] parts = StringUtils.splitPreserveAllTokens(value,
fieldSeparator);
if (parts.length < 1) {
throw new IOException("Expected 1 or more parts but received "
+ parts.length);
}
Node node = new Node().setPageRank(Double.valueOf(parts[0]));
if (parts.length > 1) {
node.setAdjacentNodeNames(Arrays
.copyOfRange(parts, 1, parts.length));
}
return node;
}
public static Node fromMR(String v1,String v2) throws IOException {
return fromMR(v1+fieldSeparator+v2);
//1.0 B D
}
}
conf.set("mapreduce.app-submission.corss-paltform", "true");
conf.set("mapreduce.framework.name", "local");
错误信息
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
图示
解决步骤 :
注意: 虽然是在Windows环境下运行MapReduce程序, 但是仍需要有hdfs文件系统支持,所以集群仍需要打开 Hadoop其他相关异常及解决方案: https://blog.csdn.net/congcong68/article/details/42043093
TF-IDF(term frequency–inverse document frequency)是一种用于资讯检索与资讯探勘的常用加权技术。
1.词频 (term frequency, TF)
指的是某一个给定的词语在一份给定的文件中出现的次数。这个数字通常会被归一化(分子一般小于分母 区别于IDF),以防止它偏向长的文件。(同一个词语在长文件里可能会比短文件有更高的词频,而不管该词语重要与否。)
公式中: ni,j是该词在文件dj中的出现次数,而分母则是在文件dj中所有字词的出现次数之和。
2.逆向文件频率 (inverse document frequency, IDF)
是一个词语普遍重要性的度量。某一特定词语的IDF,可以由总文件数目除以包含该词语之文件的数目,再将得到的商取对数得到。
3.TF-IDF:
某一特定文件内的高词语频率,以及该词语在整个文件集合中的低文件频率,可以产生出高权重的TF-IDF。 因此,TF-IDF倾向于过滤掉常见的词语,保留重要的词语。
TF-IDF的主要思想: 如果某个词或短语在一篇文章中出现的频率TF高,并且在其他文章中很少出现, 则认为此词或者短语具有很好的类别区分能力,适合用来分类。
需求 利用MapReduce技术实现对微博数据的TF-IDF统计
思路
第一次:词频统计+文本总数统计
第二次:字词集合统计:逆向文件频率
第三次:取1,2次结果最终计算出字词的TF-IDF
实现
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 第一个mr程序
*
* 统计出:每个账号的关键词以及词频
* @author chy
*
*/
public class FirstJob {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("mapreduce.app-submission.coress-paltform", "true");
conf.set("mapreduce.framework.name", "local");//Windows运行单机
try {
FileSystem fs = FileSystem.get(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(FirstJob.class);
job.setJobName("weibo1");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(4);
job.setPartitionerClass(FirstPartition.class);
job.setMapperClass(FirstMapper.class);
job.setCombinerClass(FirstReduce.class);
job.setReducerClass(FirstReduce.class);
FileInputFormat.addInputPath(job, new Path("/data/tfidf/input/"));
Path path = new Path("/data/tfidf/output/weibo1");
if (fs.exists(path)) {
fs.delete(path, true);
}
FileOutputFormat.setOutputPath(job, path);
boolean f = job.waitForCompletion(true);
if (f) {
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
import java.io.IOException;
import java.io.StringReader;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;
/**
* 第一个MR,计算TF和计算N(微博总数)
*
* @author root
*
*/
public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//3823890210294392 今天我约了豆浆,油条
String[] v = value.toString().trim().split("\t");
if (v.length >= 2) {
String id = v[0].trim();
String content = v[1].trim();
StringReader sr = new StringReader(content);
IKSegmenter ikSegmenter = new IKSegmenter(sr, true);
Lexeme word = null;
while ((word = ikSegmenter.next()) != null) {
String w = word.getLexemeText();
context.write(new Text(w + "_" + id), new IntWritable(1));
//今天_3823890210294392 1
}
context.write(new Text("count"), new IntWritable(1));
//count 1
} else {
System.out.println(value.toString() + "-------------");
}
}
}
/**
* 设置分片
*
*数据的路由分发
*/
public class FirstPartition extends HashPartitioner<Text, IntWritable>{
public int getPartition(Text key, IntWritable value, int reduceCount) {
if(key.equals(new Text("count")))//如果出现count就去第四个分区
return 3;//第四个分区
else//否则被3模,取0,1,2
return super.getPartition(key, value, reduceCount-1);
}
}
/**
* 第一个reduce
* c1_001,2 c2_001,1 count,10000
*
* @author chy
*
*/
public class FirstReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text key, Iterable<IntWritable> iterable,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : iterable) {
sum = sum + i.get();
}
if (key.equals(new Text("count"))) {
System.out.println(key.toString() + "___________" + sum);
}
context.write(key, new IntWritable(sum));
}
}
public class TwoJob {
public static void main(String[] args) {
Configuration conf =new Configuration();
conf.set("mapreduce.app-submission.coress-paltform", "true");
conf.set("mapreduce.framework.name", "local");
try {
Job job =Job.getInstance(conf);
job.setJarByClass(TwoJob.class);
job.setJobName("weibo2");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TwoMapper.class);
job.setCombinerClass(TwoReduce.class);
job.setReducerClass(TwoReduce.class);
//mr运行时的输入数据从hdfs的哪个目录中获取
FileInputFormat.addInputPath(job, new Path("/data/tfidf/output/weibo1"));
FileOutputFormat.setOutputPath(job, new Path("/data/tfidf/output/weibo2"));
boolean f= job.waitForCompletion(true);
if(f){
System.out.println("执行job成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//统计df:词在多少个微博中出现过。
public class TwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 获取当前 mapper task的数据片段(split)
FileSplit fs = (FileSplit) context.getInputSplit();
if (!fs.getPath().getName().contains("part-r-00003")) {
//豆浆_3823890201582094 3
String[] v = value.toString().trim().split("\t");
if (v.length >= 2) {
String[] ss = v[0].split("_");
if (ss.length >= 2) {
String w = ss[0];
context.write(new Text(w), new IntWritable(1));
}
} else {
System.out.println(value.toString() + "-------------");
}
}
}
}
public class TwoReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text key, Iterable<IntWritable> arg1, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : arg1) {
sum = sum + i.get();
}
context.write(key, new IntWritable(sum));
}
}
public class LastJob {
public static void main(String[] args) {
Configuration conf =new Configuration();
// conf.set("mapred.jar", "C:\\Users\\root\\Desktop\\tfidf.jar");
// conf.set("mapreduce.job.jar", "C:\\Users\\root\\Desktop\\tfidf.jar");
conf.set("mapreduce.app-submission.cross-platform", "true");
try {
FileSystem fs =FileSystem.get(conf);
Job job =Job.getInstance(conf);
job.setJarByClass(LastJob.class);
job.setJobName("weibo3");
// job.setJar("C:\\Users\\root\\Desktop\\tfidf.jar");
job.setJar("C:\\Users\\root\\Desktop\\tfidf.jar");
//2.5
//把微博总数加载到
job.addCacheFile(new Path("/data/tfidf/output/weibo1/part-r-00003").toUri());
//把df加载到
job.addCacheFile(new Path("/data/tfidf/output/weibo2/part-r-00000").toUri());
//设置map任务的输出key类型、value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(LastMapper.class);
job.setReducerClass(LastReduce.class);
//mr运行时的输入数据从hdfs的哪个目录中获取
FileInputFormat.addInputPath(job, new Path("/data/tfidf/output/weibo1"));
Path outpath =new Path("/data/tfidf/output/weibo3");
if(fs.exists(outpath)){
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job,outpath );
boolean f= job.waitForCompletion(true);
if(f){
System.out.println("执行job成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
// 存放微博总数
public static Map<String, Integer> cmap = null;
// 存放df
public static Map<String, Integer> df = null;
// 在map方法执行之前
protected void setup(Context context) throws IOException,
InterruptedException {
System.out.println("******************");
if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
URI[] ss = context.getCacheFiles();
if (ss != null) {
for (int i = 0; i < ss.length; i++) {
URI uri = ss[i];
if (uri.getPath().endsWith("part-r-00003")) {// 微博总数
Path path = new Path(uri.getPath());
// FileSystem fs
// =FileSystem.get(context.getConfiguration());
// fs.open(path);
BufferedReader br = new BufferedReader(new FileReader(path.getName()));
String line = br.readLine();
if (line.startsWith("count")) {
String[] ls = line.split("\t");
cmap = new HashMap<String, Integer>();
cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
}
br.close();
} else if (uri.getPath().endsWith("part-r-00000")) {// 词条的DF
df = new HashMap<String, Integer>();
Path path = new Path(uri.getPath());
BufferedReader br = new BufferedReader(new FileReader(path.getName()));
String line;
while ((line = br.readLine()) != null) {
String[] ls = line.split("\t");
df.put(ls[0], Integer.parseInt(ls[1].trim()));
}
br.close();
}
}
}
}
}
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
FileSplit fs = (FileSplit) context.getInputSplit();
// System.out.println("--------------------");
if (!fs.getPath().getName().contains("part-r-00003")) {
//豆浆_3823930429533207 2
String[] v = value.toString().trim().split("\t");
if (v.length >= 2) {
int tf = Integer.parseInt(v[1].trim());// tf值
String[] ss = v[0].split("_");
if (ss.length >= 2) {
String w = ss[0];
String id = ss[1];
double s = tf * Math.log(cmap.get("count") / df.get(w));
NumberFormat nf = NumberFormat.getInstance();
nf.setMaximumFractionDigits(5);
context.write(new Text(id), new Text(w + ":" + nf.format(s)));
}
} else {
System.out.println(value.toString() + "-------------");
}
}
}
}
public class LastReduce extends Reducer<Text, Text, Text, Text> {
//key 微博ID, iterable(value): 词+tfitf
protected void reduce(Text key, Iterable<Text> iterable, Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Text i : iterable) {
sb.append(i.toString() + "\t");
}
context.write(key, new Text(sb.toString()));
}
}
4.第3个job运行结果 分片文件显示的内容是取第1,2job运算结果运算出TF-IDF 数据分析师可以根据这些数据,分析内在联系,挖掘出有价值的数据或者数据间的内在联系 例如啤酒尿不湿的案例
每一步迭代
思路:
需求 根据MapReduce,实现对用户数据的ItemCF
思路:
实现: 本项目代码以及所有代码打包至百度云
/data/itemcf/input/
,并上传文件(sample)sam_tianchi_2014002_rec_tmall_log.csv
(在源码相关包下)
总结
链接:https://pan.baidu.com/s/17VE5yaMHn0UABc8qyQuyZg 本博文所有案例源码以及分词器jar 提取码:grss