
Triangle Counting in Social Networks with Hadoop

Triangle counting is a fundamental graph-computation problem and a building block of many complex-network analyses, such as social network analysis. It is now provided as an algorithm-level API by GraphX, the graph-computation library in Spark. The task of this experiment is to implement triangle counting for the Twitter social graph on top of Hadoop.

1.1 MapReduce Design Approach

The whole process is split into three jobs; the key/value pair types of each job are listed in the table below:

|         | inputKey     | inputValue | outputKey | outputValue |
|---------|--------------|------------|-----------|-------------|
| Map1    | Object       | Text       | Text      | Text        |
| Reduce1 | Text         | Text       | Text      | Text        |
| Map2    | LongWritable | Text       | Text      | Text        |
| Reduce2 | Text         | Text       | Text      | Text        |
| Map3    | LongWritable | Text       | Text      | Text        |
| Reduce3 | Text         | Text       | Text      | Text        |

Map1 reads the edges. Each input line holds one vertex pair, which is split into two vertices a and b. Self-loops (edges whose two endpoints coincide) are dropped, and the smaller vertex label is placed first so that a < b. The mapper then emits the pair a+b -> +, meaning that an edge between a and b exists. Because the vertex labels in the second dataset are large, the labels are kept as strings; string comparison gives lexicographic rather than numeric order, but any consistent total order on the labels suffices here.

Reduce1 deduplicates: it leaves each key-value pair unchanged but keeps only one copy of identical pairs.

Map2 splits each key at the '+' separator back into its two vertices and emits the pair a -> b.

Reduce2 generates the edge queries. If both edges ab and ac exist and, say, b < c, then we must check whether edge bc exists. The reducer first turns each incoming pair back into the pre-Map2 form a+b -> +, marking it as already processed; then, for every pair of neighbors b and c that share the start vertex a, it emits b+c -> -, meaning that the existence of edge bc has to be checked.

Map3 does essentially nothing: it is an identity mapper that exists only so that the following reduce stage can run.

Reduce3 counts the triangles. For a key a+b, a value of + means that this edge exists, while each value of - means that one triangle would be closed by this edge. The reducer counts the - values and, if a + value is also present, adds that count to the overall total.
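As a concrete trace (a toy example, not taken from the experiment), consider a graph with edges 1-2, 1-3, 1-4 and 2-3, which contains exactly one triangle {1, 2, 3}. Job 1 emits the normalized, deduplicated edges 1+2 -> +, 1+3 -> +, 1+4 -> + and 2+3 -> +. Map2 turns these into 1 -> 2, 1 -> 3, 1 -> 4 and 2 -> 3. Reduce2 re-emits the four edges with value + and, from the neighbor list {2, 3, 4} of vertex 1, emits the queries 2+3 -> -, 2+4 -> - and 3+4 -> - (vertex 2 has only one neighbor, so it produces no queries). In Reduce3, key 2+3 receives both + and -, so the total increases by 1; keys 2+4 and 3+4 receive only -, so they contribute nothing. The final result is 1.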

1.2 MapReduce Code Snippets

   public static class Map1 extends Mapper<Object, Text, Text, Text>
    {
        private Text newKey = new Text();
        private Text newValue = new Text();
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException
        {
            StringTokenizer itr=new StringTokenizer(value.toString());
            String a = itr.nextToken();
            String b = itr.nextToken();
            if(a.compareTo(b) == 0) return; // drop self-loops (edges whose endpoints coincide)
            if(a.compareTo(b) > 0) // swap a and b so that a < b
            {
                String temp = a;
                a = b;
                b = temp;
            }
            newKey.set(a + "+" + b);
            newValue.set("+");
            context.write(newKey, newValue);
        }
    }
    public static class Reduce1 extends Reducer<Text, Text, Text, Text>
    {
        private Text newValue = new Text();
        public void reduce(Text key,Iterable<Text> values,Context context)
                                        throws IOException,InterruptedException
        {
            newValue.set("+"); //去重
            context.write(key, newValue);
        }
    }
    public static class Map2 extends Mapper<LongWritable, Text, Text, Text>
    {
        public void map(LongWritable key, Text value, Context context)
                                        throws IOException,InterruptedException
        {
            StringTokenizer itr=new StringTokenizer(value.toString());
            String[] tokens = itr.nextToken().split("\\+"); // split a and b at the '+' separator
            Text newKey = new Text();
            Text newValue = new Text();
            newKey.set(tokens[0]);
            newValue.set(tokens[1]);
            context.write(newKey, newValue);
        }
    }

    public static class Reduce2 extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key,Iterable<Text> values,Context context)
                                        throws IOException,InterruptedException
        {
            ArrayList<String> array = new ArrayList<String>();
            Text newKey = new Text();
            Text newValue = new Text();
            newValue.set("+");
            for(Text value : values)
            {
                array.add(value.toString()); // collect all neighbors sharing this start vertex
                newKey.set(key.toString()+"+"+value.toString()); // re-emit the edge in its pre-Map2 form
                context.write(newKey, newValue);
            }
            for(int i=0; i<array.size(); i++) // build a query pair for every two neighbors
            {
                for(int j=i+1; j<array.size(); j++)
                {
                    String a = array.get(i);
                    String b = array.get(j);
                    if(a.compareTo(b) < 0) // keep the smaller label first; '-' marks an edge to look up
                    {
                        newKey.set(a+"+"+b);
                        newValue.set("-");
                    }
                    else
                    {
                        newKey.set(b+"+"+a);
                        newValue.set("-");
                    }
                    context.write(newKey, newValue);
                }
            }
        }
    }
    public static class Map3 extends Mapper<LongWritable, Text, Text, Text>
    {
        private Text newKey = new Text();
        private Text newValue = new Text();
        public void map(LongWritable key,Text value,Context context) // identity pass-through; exists only to drive the final reduce
                                        throws IOException,InterruptedException
        {
            StringTokenizer itr=new StringTokenizer(value.toString());
            newKey.set(itr.nextToken());
            newValue.set(itr.nextToken());
            context.write(newKey, newValue);
        }
    }
    public static class Reduce3 extends Reducer<Text, Text, Text, Text>
    {
        private static int result = 0; // running total; assumes the job runs with a single reduce task
        public void cleanup(Context context) throws IOException, InterruptedException
        {
            context.write(new Text("Result: "), new Text(""+result)); // write the final triangle count
        }
        public void reduce(Text key,Iterable<Text> values,Context context)
                                        throws IOException,InterruptedException
        {
            int cnt = 0;
            boolean flag = false;
            for(Text value : values)
            {
                if(value.toString().equalsIgnoreCase("+")) // this edge exists
                    flag = true;
                else if(value.toString().equalsIgnoreCase("-")) // one candidate triangle hinges on this edge
                    cnt ++;
            }
            if (flag) result += cnt;
        }
    }
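Note that Reduce3 accumulates into a static field and writes it out once in cleanup(), which is only correct when the job runs with a single reduce task; with several reducers, each task would hold its own partial sum. A more portable variant (a sketch under that assumption, not part of the submitted code) accumulates through a Hadoop counter, which the framework sums across tasks, and reads the total back in the driver:

    public static class Reduce3WithCounter extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context)
                                        throws IOException, InterruptedException
        {
            int cnt = 0;
            boolean flag = false;
            for (Text value : values)
            {
                if (value.toString().equals("+")) // this edge exists
                    flag = true;
                else if (value.toString().equals("-")) // one candidate triangle
                    cnt++;
            }
            if (flag) // counters are aggregated across all reduce tasks by the framework
                context.getCounter("TriCount", "triangles").increment(cnt);
        }
    }

    // In the driver, after job3.waitForCompletion(true):
    // long triangles = job3.getCounters().findCounter("TriCount", "triangles").getValue();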

Development environment: IntelliJ IDEA + Maven + Java 1.8

Experimental results

| Dataset            | Triangle count | Driver running time (s) |
|--------------------|----------------|--------------------------|
| Twitter            | 13082506       | 1142                     |
| Google+ (optional) | 1073677742     | 197642                   |

The complete code is as follows:

import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;


public class TriCount
{
    // Job 1 map: read each edge and emit vertex1+vertex2 -> +
    public static class Map1 extends Mapper<Object, Text, Text, Text>
    {
        private Text newKey = new Text();
        private Text newValue = new Text();
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException
        {

            StringTokenizer itr=new StringTokenizer(value.toString());
            String a = itr.nextToken();
            String b = itr.nextToken();
            if(a.compareTo(b) == 0) return ;
            if(a.compareTo(b) > 0)
            {
                String temp = a;
                a = b;
                b = temp;
            }
            newKey.set(a + "+" + b);
            newValue.set("+");
            context.write(newKey, newValue);
            /*long a = Integer.parseInt(itr.nextToken());
            long b = Integer.parseInt(itr.nextToken());
            if(a > b)
            {
                long temp = a;
                a = b;
                b = temp;
            }
            if(a != b)
            {
                newKey.set(a + "+" + b);
                newValue.set("+");
                context.write(newKey, newValue);
            }*/
        }
    }
    // Job 1 reduce: deduplicate edges
    public static class Reduce1 extends Reducer<Text, Text, Text, Text>
    {
        private Text newValue = new Text();
        public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException
        {
            newValue.set("+");
            context.write(key, newValue);
        }
    }
    // Job 2 map: split the key back into vertex1 -> vertex2
    public static class Map2 extends Mapper<LongWritable, Text, Text, Text>
    {
        public void map(LongWritable key, Text value, Context context)throws IOException,InterruptedException
        {
            StringTokenizer itr=new StringTokenizer(value.toString());
            String[] tokens = itr.nextToken().split("\\+");
            Text newKey = new Text();
            Text newValue = new Text();
            newKey.set(tokens[0]);
            newValue.set(tokens[1]);
            context.write(newKey, newValue);
        }
    }
    // Job 2 reduce: re-emit existing edges (+) and add query pairs vertex1+vertex2 -> -
    public static class Reduce2 extends Reducer<Text, Text, Text, Text>
    {

        public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException
        {
            ArrayList<String> array = new ArrayList<String>();
            Text newKey = new Text();
            Text newValue = new Text();
            newValue.set("+");
            for(Text value : values)
            {
                array.add(value.toString());
                newKey.set(key.toString()+"+"+value.toString());
                context.write(newKey, newValue);
            }
            for(int i=0; i<array.size(); i++)
            {
                for(int j=i+1; j<array.size(); j++)
                {
                    String a = array.get(i);
                    String b = array.get(j);
                    if(a.compareTo(b) < 0)
                    {
                        newKey.set(a+"+"+b);
                        newValue.set("-");
                    }
                    else
                    {
                        newKey.set(b+"+"+a);
                        newValue.set("-");
                    }
                    /*if(Integer.parseInt(array.get(i)) < Integer.parseInt(array.get(j)))
                    {
                        newKey.set(array.get(i)+"+"+array.get(j));
                        newValue.set("-");
                    }
                    else
                    {
                        newKey.set(array.get(j)+"+"+array.get(i));
                        newValue.set("-");
                    }*/
                    context.write(newKey, newValue);
                }
            }
        }
    }
    // Job 3 map: identity pass-through
    public static class Map3 extends Mapper<LongWritable, Text, Text, Text>
    {
        private Text newKey = new Text();
        private Text newValue = new Text();
        public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException
        {
            StringTokenizer itr=new StringTokenizer(value.toString());
            newKey.set(itr.nextToken());
            newValue.set(itr.nextToken());
            context.write(newKey, newValue);
        }
    }
    // Job 3 reduce: count how many queried edges actually exist
    public static class Reduce3 extends Reducer<Text, Text, Text, Text>
    {
        private static int result = 0; // running total; assumes the job runs with a single reduce task
        public void cleanup(Context context) throws IOException, InterruptedException
        {
            context.write(new Text("Result: "), new Text(""+result));
        }
        public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException
        {
            int cnt = 0;
            boolean flag = false;
            for(Text value : values)
            {
                if(value.toString().equalsIgnoreCase("+"))
                    flag = true;
                else if(value.toString().equalsIgnoreCase("-"))
                    cnt ++;
            }
            if (flag) result += cnt;
        }
    }
    public static void main(String[] args) throws Exception
    {
        Configuration conf=new Configuration();
        String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs(); // parsed but unused: input/output paths are hard-coded below

        Job job1 = Job.getInstance(conf, "job1"); // Job.getInstance replaces the deprecated Job constructor
        job1.setJarByClass(TriCount.class);
        job1.setMapperClass(Map1.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setReducerClass(Reduce1.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

       // FileInputFormat.addInputPath(job1, new Path("/data/graphTriangleCount/twitter_graph_v2.txt"));
       // FileOutputFormat.setOutputPath(job1,new Path("/user/2016st28/exp3/step1/"));
        FileInputFormat.addInputPath(job1, new Path("/data/graphTriangleCount/gplus_combined.unique.txt"));
        FileOutputFormat.setOutputPath(job1,new Path("/user/2016st28/exp3_2/step1/"));

        if (!job1.waitForCompletion(true)) System.exit(1); // abort the chain if job1 fails

        Job job2 = Job.getInstance(conf, "job2");
        job2.setJarByClass(TriCount.class);
        job2.setMapperClass(Map2.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(Reduce2.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

       // FileInputFormat.addInputPath(job2, new Path("/user/2016st28/exp3/step1/"));
       // FileOutputFormat.setOutputPath(job2, new Path("/user/2016st28/exp3/step2/"));
        FileInputFormat.addInputPath(job2, new Path("/user/2016st28/exp3_2/step1/"));
        FileOutputFormat.setOutputPath(job2, new Path("/user/2016st28/exp3_2/step2/"));

        // waitForCompletion's boolean argument is only a verbosity flag; sequencing
        // is already guaranteed because the call blocks until the job finishes.
        if (!job2.waitForCompletion(true)) System.exit(1);

        Job job3 = Job.getInstance(conf, "job3");
        job3.setJarByClass(TriCount.class);
        job3.setMapperClass(Map3.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(Reduce3.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);

        //FileInputFormat.addInputPath(job3, new Path("/user/2016st28/exp3/step2/"));
        //FileOutputFormat.setOutputPath(job3, new Path("/user/2016st28/exp3/step3/"));
        FileInputFormat.addInputPath(job3, new Path("/user/2016st28/exp3_2/step2/"));
        FileOutputFormat.setOutputPath(job3, new Path("/user/2016st28/exp3_2/step3/"));

        System.exit(job3.waitForCompletion(true) ? 0 : 1);
    }

}
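Assuming the project is packaged into a jar (the jar name below is a placeholder) and the input dataset already exists at the HDFS path hard-coded in the driver, the three chained jobs can be submitted with the standard launcher:

    hadoop jar tricount.jar TriCount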
