在搭建完Hadoop集群后,我们可以基于HDFS做一些离线计算。然而HDFS毕竟是基于文件的系统,所以当我们存储的数据要兼顾一些线上业务访问的时候(如接入层/推荐引擎的实时用户画像查询)就显得比较捉急了。HDFS查询读取没有索引,至少也是分钟级的,此时要是把支持高并发的数据库与Hadoop直接对接,岂不美哉?
在存放海量非标准结构型数据时,我们时常用到MongoDB。MongoDB是专为可扩展性,高性能和高可用性而设计的数据库。它可以从单服务器部署扩展到大型、复杂的多数据中心架构。利用内存计算的优势,MongoDB能够提供高性能的数据读写操作。可扩展+内存计算,这就为对接Hadoop大数据以及线上实时查询提供了很好的基础。
选定数据库后,我们将用到一个可连接MongoDB作为数据输入输出源的driver,和HDFS不同的是,它按照MongoDB中存储的行来进行split,并且可以将reduce的结果作为BSON文件或者直接写入到MongoDB中。
首先我们从github源码下载源码进行编译。推荐直接从http://search.maven.org自己下载编译好的jar包。mongo-hadoop依赖于mongo-java-driver,所以需要同时将这两个包加载到Hadoop集群中,有三种方法:
注意:不同的mongo-hadoop版本对应不同的mongo-java-driver版本,需注意对应。我应用的版本分别是mongo-hadoop-core-2.0.2.jar
和mongo-java-driver-3.4.2.jar
然后就可以开始写MapReduce了。Map和Reduce方法和HDFS上没什么两样,只是在main函数中,我们需要如下设置input和output相关参数:
MongoConfigUtil.setInputFormat(getConf(), MongoInputFormat.class);
MongoConfigUtil.setOutputFormat(getConf(), TextOutputFormat.class);
MongoConfigUtil.setInputURI(getConf(), "mongodb://mongo_db:password@127.0.0.1:13670/mongo_db.inputCollection?authSource=admin");
MongoConfigUtil.setAuthURI(getConf(), "mongodb://mongo_db:password@127.0.0.1:13670/admin");
上述代码只是连接了MongoDB作为input,如果需要把结果写会到MongoDB,加入如下代码即可:
MongoConfigUtil.setOutputURI(getConf(), "mongodb://mongo_db:password@127.0.0.1:13670/mongo_db.outputCollection?authSource=admin");
注意:不幸的是,这个地方我又踩到坑里了。当MongoDB设置了登录验证时,我们需要在写入/写出之前加上校验数据库的验证。MongoConfigUtil.setAuthURI()
这个方法只是设定split时的验证库,在写入/写出库账号不一致时可以用上。然而若是写入/写出库账号和验证账号在同一库中时,这种方法就不管用了。此时我们需要再db string后加入authSource=<验证库名>这么个参数
如果需要在拉取input时先利用查询参数过滤出一部分数据,这可以在hadoop执行时通过-D加入如下参数:
mongo.input.query={"_id":{"$gt":{"$date":1182470400000}}}
更多参数可参考https://github.com/mongodb/mongo-hadoop/wiki/Configuration-Reference
在学习example的时候,我又产生了疑问。他的main函数直接调用了ToolRunner.run()
,之前MapReduce在入口函数里设置job实例相关参数的地方去哪了?可以看到他的构造函数里甚至都没有出现过Job类。
通过分析源代码中core/src/main/java/com/mongodb/hadoop/util/MongoTool.java
这个文件,可以发现它其实就是继承了Configured类,并且实现了Tool接口。其中有这么个方法会在run返回之前调用,把该干的都干了:
private int runMapredJob(final Configuration conf) {
final JobConf job = new JobConf(conf, getClass());
/**
* Any arguments specified with -D <property>=<value>
* on the CLI will be picked up and set here
* They override any XML level values
* Note that -D<space> is important - no space will
* not work as it gets picked up by Java itself
*/
// TODO - Do we need to set job name somehow more specifically?
// This may or may not be correct/sane
job.setJarByClass(getClass());
final Class<? extends org.apache.hadoop.mapred.Mapper> mapper = MapredMongoConfigUtil.getMapper(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Mapper Class: " + mapper);
LOG.debug("Input URI: " + conf.get(MapredMongoConfigUtil.INPUT_URI));
}
job.setMapperClass(mapper);
Class<? extends org.apache.hadoop.mapred.Reducer> combiner = MapredMongoConfigUtil.getCombiner(conf);
if (combiner != null) {
job.setCombinerClass(combiner);
}
job.setReducerClass(MapredMongoConfigUtil.getReducer(conf));
job.setOutputFormat(MapredMongoConfigUtil.getOutputFormat(conf));
job.setOutputKeyClass(MapredMongoConfigUtil.getOutputKey(conf));
job.setOutputValueClass(MapredMongoConfigUtil.getOutputValue(conf));
job.setInputFormat(MapredMongoConfigUtil.getInputFormat(conf));
Class mapOutputKeyClass = MapredMongoConfigUtil.getMapperOutputKey(conf);
Class mapOutputValueClass = MapredMongoConfigUtil.getMapperOutputValue(conf);
if (mapOutputKeyClass != null) {
job.setMapOutputKeyClass(mapOutputKeyClass);
}
if (mapOutputValueClass != null) {
job.setMapOutputValueClass(mapOutputValueClass);
}
/**
* Determines if the job will run verbosely e.g. print debug output
* Only works with foreground jobs
*/
final boolean verbose = MapredMongoConfigUtil.isJobVerbose(conf);
/**
* Run job in foreground aka wait for completion or background?
*/
final boolean background = MapredMongoConfigUtil.isJobBackground(conf);
try {
RunningJob runningJob = JobClient.runJob(job);
if (background) {
LOG.info("Setting up and running MapReduce job in background.");
return 0;
} else {
LOG.info("Setting up and running MapReduce job in foreground, will wait for results. {Verbose? "
+ verbose + "}");
runningJob.waitForCompletion();
return 0;
}
} catch (final Exception e) {
LOG.error("Exception while executing job... ", e);
return 1;
}
}
因此我们在调整一些任务参数(如Mapper数量)时可以选择在自己的类里复写这个方法。