前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >打造自己的MapReduce[二]:Hadoop连接MongoDB

打造自己的MapReduce[二]:Hadoop连接MongoDB

作者头像
星回
发布2018-08-02 15:17:13
1.4K0
发布2018-08-02 15:17:13
举报
文章被收录于专栏:星回的实验室

在搭建完Hadoop集群后,我们可以基于HDFS做一些离线计算。然而HDFS毕竟是基于文件的系统,所以当我们存储的数据要兼顾一些线上业务访问的时候(如接入层/推荐引擎的实时用户画像查询)就显得比较捉急了。HDFS查询读取没有索引,至少也是分钟级的,此时要是把支持高并发的数据库与Hadoop直接对接,岂不美哉?

MongoDB

在存放海量非标准结构型数据时,我们时常用到MongoDB。MongoDB是专为可扩展性,高性能和高可用性而设计的数据库。它可以从单服务器部署扩展到大型、复杂的多数据中心架构。利用内存计算的优势,MongoDB能够提供高性能的数据读写操作。可扩展+内存计算,这就为对接Hadoop大数据以及线上实时查询提供了很好的基础。

Mongo-hadoop Connector

选定数据库后,我们将用到一个可连接MongoDB作为数据输入输出源的driver,和HDFS不同的是,它按照MongoDB中存储的行来进行split,并且可以将reduce的结果作为BSON文件或者直接写入到MongoDB中。

连接步骤

首先我们从github源码下载源码进行编译。推荐直接从http://search.maven.org自己下载编译好的jar包。mongo-hadoop依赖于mongo-java-driver,所以需要同时将这两个包加载到Hadoop集群中,有三种方法:

  1. 调用hadoop命令式加上-libjars参数,指定第三方依赖库;
  2. 手动上传到各个节点的HADOOP_CLASSPATH中;
  3. 传到HDFS上,在MapReduce代码中通过addClassPath加入依赖库目录。

注意:不同的mongo-hadoop版本对应不同的mongo-java-driver版本,需注意对应。我应用的版本分别是mongo-hadoop-core-2.0.2.jarmongo-java-driver-3.4.2.jar

然后就可以开始写MapReduce了。Map和Reduce方法和HDFS上没什么两样,只是在main函数中,我们需要如下设置input和output相关参数:

代码语言:javascript
复制
MongoConfigUtil.setInputFormat(getConf(), MongoInputFormat.class);  
MongoConfigUtil.setOutputFormat(getConf(), TextOutputFormat.class);

MongoConfigUtil.setInputURI(getConf(), "mongodb://mongo_db:password@127.0.0.1:13670/mongo_db.inputCollection?authSource=admin");  
MongoConfigUtil.setAuthURI(getConf(), "mongodb://mongo_db:password@127.0.0.1:13670/admin");  

上述代码只是连接了MongoDB作为input,如果需要把结果写会到MongoDB,加入如下代码即可:

代码语言:javascript
复制
MongoConfigUtil.setOutputURI(getConf(), "mongodb://mongo_db:password@127.0.0.1:13670/mongo_db.outputCollection?authSource=admin");  

注意:不幸的是,这个地方我又踩到坑里了。当MongoDB设置了登录验证时,我们需要在写入/写出之前加上校验数据库的验证。MongoConfigUtil.setAuthURI()这个方法只是设定split时的验证库,在写入/写出库账号不一致时可以用上。然而若是写入/写出库账号和验证账号在同一库中时,这种方法就不管用了。此时我们需要再db string后加入authSource=<验证库名>这么个参数

如果需要在拉取input时先利用查询参数过滤出一部分数据,这可以在hadoop执行时通过-D加入如下参数:

代码语言:javascript
复制
mongo.input.query={"_id":{"$gt":{"$date":1182470400000}}}  

更多参数可参考https://github.com/mongodb/mongo-hadoop/wiki/Configuration-Reference

函数入口

在学习example的时候,我又产生了疑问。他的main函数直接调用了ToolRunner.run(),之前MapReduce在入口函数里设置job实例相关参数的地方去哪了?可以看到他的构造函数里甚至都没有出现过Job类。

通过分析源代码中core/src/main/java/com/mongodb/hadoop/util/MongoTool.java这个文件,可以发现它其实就是继承了Configured类,并且实现了Tool接口。其中有这么个方法会在run返回之前调用,把该干的都干了:

代码语言:javascript
复制
 private int runMapredJob(final Configuration conf) {
        final JobConf job = new JobConf(conf, getClass());
        /**
         * Any arguments specified with -D <property>=<value>
         * on the CLI will be picked up and set here
         * They override any XML level values
         * Note that -D<space> is important - no space will
         * not work as it gets picked up by Java itself
         */
        // TODO - Do we need to set job name somehow more specifically?
        // This may or may not be correct/sane
        job.setJarByClass(getClass());
        final Class<? extends org.apache.hadoop.mapred.Mapper> mapper = MapredMongoConfigUtil.getMapper(conf);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Mapper Class: " + mapper);
            LOG.debug("Input URI: " + conf.get(MapredMongoConfigUtil.INPUT_URI));
        }
        job.setMapperClass(mapper);
        Class<? extends org.apache.hadoop.mapred.Reducer> combiner = MapredMongoConfigUtil.getCombiner(conf);
        if (combiner != null) {
            job.setCombinerClass(combiner);
        }
        job.setReducerClass(MapredMongoConfigUtil.getReducer(conf));

        job.setOutputFormat(MapredMongoConfigUtil.getOutputFormat(conf));
        job.setOutputKeyClass(MapredMongoConfigUtil.getOutputKey(conf));
        job.setOutputValueClass(MapredMongoConfigUtil.getOutputValue(conf));
        job.setInputFormat(MapredMongoConfigUtil.getInputFormat(conf));
        Class mapOutputKeyClass = MapredMongoConfigUtil.getMapperOutputKey(conf);
        Class mapOutputValueClass = MapredMongoConfigUtil.getMapperOutputValue(conf);

        if (mapOutputKeyClass != null) {
            job.setMapOutputKeyClass(mapOutputKeyClass);
        }
        if (mapOutputValueClass != null) {
            job.setMapOutputValueClass(mapOutputValueClass);
        }

        /**
         * Determines if the job will run verbosely e.g. print debug output
         * Only works with foreground jobs
         */
        final boolean verbose = MapredMongoConfigUtil.isJobVerbose(conf);
        /**
         * Run job in foreground aka wait for completion or background?
         */
        final boolean background = MapredMongoConfigUtil.isJobBackground(conf);
        try {
            RunningJob runningJob = JobClient.runJob(job);
            if (background) {
                LOG.info("Setting up and running MapReduce job in background.");
                return 0;
            } else {
                LOG.info("Setting up and running MapReduce job in foreground, will wait for results.  {Verbose? "
                         + verbose + "}");
                runningJob.waitForCompletion();
                return 0;
            }
        } catch (final Exception e) {
            LOG.error("Exception while executing job... ", e);
            return 1;
        }

    }

因此我们在调整一些任务参数(如Mapper数量)时可以选择在自己的类里复写这个方法。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MongoDB
  • Mongo-hadoop Connector
  • 连接步骤
  • 函数入口
相关产品与服务
访问管理
访问管理(Cloud Access Management,CAM)可以帮助您安全、便捷地管理对腾讯云服务和资源的访问。您可以使用CAM创建子用户、用户组和角色,并通过策略控制其访问范围。CAM支持用户和角色SSO能力,您可以根据具体管理场景针对性设置企业内用户和腾讯云的互通能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档