I. WordCount
(1) Pure local mode
Key point: a combiner can run on each mapper's output before the shuffle, pre-aggregating the data and reducing the I/O pressure on the reduce side.
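For instance, with a mapper that emits (word, 1) pairs, the combiner collapses duplicates locally (a hypothetical data flow for one mapper, assuming it saw the words below):

map output:      (hello,1) (hello,1) (world,1) (hello,1)
combiner output: (hello,3) (world,1)

The reducer then receives one pre-summed pair per word from each mapper instead of every raw 1.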
MapTask.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Four generic parameters:
 * KEYIN    input key: the byte offset of the current line
 * VALUEIN  input value: the text of the current line
 * KEYOUT   output key type
 * VALUEOUT output value type
 *
 * Serialization:
 * Java serialization stores the full class name and type information
 * with every value, which is inefficient; Hadoop has its own Writables:
 * Long    -> LongWritable
 * Integer -> IntWritable
 * String  -> Text
 * float   -> FloatWritable
 * double  -> DoubleWritable
 * null    -> NullWritable
 * @author hasee
 */
/**
 * Map phase: split each line and emit (word, 1) pairs.
 * @author hasee
 */
public class MapTask extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
// value holds one line of input; split it into words
String[] split = value.toString().split(" ");
for (String word : split) {
context.write(new Text(word), new IntWritable(1));
}
}
}
ReduceTask.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce phase: sum the counts for each word,
 * e.g. hello ---> {1 1 1 1 1} becomes (hello, 5).
 * @author hasee
 */
public class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
Driver.java
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Local mode:
 * test on small data first, then switch to cluster mode for submission.
 * @author root
 */
public class Driver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//System.setProperty("HADOOP_USER_NAME", "hasee");
/**
 * fs.defaultFS defaults to file:/// (the local file system);
 * mapreduce.framework.name defaults to local
 */
Job job = Job.getInstance(conf);
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(Driver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//set the combiner
job.setCombinerClass(Mycombiner.class);
//set input and output paths
FileInputFormat.setInputPaths(job, new Path("d:\\data\\word.txt"));
FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\word"));
//delete the local output dir if it already exists
File file = new File("d:\\data\\out\\word");
if(file.exists()){
FileUtils.deleteDirectory(file);
}
//submit the job and wait for it to finish
boolean b = job.waitForCompletion(true);
System.out.println(b?0:1);
}
}
Mycombiner.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Combiner: pre-aggregates the map output on the map side
 * to cut down shuffle I/O.
 * @author hasee
 */
public class Mycombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
(2) Submitting to the YARN cluster from Eclipse
MapTask and ReduceTask are unchanged; only the Driver differs.
/**
 * Submits the job from Eclipse to the cluster.
 * @author root
 */
public class Driver {
public static void main(String[] args) throws Exception {
//declare which user to submit as
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
//tell the job where the cluster is: the HDFS address, the YARN
//ResourceManager, and cross-platform submission (needed from Windows)
conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
conf.set("mapreduce.framework.name","yarn" );
conf.set("yarn.resourcemanager.hostname", "hadoop01");
conf.set("mapreduce.app-submission.cross-platform", "true");
Job job = Job.getInstance(conf, "eclipseToCluster");
//set mapper, reducer, and the jar to submit
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
//job.setJarByClass(Driver.class);
job.setJar("C:\\Users\\hasee\\Desktop\\wc.jar");
//set map output and final output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//set input and output paths
FileInputFormat.addInputPath(job, new Path("/word.txt"));
FileOutputFormat.setOutputPath(job, new Path("/wordcount/eclipse-out/"));
//delete the output path if it already exists
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path("/wordcount/eclipse-out/"))){
fs.delete(new Path("/wordcount/eclipse-out/"), true);
}
//submit the job
boolean b = job.waitForCompletion(true);
System.out.println(b?0:1);
}
}
II. Finding the common friends of two people
This takes two steps, run as two separate MapReduce jobs:
Step 1: for each person, find whose friend lists he appears in (invert the friend relation).
Step 2: every pair of users on one step-1 line shares that person as a common friend, so enumerating all pairs per line yields the common friends of any two people; see the worked example below.
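A worked example (assuming input lines of the form user:friend1,friend2,...):

input:
A:B,C
B:A,C
D:A,C

after step 1 (key = person, value = users whose friend lists contain him):
A	B,D
B	A
C	A,B,D

after step 2 (sorted pairs from each step-1 line, then reduced):
A-B	C
A-D	C
B-D	A,C

Sorting the users before pairing (Arrays.sort below) guarantees that B-D and D-B become the same key.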
1. FriendMR.java
public class FriendMR {
public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// input line: user:friend1,friend2,...
String[] split1 = value.toString().split(":");
String user = split1[0];
String[] friends = split1[1].split(",");
// invert the relation: emit (friend, user)
for (String f : friends) {
context.write(new Text(f), new Text(user));
}
}
}
public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
// collect every user whose friend list contains this person
// (the trailing comma is harmless: job two's split(",") drops it)
for (Text user : values) {
sb.append(user.toString()).append(",");
}
context.write(key,new Text(sb.toString()));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//System.setProperty("HADOOP_USER_NAME", "hasee");
/**
 * fs.defaultFS defaults to file:/// (the local file system);
 * mapreduce.framework.name defaults to local
 */
Job job = Job.getInstance(conf);
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(FriendMR.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set input and output paths
FileInputFormat.setInputPaths(job, new Path("d:\\data\\friend.txt"));
FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\friendOne"));
//delete the local output dir if it already exists
File file = new File("d:\\data\\out\\friendOne");
if(file.exists()){
FileUtils.deleteDirectory(file);
}
//submit the job
boolean b = job.waitForCompletion(true);
System.out.println(b?0:1);
}
}
2. CommonFriendsTwo.java
public class CommonFriendsTwo {
public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// input line from job one: friend<TAB>user1,user2,...
String[] split = value.toString().split("\t");
String f = split[0];
String[] users = split[1].split(",");
// sort so each pair is emitted in canonical order (A-B, never B-A)
Arrays.sort(users);
// emit every pair of users together with their common friend
for (int i = 0;i<users.length -1;i++) {
for(int j = i+1;j<users.length;j++){
context.write(new Text(users[i]+"-"+users[j]), new Text(f));
}
}
}
}
public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text userPair, Iterable<Text> friends, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text f : friends) {
sb.append(f.toString()).append(",");
}
context.write(userPair, new Text(sb.deleteCharAt(sb.length()-1).toString()));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//System.setProperty("HADOOP_USER_NAME", "hasee");
/**
 * fs.defaultFS defaults to file:/// (the local file system);
 * mapreduce.framework.name defaults to local
 */
Job job = Job.getInstance(conf);
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(CommonFriendsTwo.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set input and output paths
FileInputFormat.setInputPaths(job, new Path("D:\\data\\out\\friendOne\\part-r-00000"));
FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\friendTwo"));
//delete the local output dir if it already exists
File file = new File("d:\\data\\out\\friendTwo");
if(file.exists()){
FileUtils.deleteDirectory(file);
}
//submit the job
boolean b = job.waitForCompletion(true);
System.out.println(b?0:1);
}
}
III. Counting, for each word, how many times it appears in each of several files
Approach: the input path is a directory. In each map task, FileSplit fileSplit = (FileSplit)context.getInputSplit(); pathname = fileSplit.getPath().getName(); retrieves the name of the file the current split belongs to, and that file name is appended to every word the mapper emits.
1. CreateIndexOne.java
First, count the occurrences of each word in each file, producing lines like: hello-a.txt 3
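A hypothetical example with two small input files:

a.txt: hello you hello me          b.txt: hello world

output of this first job:
hello-a.txt	2
hello-b.txt	1
me-a.txt	1
world-b.txt	1
you-a.txt	1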
/**
 * Counts how many times each word occurs in each of several files,
 * building toward an inverted index like:
 * you a.txt 1,b.txt 1,c.txt 1
 * @author hasee
 */
public class CreateIndexOne {
public static class MapTask extends Mapper<LongWritable, Text, Text, IntWritable>{
String pathname = null;
@Override
protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
//get the name of the file this split belongs to
FileSplit fileSplit = (FileSplit)context.getInputSplit();
pathname = fileSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for (String word : words) {
context.write(new Text(word+"-"+pathname), new IntWritable(1));
}
}
}
public static class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//System.setProperty("HADOOP_USER_NAME", "hasee");
/**
 * fs.defaultFS defaults to file:/// (the local file system);
 * mapreduce.framework.name defaults to local
 */
Job job = Job.getInstance(conf);
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(CreateIndexOne.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//set input and output paths
FileInputFormat.setInputPaths(job, new Path("d:\\data\\in\\index"));
FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\indexOne"));
//delete the local output dir if it already exists
File file = new File("d:\\data\\out\\indexOne");
if(file.exists()){
FileUtils.deleteDirectory(file);
}
//submit the job
boolean b = job.waitForCompletion(true);
System.out.println(b?0:1);
}
}
2. CreateIndexTwo.java
Then, for each word, merge its per-file counts into one line: hello a.txt 3,b.txt 2
public class CreateIndexTwo {
public static class MapTask extends Mapper<LongWritable, Text, Text, Text>{
Text outKey = new Text();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// input line from job one: word-filename<TAB>count
String[] split = value.toString().split("-");
String word = split[0];
String nameNum = split[1]; // "filename<TAB>count"
outKey.set(word);
outValue.set(nameNum);
context.write(outKey,outValue);
}
}
public static class ReduceTask extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuilder builder = new StringBuilder();
// join the per-file counts with commas, without a trailing comma
boolean flag = true;
for (Text text : values) {
if(flag){
builder.append(text.toString());
flag = false;
}else{
builder.append(",");
builder.append(text.toString());
}
}
context.write(key, new Text(builder.toString()));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//System.setProperty("HADOOP_USER_NAME", "hasee");
/**
 * fs.defaultFS defaults to file:/// (the local file system);
 * mapreduce.framework.name defaults to local
 */
Job job = Job.getInstance(conf);
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(CreateIndexTwo.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set input and output paths
FileInputFormat.setInputPaths(job, new Path("d:\\data\\out\\indexOne"));
FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\indexTwo"));
//delete the local output dir if it already exists
File file = new File("d:\\data\\out\\indexTwo");
if(file.exists()){
FileUtils.deleteDirectory(file);
}
//submit the job
boolean b = job.waitForCompletion(true);
System.out.println(b?0:1);
}
}
IV. Statistics over a movie ratings file
1. The 20 highest ratings for each movie
/**
 * Finds the top 20 ratings for each movie.
 * @author hasee
 */
public class TopN1 {
public static class MapTask extends Mapper<LongWritable, Text, Text, MovieBean>{
@Override
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MovieBean>.Context context)
throws IOException, InterruptedException {
try {
//parse one JSON record per line with Jackson's ObjectMapper
ObjectMapper objectMapper = new ObjectMapper();
MovieBean bean = objectMapper.readValue(value.toString(), MovieBean.class);
String movie = bean.getMovie();
context.write(new Text(movie), bean);
} catch (Exception e) {
// skip malformed lines
}
}
}
public static class ReduceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable>{
@Override
protected void reduce(Text movieId, Iterable<MovieBean> movieBeans,
Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
throws IOException, InterruptedException {
ArrayList<MovieBean> list = new ArrayList<>();
for (MovieBean movieBean : movieBeans) {
// copy the bean: Hadoop reuses one object across the iteration,
// so storing the reference would fill the list with identical entries
MovieBean bean = new MovieBean();
bean.set(movieBean);
list.add(bean);
}
//sort by rating, descending
Collections.sort(list,new Comparator<MovieBean>() {
@Override
public int compare(MovieBean o1, MovieBean o2) {
return o2.getRate()-o1.getRate(); //descending
}
});
for(int i = 0;i < Math.min(20, list.size());i++){
context.write(list.get(i), NullWritable.get());
}
context.write(new MovieBean(), NullWritable.get()); // empty bean as a visual separator between movies
}
}
public static void main(String[] args) throws Exception {
// job wiring omitted; identical in structure to the WordCount Driver
}
}
2. Keeping the top 20 with a min-heap (TreeSet)
public static class ReduceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable>{
@Override
protected void reduce(Text movieId, Iterable<MovieBean> movieBeans,
Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
throws IOException, InterruptedException {
//keep a TreeSet of size 20 as a min-heap; the tie-break on uid matters,
//because a TreeSet silently drops elements its comparator deems equal
TreeSet<MovieBean> tree = new TreeSet<>(new Comparator<MovieBean>() {
@Override
public int compare(MovieBean o1, MovieBean o2) {
if(o1.getRate() - o2.getRate() ==0){
return o1.getUid().compareTo(o2.getUid());
}else{
return o1.getRate() - o2.getRate();
}
}
});
for (MovieBean movieBean : movieBeans) {
MovieBean movieBean2 = new MovieBean();
movieBean2.set(movieBean);
if(tree.size()<20){
tree.add(movieBean2);
}else{
MovieBean first = tree.first();
if(first.getRate()<movieBean2.getRate()){
//the new bean beats the current minimum: replace it
tree.remove(first);
tree.add(movieBean2);
}
}
}
for (MovieBean movieBean : tree) {
context.write(movieBean, NullWritable.get());
}
}
}
3. Sorting by making the bean implement WritableComparable
Three pieces work together here: the bean's compareTo() drives the shuffle sort (movie first, rating descending within a movie), a custom Partitioner sends all records of one movie to the same reducer, and a GroupingComparator makes all records with the same movie id arrive in a single reduce() call. A sketch of the data flow follows.
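A hypothetical sketch of the mechanism (keys shown as movie/rate pairs):

sorted map output reaching one reducer:  (m1,5) (m1,4) (m1,2) (m2,5) (m2,3)
grouping comparator compares movie only: [(m1,5) (m1,4) (m1,2)]  -> one reduce() call
                                         [(m2,5) (m2,3)]        -> one reduce() call

Within one reduce() call, each step of the values iterator also advances the key object to the next record of the group; that is why TopN3's reducer can write `key` inside its loop and obtain the 20 highest-rated records of each movie.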
MovieBean.java
package day07.movie.top3;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
 * Writable is Hadoop's serialization interface;
 * WritableComparable additionally makes the bean sortable.
 * @author hasee
 */
public class MovieBean implements WritableComparable<MovieBean>{
private String movie;
private int rate;
private String timeStamp;
private String uid;
@Override
public int compareTo(MovieBean o) {
// sort by movie id first so equal movies are adjacent,
// then by rating, descending
if(o.getMovie().compareTo(this.getMovie())==0){
return o.getRate()-this.getRate();
}else{
return o.getMovie().compareTo(this.getMovie());
}
}
@Override
public void readFields(DataInput arg0) throws IOException {
// deserialize in exactly the order write() serializes
movie = arg0.readUTF();
rate = arg0.readInt();
timeStamp = arg0.readUTF();
uid = arg0.readUTF();
}
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(movie);
arg0.writeInt(rate);
arg0.writeUTF(timeStamp);
arg0.writeUTF(uid);
}
@Override
public String toString() {
return "MovieBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
}
public String getMovie() {
return movie;
}
public void setMovie(String movie) {
this.movie = movie;
}
public int getRate() {
return rate;
}
public void setRate(int rate) {
this.rate = rate;
}
public String getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(String timeStamp) {
this.timeStamp = timeStamp;
}
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public void set(MovieBean movieBean) {
this.movie = movieBean.getMovie();
this.rate = movieBean.getRate();
this.timeStamp = movieBean.getTimeStamp();
this.uid = movieBean.getUid();
}
public void set(String movie,int rate,String timeStamp,String uid){
this.movie = movie;
this.rate = rate;
this.timeStamp = timeStamp;
this.uid = uid;
}
}
MyGroup.java
/**
 * Grouping comparator:
 * puts records with the same movie id into one reduce group.
 * @author hasee
 */
public class MyGroup extends WritableComparator{
//the constructor registers the key class and asks for instances to compare
public MyGroup() {
super(MovieBean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
MovieBean bean1 = (MovieBean)a;
MovieBean bean2 = (MovieBean)b;
return bean1.getMovie().compareTo(bean2.getMovie());
}
}
MyPartition.java
/**
 * Partitioner: routes all records of one movie to the same reducer.
 * @author hasee
 */
public class MyPartition extends Partitioner<MovieBean, NullWritable>{
/**
 * numPartitions is the number of reduce tasks;
 * key and value are the map output key and value
 */
@Override
public int getPartition(MovieBean key, NullWritable value, int numPartitions) {
// & Integer.MAX_VALUE clears the sign bit so the modulo is never negative
return (key.getMovie().hashCode() & Integer.MAX_VALUE)%numPartitions;
}
}
TopN3.java
public class TopN3 {
public static class MapTask extends Mapper<LongWritable, Text, MovieBean,NullWritable >{
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, MovieBean, NullWritable>.Context context)
throws IOException, InterruptedException {
try {
//parse the JSON record
ObjectMapper objectMapper = new ObjectMapper();
MovieBean bean = objectMapper.readValue(value.toString(), MovieBean.class);
context.write(bean, NullWritable.get());
} catch (Exception e) {
// skip malformed lines
}
}
}
public static class ReduceTask extends Reducer<MovieBean, NullWritable, MovieBean, NullWritable>{
@Override
protected void reduce(MovieBean key, Iterable<NullWritable> arg1,
Reducer<MovieBean, NullWritable, MovieBean, NullWritable>.Context context)
throws IOException, InterruptedException {
int num = 0;
//the values are all NullWritable, but each iterator step also refills
//`key` with the next record of the group (see the sketch above), so
//writing `key` in the loop emits distinct, sorted records
for (NullWritable nullWritable : arg1) {
if(num>=20){
break;
}else{
context.write(key, NullWritable.get());
num++;
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(TopN3.class);
/**
 * number of reduce tasks
 */
job.setNumReduceTasks(2);
/**
 * send records of the same movie to the same reducer
 */
job.setPartitionerClass(MyPartition.class);
/**
 * group records with the same movie id into one reduce call so the
 * top-20 cutoff works; otherwise each record would form its own group
 */
job.setGroupingComparatorClass(MyGroup.class);
job.setMapOutputKeyClass(MovieBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(MovieBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\\data\\rating.json"));
FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\movie\\top3"));
File file = new File("d:\\data\\out\\movie\\top3");
if(file.exists()){
FileUtils.deleteDirectory(file);
}
boolean b = job.waitForCompletion(true);
System.out.println(b?"Congratulations, it worked!":"Don't give up, hope lies in tomorrow");
}
}
V. Joining two tables
1. Reduce-side join (map + reduce)
The columns needed from both tables are combined into one JoinBean, which must implement Writable. The JoinBean class itself is not listed in these notes; a sketch reconstructed from its usage appears below.
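A minimal sketch of JoinBean: the set(uid, age, gender, movieId, rating, table) parameter order and the getters/setters are inferred from JoinMR below; the field names and toString format are assumptions.

JoinBean.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class JoinBean implements Writable {
private String uid;
private String age;
private String gender;
private String movieId;
private String rating;
// which source table the record came from: "user" or "rating"
private String table;
public void set(String uid, String age, String gender, String movieId, String rating, String table) {
this.uid = uid; this.age = age; this.gender = gender;
this.movieId = movieId; this.rating = rating; this.table = table;
}
@Override
public void write(DataOutput out) throws IOException {
// serialize every field; readFields must mirror this order
out.writeUTF(uid); out.writeUTF(age); out.writeUTF(gender);
out.writeUTF(movieId); out.writeUTF(rating); out.writeUTF(table);
}
@Override
public void readFields(DataInput in) throws IOException {
uid = in.readUTF(); age = in.readUTF(); gender = in.readUTF();
movieId = in.readUTF(); rating = in.readUTF(); table = in.readUTF();
}
public String getUid() { return uid; }
public String getAge() { return age; }
public void setAge(String age) { this.age = age; }
public String getGender() { return gender; }
public void setGender(String gender) { this.gender = gender; }
public String getMovieId() { return movieId; }
public String getRating() { return rating; }
public String getTable() { return table; }
@Override
public String toString() {
return uid + "\t" + gender + "\t" + age + "\t" + movieId + "\t" + rating;
}
}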
/**
 * Joins two tables whose files sit in the same input directory.
 * @author hasee
 */
public class JoinMR {
public static class MapTask extends Mapper<LongWritable, Text, Text, JoinBean>{
String table = null;
@Override
protected void setup(Mapper<LongWritable, Text, Text, JoinBean>.Context context)
throws IOException, InterruptedException {
// the file name of the split tells us which table this record comes from
FileSplit fileSplit = (FileSplit)context.getInputSplit();
table = fileSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, JoinBean>.Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split("::");
JoinBean joinBean = new JoinBean();
if(table.startsWith("users")){
// users.dat: UserID::Gender::Age::Occupation::Zip-code
joinBean.set(split[0], split[2], split[1], "null", "null", "user");
}else if(table.startsWith("ratings")){
// ratings.dat: UserID::MovieID::Rating::Timestamp
joinBean.set(split[0], "null", "null", split[1], split[2], "rating");
}
context.write(new Text(joinBean.getUid()), joinBean);
}
}
public static class ReduceTask extends Reducer<Text, JoinBean, JoinBean, NullWritable>{
@Override
protected void reduce(Text key, Iterable<JoinBean> values,
Reducer<Text, JoinBean, JoinBean, NullWritable>.Context context) throws IOException, InterruptedException {
//holds the single user-table record for this uid
JoinBean joinBean = new JoinBean();
ArrayList<JoinBean> list = new ArrayList<>();
//separate the user record from the rating records,
//copying each rating bean because Hadoop reuses the object
for (JoinBean joinBean2 : values) {
String table = joinBean2.getTable();
if("user".equals(table)){
joinBean.set(joinBean2.getUid(), joinBean2.getAge(), joinBean2.getGender(), joinBean2.getMovieId(), joinBean2.getRating(), joinBean2.getTable());
}else{
JoinBean joinBean3 = new JoinBean();
joinBean3.set(joinBean2.getUid(), joinBean2.getAge(), joinBean2.getGender(), joinBean2.getMovieId(), joinBean2.getRating(), joinBean2.getTable());
list.add(joinBean3);
}
}
//fill the user columns into every rating record and emit
for (JoinBean joinBean2 : list) {
joinBean2.setAge(joinBean.getAge());
joinBean2.setGender(joinBean.getGender());
context.write(joinBean2, NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(JoinMR.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(JoinBean.class);
job.setOutputKeyClass(JoinBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\\data\\in\\movie"));
FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\movie\\top1"));
File file = new File("d:\\data\\out\\movie\\top1");
if(file.exists()){
FileUtils.deleteDirectory(file);
}
boolean b = job.waitForCompletion(true);
System.out.println(b?"Congratulations, it worked!":"Don't give up, hope lies in tomorrow");
}
}
2. Map-side join (no reducer)
setup() reads the small table via the HDFS API and caches it in a HashMap;
map() streams the big table, looks each record up in that HashMap, and emits the joined record. A sketch of the in-memory state follows.
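A hypothetical snapshot, assuming MovieLens-style users.dat and ratings.dat:

cached HashMap (built in setup() from users.dat):  "1" -> "1::F::1::10::48067"
incoming ratings.dat line in map():                1::1193::5::978300760
joined output bean:                                uid=1, age=1, gender=F, movieId=1193, rating=5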
/**
 * Map-side join of two tables:
 * setup() reads the small table (users.dat) through the HDFS API,
 * map() reads the big table (ratings.dat) and joins against it;
 * the small-table path could also be passed in via the Configuration.
 * @author hasee
 */
public class JoinMR {
public static class MapTask extends Mapper<LongWritable, Text, JoinBean, NullWritable>{
//caches the small table: uid -> full user line
Map<String,String> map = new HashMap<>();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
//String fileName = conf.get("fileName"); // the path could be passed in via the Configuration
FileSystem fs = FileSystem.get(conf);
FSDataInputStream inputStream = fs.open(new Path("/users.dat"));
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
String line = null;
while((line = br.readLine())!=null){
String[] split = line.split("::");
map.put(split[0],line);
}
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split("::");
String userLine = map.get(split[0]);
if (userLine == null) {
return; // defensive guard (added): skip ratings with no matching user
}
String[] line = userLine.split("::");
JoinBean joinBean = new JoinBean();
// users.dat line: 1::F::1::10::48067 -> uid::gender::age::occupation::zip
joinBean.set(split[0], line[2], line[1], split[1], split[2], "null");
context.write(joinBean, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
//nothing is configured on the classpath, so tell the job explicitly
//where the HDFS cluster is (the job itself still runs locally)
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
System.setProperty("HADOOP_USER_NAME", "root");
Job job = Job.getInstance(conf);
job.setMapperClass(MapTask.class);
job.setJarByClass(JoinMR.class);
//set output types (map-only job)
job.setOutputKeyClass(JoinBean.class);
job.setOutputValueClass(NullWritable.class);
//set input and output paths
FileInputFormat.addInputPath(job, new Path("/ratings.dat"));
FileOutputFormat.setOutputPath(job, new Path("/out/join"));
//delete the output path if it already exists
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path("/out/join"))){
fs.delete(new Path("/out/join"), true);
}
//waitForCompletion monitors the job after submission
boolean b = job.waitForCompletion(true);
System.out.println(b?"Job finished!!!":"The job hit a bug!!!!");
}
}
VI. The 20 movies with the highest average rating
This time we want the top 20 movies overall, not the top 20 ratings within one movie as before.
It uses all three methods of the Reducer class: setup() builds the TreeSet, reduce() computes each movie's average and offers it to the set, and cleanup() writes out the surviving 20. The MovieBean used here needs an extra avg field; a sketch follows.
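The earlier MovieBean has no avg field, so this job needs a small addition (an assumption consistent with the getAvg()/setAvg() calls below; avg is computed inside the reducer after deserialization, so it does not have to be added to write()/readFields()):

// added to MovieBean for this job
private float avg;
public float getAvg() { return avg; }
public void setAvg(float avg) { this.avg = avg; }
// toString() should also print avg so it appears in the output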
public class AvgTop {
public static class MapTask extends Mapper<LongWritable, Text, Text,MovieBean>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MovieBean>.Context context)
throws IOException, InterruptedException {
ObjectMapper objectMapper = new ObjectMapper();
MovieBean bean = objectMapper.readValue(value.toString(), MovieBean.class);
context.write(new Text(bean.getMovie()), bean);
}
}
public static class ReduceTask extends Reducer<Text, MovieBean, MovieBean, NullWritable>{
TreeSet<MovieBean> set = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
set = new TreeSet<>(new Comparator<MovieBean>() {
@Override
public int compare(MovieBean o1, MovieBean o2) {
// use Float.compare: casting the float difference to int truncates
// sub-integer differences to 0, and the TreeSet would then drop
// beans it wrongly considers equal
if(o1.getAvg() == o2.getAvg()){
return o1.getMovie().compareTo(o2.getMovie()); // tie-break keeps equal averages
}else{
return Float.compare(o1.getAvg(), o2.getAvg());
}
}
});
}
@Override
protected void reduce(Text key, Iterable<MovieBean> value,
Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
throws IOException, InterruptedException {
MovieBean bean = new MovieBean();
int count = 0;
int sum = 0;
for (MovieBean movieBean : value) {
bean.set(movieBean);
count++;
sum += movieBean.getRate();
}
// average rating for this movie (Float.compare above makes the
// old *1000 scaling workaround unnecessary)
float avg = sum*1.0f/count;
bean.setAvg(avg);
if(set.size()<20){
set.add(bean);
}else{
MovieBean bean2 = set.first();
if(bean2.getAvg()<bean.getAvg()){
set.remove(bean2);
set.add(bean);
}
}
}
@Override
protected void cleanup(Reducer<Text, MovieBean, MovieBean, NullWritable>.Context context)
throws IOException, InterruptedException {
// emit the 20 movies that survived in the TreeSet
for (MovieBean movieBean : set) {
context.write(movieBean, NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(AvgTop.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MovieBean.class);
job.setOutputKeyClass(MovieBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\\data\\rating.json"));
FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\movie\\top5"));
File file = new File("d:\\data\\out\\movie\\top5");
if(file.exists()){
FileUtils.deleteDirectory(file);
}
boolean b = job.waitForCompletion(true);
System.out.println(b?"Congratulations, it worked!":"Don't give up, hope lies in tomorrow");
}
}