我正在从事一个Big项目,并且有一个小的KPI,在这个项目中,我只需要编写减少输出的前10个值。为了完成这一要求,我使用了一个计数器,并在计数器等于11时中断了循环,但是还原器仍然将所有的值写入HDFS。
这是一个非常简单的java代码,但我被困在:(
为了进行测试,我创建了一个独立的类(java应用程序)来完成这个任务,并且这个类在那里工作;我想知道为什么它不能在还原器代码中工作。
请有人帮我,如果我漏掉了什么,请给我建议。
MAP -约简代码
package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ValueSortExp2 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(true);
String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "Test commond");
job.setJarByClass(ValueSortExp2.class);
// Setup MapReduce
job.setMapperClass(MapTask2.class);
job.setReducerClass(ReduceTask2.class);
job.setNumReduceTasks(1);
// Specify key / value
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(IntComparator2.class);
// Input
FileInputFormat.addInputPath(job, new Path(arguments[0]));
job.setInputFormatClass(TextInputFormat.class);
// Output
FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
job.setOutputFormatClass(TextOutputFormat.class);
int code = job.waitForCompletion(true) ? 0 : 1;
System.exit(code);
}
public static class IntComparator2 extends WritableComparator {
public IntComparator2() {
super(IntWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
return v1.compareTo(v2) * (-1);
}
}
public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {
public void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException {
String tokens[]= value.toString().split("\\t");
// int empId = Integer.parseInt(tokens[0]) ;
int count = Integer.parseInt(tokens[2]) ;
context.write(new IntWritable(count), new Text(value));
}
}
public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
int cnt=0;
public void reduce(IntWritable key, Iterable<Text> list, Context context)
throws java.io.IOException, InterruptedException {
for (Text value : list ) {
cnt ++;
if (cnt==11)
{
break;
}
context.write(new IntWritable(cnt), value);
}
}
}
} 简单JAVA代码工作良好
package comparableTest;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class TestData {
//static int cnt=0;
public static void main(String args[]) throws IOException, InterruptedException {
ArrayList<String> list = new ArrayList<String>() {{
add("A");
add("B");
add("C");
add("D");
}};
reduce(list);
}
public static void reduce(Iterable<String> list)
throws java.io.IOException, InterruptedException {
int cnt=0;
for (String value : list ) {
cnt ++;
if (cnt==3)
{
break;
}
System.out.println(value);
}
}
}示例数据--标头只是更多的信息,实际数据来自第2行。
`ID名称计数(需要显示前10个desc)
“玩具总动员”(1995) 2077
10 GoldenEye (1995年) 888
100市政厅(1996) 128
1 000凝缩(1996) 20
1001 (L‘’Associe)(1982年)0
1002教育署的下一步行动(1996) 8
1003极端措施(1996年) 121
1004微光人,(1996) 101
1005 D3:强力鸭(1996) 142
1006分庭,(1996) 78
1007苹果饺子帮(1975) 232
1008 Davy Crockett,野生边疆之王(1955) 97
1009逃到巫山(1975) 291
101瓶火箭(1996年) 253
1010爱情虫,(1969) 242
1011 Herbie再骑(1974) 135
1012 (1957) 301
1013家长陷阱,(1961) 258
1014 Pollyanna (1960) 136
1015“归途:不可思议的旅程”(1993) 234
1016沙吉狗,(1959年) 156
1017瑞士家庭鲁滨逊(1960年) 276
1018该死的猫!(1965年) 123
1019,000联盟海底联盟(1954年) 575
102错误先生(1996年) 60
1020酷跑(1993) 392
1021名外野天使(1994) 247
1022灰姑娘(1950年) 577
1023维尼与狂风日(1968年) 221
1024三个卡瓦列罗号,(1945年) 126
石头上的1025把剑,(1963) 293
1026亲爱的我的心(1949) 8
1027罗宾汉:盗贼王子(1991) 344
1028 Mary Poppin (1964) 1011
1029 Dumbo (1941年) 568
103“难忘”(1996) 33
1030皮特之龙(1977) 323
1031贝德霍布斯和布隆支(1971年) 319`
发布于 2017-09-07 10:39:55
如果您将int cnt=0;移动到reduce方法中(作为该方法的第一个语句),您将得到每个键的前10个值(我猜这就是您想要的)。
否则,就像现在一样,你的计数器会不断增加,你只会跳过第11个值(不管是什么键),继续到第12号。
如果您只想打印10个值(不管键),请将cnt初始化留在原处,并将if条件更改为if (cnt > 10).然而,这不是一个好的实践,所以你可能需要重新考虑你的算法。(假设您不需要10个随机值,那么如何知道在分布式环境中,当您有超过1个还原器和一个散列分区器时,将首先处理哪个键?)
https://stackoverflow.com/questions/46087100
复制相似问题