前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >mapreduce如何使用本地文件 转

mapreduce如何使用本地文件 转

作者头像
stys35
发布2019-03-05 16:28:36
1.5K0
发布2019-03-05 16:28:36
举报
文章被收录于专栏:工作笔记精华工作笔记精华
代码语言:javascript
复制
对于java来说,读取本地文件再正常不过。但是对于mapreduce程序来说,读取本地文件常常会陷入误区。本地明明有这个文件,在本地运行jar包,mapreduce为什么读不到?因为我们知道,mapreduce程序本来就不是在本地执行的,程序会分布式的在各个机器上执行,你当然读不到文件,那所谓的“本地文件”就不叫“本地文件”,当然只有一个例外:你的hadoop集群是伪集群。

比如下面的示例:

package test;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FileTest
{
    public static void main(String args[])
    {
        int mr = 0;
        try
        {
            mr = ToolRunner
                    .run(new Configuration(), new FileTestDriver(), args);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        
        System.exit(mr);
    }
}
class FileTestDriver extends Configured implements Tool
{
    @Override
    public int run(String[] arg0) throws Exception
    {
        Configuration config = getConf();
        JobConf conf = new JobConf(config, FileTestDriver.class);
        String[] otherArgs = new GenericOptionsParser(config, arg0)
                .getRemainingArgs();
        String input = otherArgs[0];
        String ouput = otherArgs[1];
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.set("mapred.task.timeout", "6000000");
        conf.setMapperClass(FileTestMapper.class);
        conf.setReducerClass(FileTestReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(ouput));
        JobClient.runJob(conf);
        return 0;
    }
}
class FileTestMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text>
{
    private String filepath = "";
    
    public void configure(JobConf job)
    {
        filepath = job.get("files");
    }
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException
    {
        String url = "qq.com";
        String host = getTop100DomainTest(url, filepath);
        output.collect(new Text(url + "\t" + host), new Text(""));
       
    }
    public String getTop100DomainTest(String url, String filepath)
    {
        try
        {
            BufferedReader reader = new BufferedReader(new FileReader(new File(
                    filepath)));
            String line = "";

            while ((line = reader.readLine()) != null)
            {
                // splitLine[0]为host 后面跟着域名
                line = line.replaceAll("( )+", " ");
                String[] splitLine = line.split(" ");
                for (int i = 1; i < splitLine.length; i++)
                {
                    String host = splitLine[i];
                    if (url.equals(host))
                    {
                                return splitLine[0];
                    }
                }
            }
            return "";
        }
        catch (FileNotFoundException e)
        {
            return "";
        }
        catch (IOException e)
        {
            return "";
        }

    }

}

class FileTestReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text>
{
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException
    {
        output.collect(key, new Text(""));
    }

}

 public String getTop100DomainTest(String url, String filepath)方法读取文件,并根据url返回url的domain。

将上述程序打包test.jar后,

运行命令:

hadoop jar test.jar test.FileTest -D files="/opt/top100.txt"  /test/test /test/test1

如果您是伪集群,那么恭喜,程序成功运行,如果您是分布式,那么程序很可能运行不成功?

我们知道原理后,这段代码在分布式的情况下,也可以运行成功,怎么办?那就把集群的所有机器都拷贝top100.txt到/opt下!

程序运行成功了吧?但其实是很老土的。当你集群数多,你要一一拷贝,那是多么麻烦的一件事,而且所有的配置文件必须在同样的文件夹下,如果你能忍受,那go ahead。

实际上mapreduce提供了一个缓存方法DistributedCache。

只需在配置阶段加入:

DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf);

即可,但此处的"/test/top100.txt"为hdfs的路径。

然后在mapper 的public void configure(JobConf job)方法中加入

public void configure(JobConf job)
    {
        try
        {
            localFiles = DistributedCache.getLocalCacheFiles(job);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

即可。

map中引用,通过 path.toUri().getPath()即可访问到file。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档