
MapReduce Examples

曼路 · Published 2018-10-18

I. WordCount

(1) Running locally

Key point: a combiner can be set to run on each map task's output, pre-aggregating the intermediate (word, 1) pairs before the shuffle and thereby reducing the I/O load on the reduce phase.
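For example, for an input line "hello hello world" the raw map output is (hello,1) (hello,1) (world,1); after the combiner runs it is merged to (hello,2) (world,1), so less data is shuffled to the reducer.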

MapTask.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * Four type parameters:
 * KEYIN    key of the input data (the byte offset of the line)
 * VALUEIN  value of the input data (the content of each line)
 * KEYOUT   type of the output key
 * VALUEOUT type of the output value
 * 
 * Serialization:
 * Java serialization stores the full class name and type information for every value, which is inefficient.
 * Hadoop provides its own writable types:
 * Long		LongWritable
 * Integer	IntWritable
 * String	Text
 * float	FloatWritable
 * double	DoubleWritable
 * null		NullWritable
 * @author hasee
 *
 */

/**
 * Map phase: split each line into words and emit (word, 1) pairs.
 * @author hasee
 *
 */
public class MapTask extends Mapper<LongWritable, Text, Text, IntWritable>{
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		// value holds the content of one line; split it into words
		String[] split = value.toString().split(" ");
		for (String word : split) {
			context.write(new Text(word), new IntWritable(1));
		}
		//super.map(key, value, context);
	}
}


ReduceTask.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * The values for each word arrive grouped, e.g. hello ---> {1 1 1 1 1}
 * @author hasee
 *
 */
public class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
		// sum up the counts for this word
		int count = 0;
		for (IntWritable value : values) {
			count += value.get();
		}
		context.write(key, new IntWritable(count));
	}
}




Driver.java

import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
 * Local mode:
 * test with a small dataset first, then switch to cluster mode for the real submission.
 * @author root
 *
 */
public class Driver {
	public static void main(String[] args) throws Exception {
	
		
		Configuration conf = new Configuration();
		
		//System.setProperty("HADOOP_USER_NAME", "hasee");
		
		/**
		 * fs.defaultFS defaults to file:/// (the local file system)
		 * mapreduce.framework.name defaults to local
		 */
		Job job = Job.getInstance(conf);

		
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(Driver.class);

		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// set the combiner
		job.setCombinerClass(Mycombiner.class);
		
		// set the input and output paths
		FileInputFormat.setInputPaths(job, new Path("d:\\data\\word.txt"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\word"));
		// delete the output directory if it already exists (otherwise the job fails)
		File file = new File("d:\\data\\out\\word");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}
		// submit the job and wait for it to finish
		boolean b = job.waitForCompletion(true);
		
		System.out.println(b?0:1);
		
	}
}



Mycombiner.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Combiner: pre-aggregates the map output
 * to reduce the I/O pressure between map and reduce.
 * @author hasee
 *
 */
public class Mycombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text arg0, Iterable<IntWritable> arg1,
			Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {
		int count = 0;
		for (IntWritable intWritable : arg1) {
			count += intWritable.get();
		}
		arg2.write(arg0, new IntWritable(count));
	}
}

(2) Submitting to a YARN cluster from Eclipse

MapTask and ReduceTask are the same as above; only the Driver changes. Note that the classes are packaged into a jar (wc.jar below) and provided via job.setJar(), because setJarByClass() cannot locate a jar when the classes are run directly from Eclipse.

/**
 * Submitting to the cluster from Eclipse
 * @author root
 *
 */
public class Driver {
	public static void main(String[] args) throws Exception {
		
		// declare which user to submit the job as
		System.setProperty("HADOOP_USER_NAME", "root");
		Configuration conf = new Configuration();
		
		// tell the job where the cluster is:
		// the HDFS address, YARN as the framework, the ResourceManager host, and cross-platform submission from Windows
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		conf.set("mapreduce.framework.name","yarn" );
		conf.set("yarn.resourcemanager.hostname", "hadoop01");
		conf.set("mapreduce.app-submission.cross-platform", "true");
		
		
		Job job = Job.getInstance(conf, "eclipseToCluster");
		// set the Mapper, the Reducer, and the jar to submit
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		//job.setJarByClass(Driver.class);
		job.setJar("C:\\Users\\hasee\\Desktop\\wc.jar");
		// set the map output and final output key/value types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// set the input and output paths
		FileInputFormat.addInputPath(job, new Path("/word.txt"));
		FileOutputFormat.setOutputPath(job, new Path("/wordcount/eclipse-out/"));
		
		// delete the output directory on HDFS if it already exists
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path("/wordcount/eclipse-out/"))){
			fs.delete(new Path("/wordcount/eclipse-out/"), true);
		}
		
		// submit the job and wait for completion
		boolean b = job.waitForCompletion(true);
		
		System.out.println(b?0:1);
		
		
	}

}

II. Finding the common friends of any two people

This takes two steps, each implemented as a separate MapReduce job:

Step 1: for each person, find all the users whose friend lists contain that person.

Step 2: every user listed in a step-1 output line shares that person as a friend, so enumerating all pairs of those users and emitting the person as the value yields, after grouping, the common friends of any two people.
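A small trace with made-up data: given the input lines A:B,C and B:C, step 1 emits (B,A), (C,A) and (C,B), which the first reducer groups into B	A, and C	A,B,. Step 2 reads C	A,B, splits and sorts the user list, and emits (A-B, C); after grouping, the pair A-B therefore has C as a common friend.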

1. FriendMR.java

public class FriendMR {
	
	public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			// input line format: user:friend1,friend2,...  emit (friend, user) for each friend
			String[] split1 = value.toString().split(":");
			String user = split1[0];
			String[] friends = split1[1].split(",");
			for (String f : friends) {
				context.write(new Text(f), new Text(user));
			}
		}
		
	}
	
	public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			
			StringBuilder sb = new StringBuilder();
			for (Text user : values) {
				sb.append(user.toString()).append(",");
			}
			context.write(key,new Text(sb.toString()));
		}
	}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		
		//System.setProperty("HADOOP_USER_NAME", "hasee");
		
		/**
		 * fs.defaultFS defaults to file:/// (the local file system)
		 * mapreduce.framework.name defaults to local
		 */
		Job job = Job.getInstance(conf);

		
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(FriendMR.class);

		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		
		// set the input and output paths
		FileInputFormat.setInputPaths(job, new Path("d:\\data\\friend.txt"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\friendOne"));
		// delete the output directory if it already exists
		File file = new File("d:\\data\\out\\friendOne");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}
		// submit the job
		boolean b = job.waitForCompletion(true);
		
		System.out.println(b?0:1);
	}
}

2. ConmmonFriendsTwo.java

public class ConmmonFriendsTwo {

	public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			
			
			//FileSplit fileSplit = (FileSplit)context.getInputSplit();
			//String fileName = fileSplit.getPath().getName();
			
			String[] split = value.toString().split("\t");
			String f = split[0];
			String[] users = split[1].split(",");
			Arrays.sort(users);
			// enumerate every pair of users; sorting first makes the pair key deterministic (A-B rather than B-A)
			for (int i = 0;i<users.length -1;i++) {
				for(int j = i+1;j<users.length;j++){
					context.write(new Text(users[i]+"-"+users[j]), new Text(f));
				}
			}
		}
	}
	
	
	public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text userPair, Iterable<Text> friends, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			
				StringBuilder sb = new StringBuilder();
				for (Text f : friends) {
					sb.append(f.toString()).append(",");
				}
				context.write(userPair, new Text(sb.deleteCharAt(sb.length()-1).toString()));
			
		}
	}
	
	
	public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
		
		//System.setProperty("HADOOP_USER_NAME", "hasee");
		
		/**
		 * fs.defaultFS defaults to file:/// (the local file system)
		 * mapreduce.framework.name defaults to local
		 */
		Job job = Job.getInstance(conf);

		
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(ConmmonFriendsTwo.class);

		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		
		// set the input and output paths (the input is the output of step 1)
		FileInputFormat.setInputPaths(job, new Path("D:\\data\\out\\friendOne\\part-r-00000"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\friendTwo"));
		// delete the output directory if it already exists
		File file = new File("d:\\data\\out\\friendTwo");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}
		// submit the job
		boolean b = job.waitForCompletion(true);
		
		System.out.println(b?0:1);
	}
}

III. Counting how many times each word appears in each of several files (inverted index)

Approach: the input is a directory. In the mapper, FileSplit fileSplit = (FileSplit) context.getInputSplit(); pathname = fileSplit.getPath().getName(); returns the name of the file the current split belongs to, which is then attached to each word before counting.

1.CreateIndexOne.java

First, count each word within each file; the output keys look like: hello-a.txt	3

/**
 * Count how often each word appears in each file.
 * Final target output, e.g.: you	a.txt	1,b.txt	1,c.txt	1
 * @author hasee
 *
 */
public class CreateIndexOne {
		
	public static class MapTask extends Mapper<LongWritable, Text, Text, IntWritable>{
		String pathname = null;
		
		@Override
		protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			// get the name of the file this input split belongs to
			FileSplit fileSplit = (FileSplit)context.getInputSplit();
			pathname = fileSplit.getPath().getName();
		}
		
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			
			String[] words = value.toString().split(" ");
			for (String word : words) {
				
				context.write(new Text(word+"-"+pathname), new IntWritable(1));
			}
		}
	}
	
	
	public static class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable>{
		
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
			// sum the counts for this word-file key
			int count = 0;
			for (IntWritable value : values) {
				count += value.get();
			}
			context.write(key, new IntWritable(count));
		}
	}
	
	
	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		
		//System.setProperty("HADOOP_USER_NAME", "hasee");
		
		/**
		 * fs.defaultFS defaults to file:/// (the local file system)
		 * mapreduce.framework.name defaults to local
		 */
		Job job = Job.getInstance(conf);

		
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(CreateIndexOne.class);

		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		
		// set the input and output paths (the input is a directory of files)
		FileInputFormat.setInputPaths(job, new Path("d:\\data\\in\\index"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\indexOne"));
		// delete the output directory if it already exists
		File file = new File("d:\\data\\out\\indexOne");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}
		// submit the job
		boolean b = job.waitForCompletion(true);
		
		System.out.println(b?0:1);
	}
}

2.CreateIndexTwo.java

Then merge the per-file counts for each word, producing lines like: hello	a.txt 3,b.txt 2

public class CreateIndexTwo {
	
	
	public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
		
		Text outKey = new Text();
		Text outValue = new Text();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			
			String[] split = value.toString().split("-");
			String word = split[0];
			String nameNum = split[1];
			outKey.set(word);
			outValue.set(nameNum);
			context.write(outKey,outValue);
		}
	}
	
	
	public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			StringBuilder builder = new StringBuilder();
			boolean flag = true;
			for (Text text : values) {
				if(flag){
					builder.append(text.toString());
					flag = false;
				}else{
					builder.append(",");
					builder.append(text.toString());
				}
				
			}
			//context.write(key, new Text(builder.deleteCharAt(builder.length()-1).toString()));
			context.write(key, new Text(builder.toString()));
			
		}
	}
	
	public static void main(String[] args) throws Exception {
		

		Configuration conf = new Configuration();
		
		//System.setProperty("HADOOP_USER_NAME", "hasee");
		
		/**
		 * fs.defaultFS defaults to file:/// (the local file system)
		 * mapreduce.framework.name defaults to local
		 */
		Job job = Job.getInstance(conf);

		
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(CreateIndexTwo.class);

		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		
		// set the input and output paths (the input is the output of CreateIndexOne)
		FileInputFormat.setInputPaths(job, new Path("d:\\data\\out\\indexOne"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\indexTwo"));
		// delete the output directory if it already exists
		File file = new File("d:\\data\\out\\indexTwo");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}
		// submit the job
		boolean b = job.waitForCompletion(true);
		
		System.out.println(b?0:1);
	}
}

IV. Movie rating statistics

1. Top 20 highest ratings for each movie
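The input rating.json is assumed to contain one JSON object per line with the four fields that MovieBean carries, for example (a made-up record): {"movie":"1193","rate":5,"timeStamp":"978300760","uid":"1"}. Jackson's ObjectMapper maps each line onto a MovieBean.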

/**
 * Output the 20 highest ratings for each movie.
 * @author hasee
 *
 */

public class TopN1 {

	public static class MapTask extends Mapper<LongWritable, Text, Text, MovieBean>{
		
		@Override
		public  void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MovieBean>.Context context)
				throws IOException, InterruptedException {
			try {
                // parse one JSON line into a MovieBean (Jackson ObjectMapper)
				ObjectMapper objectMapper = new ObjectMapper();
				MovieBean bean = objectMapper.readValue(value.toString(), MovieBean.class);
				String movie = bean.getMovie();
				context.write(new Text(movie), bean);
				
			} catch (Exception e) {
				// skip malformed records
			}
			
		}
	}
	
	public static class ReduceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable>{
		@Override
		protected void reduce(Text movieId, Iterable<MovieBean> movieBeans,
				Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			ArrayList<MovieBean> list = new ArrayList<>();
			for (MovieBean movieBean : movieBeans) {
				// Hadoop reuses the same bean object while iterating,
				// so copy each value into a fresh bean before keeping it
				MovieBean bean = new MovieBean();
				bean.set(movieBean);
				list.add(bean);
			}
			// sort by rating in descending order
			Collections.sort(list,new Comparator<MovieBean>() {

				@Override
				public int compare(MovieBean o1, MovieBean o2) {
					return o2.getRate()-o1.getRate();	// descending
				}
			});
			
			for(int i = 0;i < Math.min(20, list.size());i++){
				context.write(list.get(i), NullWritable.get());
			}
			// write an empty bean as a separator line between movies in the output
			context.write(new MovieBean(), NullWritable.get());
		}
	}
	
	public static void main(String[] args) throws Exception {
		
	}
	
	
}
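The main() above is left empty in the original. Wired up in the same way as the other local-mode drivers in this post, it might look like the following sketch (the input/output paths are assumptions, and MovieBean must implement Writable so it can be used as a map output value):

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(TopN1.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(MovieBean.class);
		job.setOutputKeyClass(MovieBean.class);
		job.setOutputValueClass(NullWritable.class);

		// assumed input and output paths, matching the other examples
		FileInputFormat.setInputPaths(job, new Path("D:\\data\\rating.json"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\movie\\topN1"));
		File file = new File("d:\\data\\out\\movie\\topN1");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}

		boolean b = job.waitForCompletion(true);
		System.out.println(b ? 0 : 1);
	}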

2. Keeping the top 20 with a min-heap (TreeSet)

public static class ReduceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable>{
		@Override
		protected void reduce(Text movieId, Iterable<MovieBean> movieBeans,
				Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			// keep a TreeSet of at most 20 beans, used as a min-heap: the bean with the lowest rating is evicted first
			TreeSet<MovieBean> tree = new TreeSet<>(new Comparator<MovieBean>() {

				@Override
				public int compare(MovieBean o1, MovieBean o2) {
					if(o1.getRate() - o2.getRate() == 0){
						// break ties by uid so the TreeSet does not drop beans with equal ratings
						return o1.getUid().compareTo(o2.getUid());
					}else{
						return o1.getRate() - o2.getRate();
					}
				
				}
			});
			for (MovieBean movieBean : movieBeans) {
				MovieBean movieBean2 = new MovieBean();
				movieBean2.set(movieBean);
				if(tree.size()<20){
					tree.add(movieBean2);
				}else{
					MovieBean first = tree.first();
					if(first.getRate()<movieBean2.getRate()){
						// the new rating beats the current minimum: replace it
						tree.remove(first);
						tree.add(movieBean2);
					}
					
				}
			}
			
			for (MovieBean movieBean : tree) {
				context.write(movieBean, NullWritable.get());
			}
		
		}
	}

3. Making the bean implement WritableComparable and sorting with it

This version also uses a custom grouping comparator and a custom partitioner.

MovieBean.java

package day07.movie.top3;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
 * WritableComparable: Hadoop's serialization plus comparison interface.
 * @author hasee
 *
 */
public class MovieBean implements WritableComparable<MovieBean>{
	
	private String movie;
	private int rate;
	private String timeStamp;
	private String uid;
	
	@Override
	public int compareTo(MovieBean o) {
		// sort by movie id, and within the same movie by rating in descending order
		if(o.getMovie().compareTo(this.getMovie())==0){
			return o.getRate()-this.getRate();
		}else{	
			return o.getMovie().compareTo(this.getMovie());
		}
	}
	
	@Override
	public void readFields(DataInput arg0) throws IOException {
		// deserialize the fields in the same order as write()
		movie = arg0.readUTF();
		rate = arg0.readInt();
		timeStamp = arg0.readUTF();
		uid = arg0.readUTF();
	}
	@Override
	public void write(DataOutput arg0) throws IOException {
		// serialize the fields
		arg0.writeUTF(movie);
		arg0.writeInt(rate);
		arg0.writeUTF(timeStamp);
		arg0.writeUTF(uid);
		
	}
	
	@Override
	public String toString() {
		return "MovieBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
	}
	public String getMovie() {
		return movie;
	}
	public void setMovie(String movie) {
		this.movie = movie;
	}
	public int getRate() {
		return rate;
	}
	public void setRate(int rate) {
		this.rate = rate;
	}
	public String getTimeStamp() {
		return timeStamp;
	}
	public void setTimeStamp(String timeStamp) {
		this.timeStamp = timeStamp;
	}
	public String getUid() {
		return uid;
	}
	public void setUid(String uid) {
		this.uid = uid;
	}
	public void set(MovieBean movieBean) {
		// copy all fields from another bean (Hadoop reuses value objects during iteration)
		this.movie = movieBean.getMovie();
		this.rate = movieBean.getRate();
		this.timeStamp = movieBean.getTimeStamp();
		this.uid = movieBean.getUid();
	}
	public void set(String movie,int rate,String timeStamp,String uid){
		this.movie = movie;
		this.rate = rate;
		this.timeStamp = timeStamp;
		this.uid = uid;
	}
	
	
	
}

 MyGroup.java

代码语言:javascript
复制
/**
 * Grouping comparator:
 * beans with the same movie id are sent to the same reduce() call as one group.
 * @author hasee
 *
 */
public class MyGroup extends WritableComparator{

	
	// the constructor registers MovieBean and lets the parent comparator create instances
	public MyGroup() {
		super(MovieBean.class,true);
	}
	
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		MovieBean bean1 = (MovieBean)a;
		MovieBean bean2 = (MovieBean)b;
		return bean1.getMovie().compareTo(bean2.getMovie());
	}
}

MyPartition.java

/**
 * Partitioner: routes records of the same movie to the same reduce task.
 * @author hasee
 *
 */
public class MyPartition extends Partitioner<MovieBean, NullWritable>{

	/**
	 * numPartitions is the number of reduce tasks;
	 * key and value are the map output key and value.
	 */
	@Override
	public int getPartition(MovieBean key, NullWritable value, int numPartitions) {
		
		return (key.getMovie().hashCode() & Integer.MAX_VALUE)%numPartitions;
	}
	
	
}

TopN3.java

public class TopN3 {
	public static class MapTask extends Mapper<LongWritable, Text, MovieBean,NullWritable >{
		
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, MovieBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			try {
				// parse the JSON record into a MovieBean
				ObjectMapper objectMapper = new ObjectMapper();
				MovieBean bean = objectMapper.readValue(value.toString(), MovieBean.class);
				context.write(bean, NullWritable.get());
			} catch (Exception e) {
				// skip malformed records
			}
				
		}
	}
	
	
	public static class ReduceTask extends Reducer<MovieBean, NullWritable, MovieBean, NullWritable>{
		
		@Override
		protected void reduce(MovieBean key, Iterable<NullWritable> arg1,
				Reducer<MovieBean, NullWritable, MovieBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			int num = 0;
			// the values are all NullWritable, but each step of the iteration updates the (reused) key object,
			// so writing the key inside the loop emits the next bean of the group
			for (NullWritable nullWritable : arg1) {
				if(num>=20){
					break;
				}else{
					context.write(key, NullWritable.get());
					num++;
				}
			}
		}
		
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(TopN3.class);
		// set the number of reduce tasks
		job.setNumReduceTasks(2);
		// partitioner: records of the same movie go to the same reducer
		job.setPartitionerClass(MyPartition.class);
		// grouping comparator: beans with the same movie id form one group,
		// so the reducer can cap the output at 20 per movie;
		// otherwise every record would be treated as its own group
		job.setGroupingComparatorClass(MyGroup.class);
		
		job.setMapOutputKeyClass(MovieBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setOutputKeyClass(MovieBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("D:\\data\\rating.json"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\movie\\top3"));
		File file = new File("d:\\data\\out\\movie\\top3");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}
		
		boolean b = job.waitForCompletion(true);
		System.out.println(b?"Congratulations, it worked!":"Don't give up, hope lies in tomorrow");
	}
}

V. Joining two tables

1. Reduce-side join (using both map and reduce)

A JoinBean class is needed to carry the fields required from both tables; it must implement Writable (a sketch is given below).
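The JoinBean class is used below but not listed in the original post. A minimal sketch, inferred from how the code calls it (a six-argument set(uid, age, gender, movieId, rating, table), getters/setters, and Writable serialization), might look like this; it is an assumption, not the author's original class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Minimal sketch of JoinBean, inferred from how JoinMR uses it; not the author's original class.
public class JoinBean implements Writable {

	private String uid;
	private String age;
	private String gender;
	private String movieId;
	private String rating;
	private String table;	// which source table the record came from: "user" or "rating"

	public void set(String uid, String age, String gender, String movieId, String rating, String table) {
		this.uid = uid;
		this.age = age;
		this.gender = gender;
		this.movieId = movieId;
		this.rating = rating;
		this.table = table;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		// serialize the fields in a fixed order
		out.writeUTF(uid);
		out.writeUTF(age);
		out.writeUTF(gender);
		out.writeUTF(movieId);
		out.writeUTF(rating);
		out.writeUTF(table);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// deserialize in the same order as write()
		uid = in.readUTF();
		age = in.readUTF();
		gender = in.readUTF();
		movieId = in.readUTF();
		rating = in.readUTF();
		table = in.readUTF();
	}

	public String getUid() { return uid; }
	public void setUid(String uid) { this.uid = uid; }
	public String getAge() { return age; }
	public void setAge(String age) { this.age = age; }
	public String getGender() { return gender; }
	public void setGender(String gender) { this.gender = gender; }
	public String getMovieId() { return movieId; }
	public void setMovieId(String movieId) { this.movieId = movieId; }
	public String getRating() { return rating; }
	public void setRating(String rating) { this.rating = rating; }
	public String getTable() { return table; }
	public void setTable(String table) { this.table = table; }

	@Override
	public String toString() {
		// how the joined record is rendered in the output file
		return uid + "\t" + gender + "\t" + age + "\t" + movieId + "\t" + rating;
	}
}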

/**
 * Reduce-side join of two tables,
 * reading multiple files from one input directory.
 * @author hasee
 *
 */
public class JoinMR {

	public static class MapTask extends Mapper<LongWritable, Text, Text, JoinBean>{
		String table = null;
		@Override
		protected void setup(Mapper<LongWritable, Text, Text, JoinBean>.Context context)
				throws IOException, InterruptedException {
			// the split's file name tells us which table this record comes from (users.* or ratings.*)
			FileSplit fileSplit = (FileSplit)context.getInputSplit();
			table = fileSplit.getPath().getName();
		}
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, JoinBean>.Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split("::");
			JoinBean joinBean = new JoinBean();
			if(table.startsWith("users")){
				joinBean.set(split[0], split[2], split[1], "null", "null", "user");
			}else if(table.startsWith("ratings")){
				joinBean.set(split[0], "null", "null", split[1], split[2], "rating");
			}
			context.write(new Text(joinBean.getUid()), joinBean);
		}
		
	}
	
	
	public static class ReduceTask extends Reducer<Text, JoinBean, JoinBean, NullWritable>{
		@Override
		protected void reduce(Text key, Iterable<JoinBean> values,
				Reducer<Text, JoinBean, JoinBean, NullWritable>.Context context) throws IOException, InterruptedException {
			// bean that will hold the user record for this uid
			JoinBean joinBean = new JoinBean();
			ArrayList<JoinBean> list = new ArrayList<>();
			// separate the user record from the rating records
			for (JoinBean joinBean2 : values) {
				String table = joinBean2.getTable();
				if("user".equals(table)){
					joinBean.set(joinBean2.getUid(), joinBean2.getAge(), joinBean2.getGender(), joinBean2.getMovieId(), joinBean2.getRating(), joinBean2.getTable());
				}else{
					JoinBean joinBean3 = new JoinBean();
					joinBean3.set(joinBean2.getUid(), joinBean2.getAge(), joinBean2.getGender(), joinBean2.getMovieId(), joinBean2.getRating(), joinBean2.getTable());
					list.add(joinBean3);
				}
			}
			
			// join: copy the user's fields onto every rating record
			for (JoinBean joinBean2 : list) {
				joinBean2.setAge(joinBean.getAge());
				joinBean2.setGender(joinBean.getGender());
				context.write(joinBean2, NullWritable.get());
			}
			
		}
	}
	
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(JoinMR.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(JoinBean.class);
		
		job.setOutputKeyClass(JoinBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("D:\\data\\in\\movie"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\movie\\top1"));
		File file = new File("d:\\data\\out\\movie\\top1");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}
		
		boolean b = job.waitForCompletion(true);
		System.out.println(b?"Congratulations, it worked!":"Don't give up, hope lies in tomorrow");
	}
}

2. Map-side join (map only)

In setup(), read the small table and cache it in a HashMap.

In map(), read the large table line by line, look up the matching record in the HashMap, and join the two.

/**
 * Map-side join of two tables, reading multiple files:
 * setup() reads the small table from HDFS via the FileSystem API,
 * map() reads the large table and joins against the cached data.
 * (The small-table path can also be passed in as a parameter; see the sketch after this class.)
 * @author hasee
 *
 */
public class JoinMR {

	public static class MapTask extends Mapper<LongWritable, Text, JoinBean, NullWritable>{
		String table = null;
		// cache for the small table: uid -> full user record
		Map<String,String> map = new HashMap<>();
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			// open the small table (/users.dat) on HDFS and load it into the map
			Configuration conf = context.getConfiguration();
			//String fileName = conf.get("fileName");
			FileSystem fs = FileSystem.get(conf);
			FSDataInputStream inputStream = fs.open(new Path("/users.dat"));
			BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
			String line = null;
			while((line = br.readLine())!=null){
				String[] split = line.split("::");
				map.put(split[0],line);
			}
		}
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split("::");
			JoinBean joinBean = new JoinBean();
			// look up the cached user record for this uid; users.dat format: 1::F::1::10::48067
			String[] line = map.get(split[0]).split("::");
			joinBean.set(split[0], line[2], line[1], split[1], split[2], "null");
			context.write(joinBean, NullWritable.get());
		}
		
	}
	
	
	
	
	
	public static void main(String[] args) throws Exception {
		// with nothing configured on the classpath, how does the job find the HDFS/YARN cluster?
		// either by loading the cluster configuration files or, as below, by setting the addresses explicitly
				Configuration conf = new Configuration();
				
				conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
				System.setProperty("HADOOP_USER_NAME", "root");	
				
				Job job = Job.getInstance(conf);
				
				job.setMapperClass(MapTask.class);
				// map-only job: no reduce phase is needed
				job.setNumReduceTasks(0);
				
				job.setJarByClass(JoinMR.class);
				//job.setJar(jar);
				// set the output types
				
				job.setOutputKeyClass(JoinBean.class);
				job.setOutputValueClass(NullWritable.class);
				
				// set the input and output paths
				FileInputFormat.addInputPath(job, new Path("/ratings.dat"));
				FileOutputFormat.setOutputPath(job, new Path("/out/join"));
				// delete the output directory on HDFS if it already exists
				FileSystem fs = FileSystem.get(conf);
				if(fs.exists(new Path("/out/join"))){
					fs.delete(new Path("/out/join"), true);
				}
				// submit the job and monitor it until it finishes
				boolean b = job.waitForCompletion(true);
				System.out.println(b?"The program finished!!!":"The program hit a bug!!!!");
		
	}
}
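The comments above mention passing a parameter in, and the code contains a commented-out conf.get("fileName"). One way to avoid hard-coding /users.dat, sketched here as an assumption rather than the author's code, is to put the small-table path into the Configuration in main() and read it back in setup():

		// in main(), before Job.getInstance(conf):
		conf.set("fileName", "/users.dat");	// hypothetical key, matching the commented-out conf.get("fileName")

		// in setup(), instead of the hard-coded path:
		String fileName = context.getConfiguration().get("fileName", "/users.dat");
		FSDataInputStream inputStream = FileSystem.get(context.getConfiguration()).open(new Path(fileName));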

VI. Top 20 movies by average rating

This time we want the top 20 movies overall, which is different from the earlier task of finding the top 20 ratings within each movie.

This example uses all three Reducer methods: setup(), reduce(), and cleanup().
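Note that the MovieBean used here also needs an avg field with getAvg()/setAvg(), which the MovieBean listing in section IV does not have. A minimal addition, assuming a float field that is also serialized, might be:

	// additional field assumed for this example; remember to add it to write()/readFields() as well
	private float avg;

	public float getAvg() {
		return avg;
	}

	public void setAvg(float avg) {
		this.avg = avg;
	}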

public class AvgTop {
	
	public static class MapTask extends Mapper<LongWritable, Text, Text,MovieBean>{
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MovieBean>.Context context)
				throws IOException, InterruptedException {
			ObjectMapper objectMapper = new ObjectMapper();
			MovieBean bean = objectMapper.readValue(value.toString(), MovieBean.class);
			context.write(new Text(bean.getMovie()), bean);
		}
	}
	
	public static class RedeceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable>{
		TreeSet<MovieBean> set = null;
		
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
				set =  new TreeSet<>(new Comparator<MovieBean>() {			
				@Override
				public int compare(MovieBean o1, MovieBean o2) {
					if(o1.getAvg()-o2.getAvg()==0){
						// break ties by movie id so the TreeSet keeps beans with equal averages
						return o1.getMovie().compareTo(o2.getMovie());
					}else{
						// the int cast is why avg is scaled by 1000 in reduce():
						// otherwise differences smaller than 1 would compare as equal
						return (int)(o1.getAvg()-o2.getAvg());
					}
				}
			});
		}
		
		
		
		@Override
		protected void reduce(Text key, Iterable<MovieBean> value,
				Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			MovieBean bean = new MovieBean();
			int count = 0;
			int sum = 0;
			for (MovieBean movieBean : value) {	
				bean.set(movieBean);
					count++;
					sum += movieBean.getRate();
			}
			float avg = sum*1.0f/count;
			avg = avg*1000;	// scale up so the int cast in the comparator does not lose sub-1 differences
			bean.setAvg(avg);
			if(set.size()<20){
				set.add(bean);
			}else{
				MovieBean bean2 = set.first();
				if(bean2.getAvg()<bean.getAvg()){
					set.remove(bean2);
					set.add(bean);
				}
			}
		}
		
		@Override
		protected void cleanup(Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			// emit the 20 movies with the highest average rating
			for (MovieBean movieBean : set) {
				context.write(movieBean, NullWritable.get());
			}
		}
		
			
	}
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setMapperClass(MapTask.class);
		job.setReducerClass(RedeceTask.class);
		job.setJarByClass(AvgTop.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(MovieBean.class);
		
		job.setOutputKeyClass(MovieBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("D:\\data\\rating.json"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\movie\\top5"));
		File file = new File("d:\\data\\out\\movie\\top5");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}
		
		boolean b = job.waitForCompletion(true);
		System.out.println(b?"Congratulations, it worked!":"Don't give up, hope lies in tomorrow");
	}
}