在将MongoDB集合拉入Apache Spark/Hadoop时,是否可以通过MongoInputFormat进行投影以减小返回的数据集的大小?
发布于 2015-09-14 02:33:34
可以,您可以在配置MongoDB Hadoop连接器时指定mongo.input.fields
选项:
val config = new Configuration()
config.set("mongo.input.uri", [Your URI])
config.set("mongo.input.query", [Your Query])
config.set("mongo.input.fields", "{column: 1}")
有关投影的语法,请参阅the official MongoDB Documentations。
发布于 2021-01-22 05:25:10
MongoDB Spark Connector支持Aggregation Pipelines。聚合管道包括$project运算符。
使用聚合管道链接中的示例,您可以在Python中执行以下操作。
{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }
uri = f'mongodb://{user}:{password}@{host}/{database}.{collection}'
pipeline = json.dumps({
'$project': {
'type': 1,
}
}
)
df = spark.read.format("mongo").option(
"uri", uri
).option(
"pipeline", pipeline
).load()
df.show()
+---+------+
|_id| type |
+---+------+
|1.0| apple|
|2.0|orange|
|3.0|banana|
+---+------+
这里的spark
是SparkSession对象。
我在AWS Glue作业中使用了类似的代码,将一个非常复杂的文档(深度和广度)简化为引用ID和文档树中的特定值之间的简单关联。在没有投影的情况下,由于文档模式中的不一致,我有类型转换错误。
发布于 2017-06-19 14:18:06
虽然Kent的答案是正确的,但有一种首选的方法可以做到这一点:
MongoConfigUtil.setFields(Configuration conf, DBObject fields);
虽然这实际上执行了Kent的答案中描述的相同操作,但在控制投影的键(mongo.input.fields
)将来会发生变化的情况下,您的代码不会中断。此外,此方法支持将JSON字符串或DBObject作为第二个参数进行传递。因此,如果您选择使用DBObject,则不必自己将其序列化为JSON字符串- MongoConfigUtil
会自行进行序列化。
https://stackoverflow.com/questions/28762883
复制相似问题