图的三角形计数问题是一个基本的图计算问题,是很多复杂网络分析(比如社交网络分析) 的基础。目前图的三角形计数问题已经成为了 Spark 系统中 GraphX 图计算库所提供的一个算法级 API。本次实验任务就是要在 Hadoop 系统上实现 Twitter 社交网络图的三角形计数任务。
整个过程分为3个job,各job对应的键值对类型如下表所示:
inputKey | inputValue | outputKey | outputValue | |
---|---|---|---|---|
Map1 | Object | Text | Text | Text |
Reduce1 | Text | Text | Text | Text |
Map2 | LongWritable | Text | Text | Text |
Reduce2 | Text | Text | Text | Text |
Map3 | LongWritable | Text | Text | Text |
Reduce3 | Text | Text | Text | Text |
Map1负责读入边,将按行存储的点对读入后,分割成a和b两个点,去除起点和终点相同的边,将标号较小的点放在前面,即a < b,输出键值对a + b –> +,表示存在一条a到b的边。因为对于第二个数据集标号值较大,所以需要以字符串形式存储。
Reduce1负责去重,不改变键值对,但是对于多条相同的键值对只保留一条。
Map2负责以+为分隔符,将key拆成两个点,输出键值对变为a->b。
Reduce2负责统计需要查询的边,如果存在边ab和ac,假设b<c,那么需要查询bc是否存在,将上一步Map的键值对变回a+b->+,表示已经统计过了,对于相同起点的两个点bc,构造键值对b + c->-,表示需要查找是否存在bc这条边。
Map3相当于什么也没做,只是为了使用下一个reduce任务而设。
Reduce3负责统计三角形,对于一个键a+b,如果存在值为+的元素,则表示存在这条边,如果存在值为-的元素,则表示存在一个三角形由这条边构成,统计-的个数,若+存在,则将总结果加上-的数量。
public static class Map1 extends Mapper<Object, Text, Text, Text>
{
private Text newKey = new Text();
private Text newValue = new Text();
public void map(Object key,Text value,Context context)throws IOException,InterruptedException
{
StringTokenizer itr=new StringTokenizer(value.toString());
String a = itr.nextToken();
String b = itr.nextToken();
if(a.compareTo(b) == 0) return ;//去除起点终点相同的边
if(a.compareTo(b) > 0) //交换ab,保证a<b
{
String temp = a;
a = b;
b = temp;
}
newKey.set(a + "+" + b);
newValue.set("+");
context.write(newKey, newValue);
}
}
/**
 * Job 1 reducer: deduplication. Every group of identical ("a+b", "+")
 * records collapses to a single ("a+b", "+") output record.
 */
public static class Reduce1 extends Reducer<Text, Text, Text, Text>
{
    // Constant marker value; the edge itself is encoded in the key.
    private final Text plus = new Text("+");

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
    {
        // Values are intentionally ignored: emitting the key exactly once
        // per group is what removes the duplicate edges.
        context.write(key, plus);
    }
}
public static class Map2 extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context)
throws IOException,InterruptedException
{
StringTokenizer itr=new StringTokenizer(value.toString());
String[] tokens = itr.nextToken().toString().split("\\+"); //以+为分隔符分割a和b
Text newKey = new Text();
Text newValue = new Text();
newKey.set(tokens[0]);
newValue.set(tokens[1]);
context.write(newKey, newValue);
}
}
public static class Reduce2 extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key,Iterable<Text> values,Context context)
throws IOException,InterruptedException
{
ArrayList<String> array = new ArrayList<String>();
Text newKey = new Text();
Text newValue = new Text();
newValue.set("+");
for(Text value : values)
{
array.add(value.toString());//有相同定点的边加入array集合
newKey.set(key.toString()+"+"+value.toString());//处理过的变为map执行之前的形式
context.write(newKey, newValue);
}
for(int i=0; i<array.size(); i++) //对于任意两点构造查询键值对
{
for(int j=i+1; j<array.size(); j++)
{
String a = array.get(i);
String b = array.get(j);
if(a.compareTo(b) < 0)//保证ab的大小关系,减号表示需要查询
{
newKey.set(a+"+"+b);
newValue.set("-");
}
else
{
newKey.set(b+"+"+a);
newValue.set("-");
}
context.write(newKey, newValue);
}
}
}
}
public static class Map3 extends Mapper<LongWritable, Text, Text, Text>
{
private Text newKey = new Text();
private Text newValue = new Text();
public void map(LongWritable key,Text value,Context context)//没做什么,只是为了执行reduce
throws IOException,InterruptedException
{
StringTokenizer itr=new StringTokenizer(value.toString());
newKey.set(itr.nextToken().toString());
newValue.set(itr.nextToken().toString());
context.write(newKey, newValue);
}
}
public static class Reduce3 extends Reducer<Text, Text, Text, Text>
{
private static int result = 0;
public void cleanup(Context context) throws IOException, InterruptedException
{
context.write(new Text("Result: "), new Text(""+result)); //写入结果
}
public void reduce(Text key,Iterable<Text> values,Context context)
throws IOException,InterruptedException
{
int cnt = 0;
boolean flag = false;
for(Text value : values)
{
if(value.toString().equalsIgnoreCase("+")) //存在这条边
flag = true;
else if(value.toString().equalsIgnoreCase("-")) //需要查询这条边
cnt ++;
}
if (flag) result += cnt;
}
}
开发环境:IntelliJ IDEA + Maven + Java 1.8
数据集 | 三角形个数 | Driver程序运行时间(秒) |
---|---|---|
Twitter | 13082506 | 1142 |
Google+(选做) | 1073677742 | 197642 |
完整代码如下:
import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
/**
 * Triangle counting on an undirected graph via three chained MapReduce jobs:
 *  Job 1: normalize edges ("a+b" with a < b), drop self-loops, deduplicate.
 *  Job 2: per vertex, re-emit known edges ("a+b" -> "+") and emit one query
 *         ("b+c" -> "-") for every pair of its neighbors.
 *  Job 3: per edge, count the "-" queries confirmed by a "+" — each
 *         confirmed query is exactly one triangle.
 */
public class TriCount
{
    /**
     * Job 1 mapper: reads one edge per line ("a b"), drops self-loops,
     * puts the lexicographically smaller endpoint first, and emits
     * ("a+b", "+"). Ids stay strings because the larger data set's ids
     * do not fit in an int; lexicographic order is used consistently.
     */
    public static class Map1 extends Mapper<Object, Text, Text, Text>
    {
        // Reused across map() calls to avoid per-record allocation.
        private final Text newKey = new Text();
        private final Text newValue = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            if (!itr.hasMoreTokens()) return;   // skip blank lines
            String a = itr.nextToken();
            if (!itr.hasMoreTokens()) return;   // skip malformed lines
            String b = itr.nextToken();

            int cmp = a.compareTo(b);
            if (cmp == 0) return;               // drop self-loops
            if (cmp > 0)                        // swap so that a < b
            {
                String temp = a;
                a = b;
                b = temp;
            }
            newKey.set(a + "+" + b);
            newValue.set("+");                  // "+" marks an existing edge
            context.write(newKey, newValue);
        }
    }

    /** Job 1 reducer: deduplication — one ("a+b", "+") per distinct edge. */
    public static class Reduce1 extends Reducer<Text, Text, Text, Text>
    {
        private final Text plus = new Text("+");

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            // Values are ignored; writing the key once removes duplicates.
            context.write(key, plus);
        }
    }

    /** Job 2 mapper: splits "a+b" back into endpoints and emits (a, b). */
    public static class Map2 extends Mapper<LongWritable, Text, Text, Text>
    {
        // Reused across map() calls to avoid per-record allocation.
        private final Text newKey = new Text();
        private final Text newValue = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            if (!itr.hasMoreTokens()) return;               // skip blank lines
            String[] tokens = itr.nextToken().split("\\+"); // split "a+b" on '+'
            if (tokens.length < 2) return;                  // skip malformed keys
            newKey.set(tokens[0]);
            newValue.set(tokens[1]);
            context.write(newKey, newValue);
        }
    }

    /**
     * Job 2 reducer: re-emits each known edge as ("a+b", "+") and, for every
     * pair of neighbors (b, c) of a, emits the query ("b+c", "-").
     * NOTE(review): all neighbors of one vertex are buffered in memory.
     */
    public static class Reduce2 extends Reducer<Text, Text, Text, Text>
    {
        private final Text newKey = new Text();
        private final Text newValue = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            ArrayList<String> neighbors = new ArrayList<String>();
            String prefix = key.toString();  // invariant across the loop
            newValue.set("+");
            for (Text value : values)
            {
                String b = value.toString();
                neighbors.add(b);
                newKey.set(prefix + "+" + b);  // known edge, job-1 format
                context.write(newKey, newValue);
            }
            newValue.set("-");                 // same marker for all queries
            for (int i = 0; i < neighbors.size(); i++)
            {
                for (int j = i + 1; j < neighbors.size(); j++)
                {
                    String b = neighbors.get(i);
                    String c = neighbors.get(j);
                    if (b.compareTo(c) > 0)    // smaller endpoint first
                    {
                        String temp = b;
                        b = c;
                        c = temp;
                    }
                    newKey.set(b + "+" + c);
                    context.write(newKey, newValue);
                }
            }
        }
    }

    /** Job 3 mapper: identity pass-through so the shuffle can group marks per edge. */
    public static class Map3 extends Mapper<LongWritable, Text, Text, Text>
    {
        private final Text newKey = new Text();
        private final Text newValue = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            if (!itr.hasMoreTokens()) return;  // skip blank lines
            newKey.set(itr.nextToken());
            if (!itr.hasMoreTokens()) return;  // skip malformed lines
            newValue.set(itr.nextToken());
            context.write(newKey, newValue);
        }
    }

    /**
     * Job 3 reducer: per edge key, counts "-" queries and confirms them with
     * the "+" mark; each confirmed query is one triangle. The per-task total
     * is written once in cleanup().
     */
    public static class Reduce3 extends Reducer<Text, Text, Text, Text>
    {
        // Per-task total. Must NOT be static: with JVM reuse a static counter
        // would leak across reducer tasks. long avoids overflow on big graphs.
        private long result = 0;

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException
        {
            context.write(new Text("Result: "), new Text(String.valueOf(result)));
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            long cnt = 0;           // "-" queries for this edge
            boolean exists = false; // saw "+": the edge really exists
            for (Text value : values)
            {
                String v = value.toString();
                if (v.equals("+"))
                    exists = true;
                else if (v.equals("-"))
                    cnt++;
            }
            if (exists) result += cnt;
        }
    }

    /**
     * Driver: chains the three jobs; each job's output directory is the next
     * job's input. Exits non-zero as soon as any job fails so later jobs do
     * not run against missing input.
     */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // Applies generic Hadoop options (-D, -fs, ...) to conf; the
        // remaining args are unused because the paths below are fixed.
        new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job1 = Job.getInstance(conf, "job1");
        job1.setJarByClass(TriCount.class);
        job1.setMapperClass(Map1.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setReducerClass(Reduce1.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path("/data/graphTriangleCount/gplus_combined.unique.txt"));
        FileOutputFormat.setOutputPath(job1, new Path("/user/2016st28/exp3_2/step1/"));
        if (!job1.waitForCompletion(true)) System.exit(1);

        Job job2 = Job.getInstance(conf, "job2");
        job2.setJarByClass(TriCount.class);
        job2.setMapperClass(Map2.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(Reduce2.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path("/user/2016st28/exp3_2/step1/"));
        FileOutputFormat.setOutputPath(job2, new Path("/user/2016st28/exp3_2/step2/"));
        if (!job2.waitForCompletion(true)) System.exit(1);

        Job job3 = Job.getInstance(conf, "job3");
        job3.setJarByClass(TriCount.class);
        job3.setMapperClass(Map3.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(Reduce3.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job3, new Path("/user/2016st28/exp3_2/step2/"));
        FileOutputFormat.setOutputPath(job3, new Path("/user/2016st28/exp3_2/step3/"));
        if (!job3.waitForCompletion(true)) System.exit(1);
    }
}