前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MapReduce案例之寻找共同好友

MapReduce案例之寻找共同好友

作者头像
孙晨c
发布2020-09-18 10:31:25
5320
发布2020-09-18 10:31:25
举报
文章被收录于专栏:无题~

以下是博客的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的) 求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?

在这里插入图片描述
在这里插入图片描述

输出格式:

A-B:C,E (用户-用户:共同好友...)

需求分析

分为两个job 第一次输出结果,先求出A、B、C、….等是谁的好友 Job1: Mapper: keyin-valuein: (A:B,C,D,F,E,O) map(): 将valuein拆分为若干好友,作为keyout写出 将keyin作为valueout keyout-valueout: (友:用户) (c:A),(C:B),(C:E)

Reducer: keyin-valuein : (友:用户) (c:A),(C:B),(C:E) reduce(): keyout-valueout :(友:用户,用户,用户,用户)

A I,K,C,B,G,F,H,O,D, B A,F,J,E, C A,E,B,H,F,G,K, D G,C,K,A,L,F,E,H, E G,M,L,H,A,F,B,D, F L,M,D,C,G,A, G M, H O, I O,C, J O, K B, L D,E, M E,F, O A,H,I,J,F,

第二次输出结果,输出每两个人的共同好友

Job2Mapper: keyin-valuein: (友:用户,用户,用户,用户) map(): 使用keyin作为valueout 将valuein切分后,两两拼接,作为keyout keyout-valueout: (用户-用户,友) (A-B,C),(A-B,E) (A-E,C), (A-G,C), (A-F,C), (A-K,C) (B-E,C ),(B-G,C)

代码语言:javascript
复制
		--------------------
		(B-E,C)
		(E-B,G)
		
		B-E: C,G

A-B E C A-C D F A-D E F A-E D B C A-F O B C D E A-G F E C D A-H E C D O A-I O A-J O B A-K D C A-L F E D A-M E F B-C A B-D A E B-E C B-F E A C B-G C E A B-H A E C B-I A B-K C A B-L E B-M E B-O A C-D A F C-E D C-F D A C-G D F A C-H D A C-I A C-K A D C-L D F C-M F C-O I A D-E L D-F A E D-G E A F D-H A E D-I A D-K A D-L E F D-M F E D-O A E-F D M C B E-G C D E-H C D E-J B E-K C D E-L D F-G D C A E F-H A D O E C F-I O A F-J B O F-K D C A F-L E D F-M E F-O A G-H D C E A G-I A G-K D A C G-L D F E G-M E F G-O A H-I O A H-J O H-K A C D H-L D E H-M E H-O A I-J O I-K A I-O A K-L D K-O A L-M E F

Reducer: keyin-valuein : (A-B,C),(A-B,E) reduce(): keyout-valueout : (A-B:C,E)

代码实现

(1)第一次Mapper类

代码语言:javascript
复制
public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
	
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {

		// 1 获取一行 A:B,C,D,F,E,O
		String line = value.toString();
		
		// 2 切割,冒号前面的是用户,冒号后面的是好友
		String[] fields = line.split(":");
		
		// 3 获取person和好友
		String person = fields[0];
		String[] friends = fields[1].split(",");
		
		// 4写出
		for(String friend: friends){
			// 输出 <好友,用户>
			context.write(new Text(friend), new Text(person));
		}
		
	}
}

(2)第一次Reducer类

代码语言:javascript
复制
public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text>{
	
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
		
		StringBuffer sb = new StringBuffer();

		//拼接
		for(Text person: values){
			sb.append(person).append(",");
		}
		//写出
		context.write(key, new Text(sb.toString()));
	}
	
}

(3)第一次Driver类

代码语言:javascript
复制
public class OneShareFriendsDriver {

	public static void main(String[] args) throws Exception {
		
// 1 获取job对象
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);
		
		// 2 指定jar包运行的路径
		job.setJarByClass(OneShareFriendsDriver.class);

		// 3 指定map/reduce使用的类
		job.setMapperClass(OneShareFriendsMapper.class);
		job.setReducerClass(OneShareFriendsReducer.class);
		
		// 4 指定map输出的数据类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		// 5 指定最终输出的数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		// 6 指定job的输入原始所在目录
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 7 提交
		boolean result = job.waitForCompletion(true);
		
		System.exit(result?0:1);
	}
	
}

(4)第二次Mapper类

代码语言:javascript
复制
public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		// A I,K,C,B,G,F,H,O,D,
		// 友 人,人,人
		String line = value.toString();
		String[] friend_persons = line.split("\t");

		String friend = friend_persons[0];
		String[] persons = friend_persons[1].split(",");

		Arrays.sort(persons);

		for (int i = 0; i < persons.length - 1; i++) {
			
			for (int j = i + 1; j < persons.length; j++) {
				// 发出 <人-人,好友> ,这样,相同的“人-人”对的所有好友就会到同1个reduce中去
				context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
			}
			
		}
		
	}
	
}

(5)第二次Reducer类

代码语言:javascript
复制
public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text>{
	
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)	throws IOException, InterruptedException {
		
		StringBuffer sb = new StringBuffer();

		for (Text friend : values) {
			sb.append(friend).append(" ");
		}
		
		context.write(key, new Text(sb.toString()));
	}
	
}

(6)第二次Driver类

代码语言:javascript
复制
public class TwoShareFriendsDriver {

	public static void main(String[] args) throws Exception {
		
// 1 获取job对象
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);
		
		// 2 指定jar包运行的路径
		job.setJarByClass(TwoShareFriendsDriver.class);

		// 3 指定map/reduce使用的类
		job.setMapperClass(TwoShareFriendsMapper.class);
		job.setReducerClass(TwoShareFriendsReducer.class);
		
		// 4 指定map输出的数据类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		// 5 指定最终输出的数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		// 6 指定job的输入原始所在目录
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 7 提交
		boolean result = job.waitForCompletion(true);
		System.exit(result?0:1);
	}
}

代码实现方案二

mapper1.java

代码语言:javascript
复制
/*
 * keyin-valuein:  (A:B,C,D,F,E,O)
	map(): 将valuein拆分为若干好友,作为keyout写出
			将keyin作为valueout
	keyout-valueout: (友:用户)
					(c:A),(C:B),(C:E) 
 */
public class Example3Mapper1 extends Mapper<Text, Text, Text, Text>{
	
	private Text out_key=new Text();
	
	@Override
	protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		String[] friends = value.toString().split(",");
		
		for (String friend : friends) {
			
			out_key.set(friend);
			
			context.write(out_key, key);
			
		}
		
	}
}

reducer1.java

代码语言:javascript
复制
/*
 * keyin-valuein : (友:用户)
					(c:A),(C:B),(C:E)
	reduce(): 	
	keyout-valueout  :(友:用户,用户,用户,用户)
 */
public class Example3Reducer extends Reducer<Text, Text, Text, Text>{
	
	private Text out_value=new Text();
	
	@Override
	protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		StringBuffer sb = new StringBuffer();
		
		for (Text text : value) {
			
			sb.append(text.toString()+",");
		}
		
		out_value.set(sb.toString());
		
		context.write(key, out_value);
		
		
		
	}

}

mapper2.java

代码语言:javascript
复制
/*
keyin-valuein:   (友\t用户,用户,用户,用户)
	map():  使用keyin作为valueout
				将valuein切分后,两两拼接,作为keyout
	keyout-valueout: (用户-用户,友)
					(A-B,C),(A-B,E)
					  (A-E,C), (A-G,C), (A-F,C), (A-K,C)
					  (B-E,C),(B-G,C)
 */
public class Example3Mapper2 extends Mapper<Text, Text, Text, Text>{
	
	private Text out_key=new Text();
	
	@Override
	protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		String[] users = value.toString().split(",");
		
		//保证数组中的用户名有序
		Arrays.sort(users);
		
		//将valuein切分后,两两拼接,作为keyout
		for (int i = 0; i < users.length-1; i++) {
			
			for (int j = i+1; j < users.length; j++) {
				
				out_key.set(users[i]+"-"+users[j]);
				
				context.write(out_key, key);
				
			}
		}
		
	}

}

reducer2.java

代码语言:javascript
复制
/*
 *keyin-valuein : (A-B,C),(A-B,E)
	reduce(): 	
	keyout-valueout  : (A-B:C,E)
 */
public class Example3Reducer2 extends Reducer<Text, Text, Text, Text>{
	
	private Text out_value=new Text();
	
	@Override
	protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		
		StringBuffer sb = new StringBuffer();
		
		for (Text text : value) {
			
			sb.append(text.toString()+",");
		}
		
		out_value.set(sb.toString());
		
		context.write(key, out_value);
		
		
		
	}

}

driver.java

代码语言:javascript
复制
/*
 * 1. Example1Driver 提交两个Job
 * 			Job2 必须 依赖于 Job1,必须在Job1已经运行完成之后,生成结果后,才能运行!
 * 
 * 2. JobControl: 定义一组MR jobs,还可以指定其依赖关系
 * 				可以通过addJob(ControlledJob aJob)向一个JobControl中添加Job对象!
 * 
 * 3. ControlledJob: 可以指定依赖关系的Job对象
 * 			addDependingJob(ControlledJob dependingJob): 为当前Job添加依赖的Job
 * 			 public ControlledJob(Configuration conf) : 基于配置构建一个ControlledJob
 * 
 */
public class Example3Driver {
	
public static void main(String[] args) throws Exception {
		
		//定义路径
		Path inputPath=new Path("e:/mrinput/friend");
		Path outputPath=new Path("e:/mroutput/friend");
		Path finalOutputPath=new Path("e:/mroutput/finalfriend");
		
		//作为整个Job的配置
		Configuration conf1 = new Configuration();
		conf1.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":");
		Configuration conf2 = new Configuration();
		
		
		//保证输出目录不存在
		FileSystem fs=FileSystem.get(conf1);
		
		if (fs.exists(outputPath)) {
			
			fs.delete(outputPath, true);
			
		}
		
		if (fs.exists(finalOutputPath)) {
			
			fs.delete(finalOutputPath, true);
			
		}
		
		// ①创建Job
		Job job1 = Job.getInstance(conf1);
		Job job2 = Job.getInstance(conf2);
		
		// 设置Job名称
		job1.setJobName("index1");
		job2.setJobName("index2");
		
		// ②设置Job1
		job1.setMapperClass(Example3Mapper1.class);
		job1.setReducerClass(Example3Reducer.class);
		
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(Text.class);
		
		// 设置输入目录和输出目录
		FileInputFormat.setInputPaths(job1, inputPath);
		FileOutputFormat.setOutputPath(job1, outputPath);
		
		job1.setInputFormatClass(KeyValueTextInputFormat.class);
		
		// ②设置Job2
		job2.setMapperClass(Example3Mapper2.class);
		job2.setReducerClass(Example3Reducer2.class);
				
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(Text.class);
				
		// 设置输入目录和输出目录
		FileInputFormat.setInputPaths(job2, outputPath);
		FileOutputFormat.setOutputPath(job2, finalOutputPath);
		
		// 设置job2的输入格式
		job2.setInputFormatClass(KeyValueTextInputFormat.class);
		
		//--------------------------------------------------------
		//构建JobControl
		JobControl jobControl = new JobControl("friends");
		
		//创建运行的Job
		ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
		ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
		
		//指定依赖关系
		controlledJob2.addDependingJob(controlledJob1);
		
		// 向jobControl设置要运行哪些job
		jobControl.addJob(controlledJob1);
		jobControl.addJob(controlledJob2);
		
		//运行JobControl
		Thread jobControlThread = new Thread(jobControl);
		//设置此线程为守护线程
		jobControlThread.setDaemon(true);
		
		jobControlThread.start();
		
		//获取JobControl线程的运行状态
		while(true) {
			
			//判断整个jobControl是否全部运行结束
			if (jobControl.allFinished()) {
				
				System.out.println(jobControl.getSuccessfulJobList());
				
				return;
				
			}
			
		}

		
}
		

}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-09-16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求分析
  • 代码实现
  • 代码实现方案二
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档