前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark与mongodb整合完整版本

Spark与mongodb整合完整版本

作者头像
Spark学习技巧
发布2018-01-30 18:48:57
9.1K0
发布2018-01-30 18:48:57
举报
文章被收录于专栏:Spark学习技巧

一,准备阶段

MongoDB Connector for spark是的spark操作mongodb数据很简单,这样方便使用spark去分析mongodb数据,sql分析,流式处理,机器学习,图计算。

要求:

1),要有mongodb和spark的基础

2),mongodb要求是2.6以上

3),Spark 1.6.x

4),Scala 2.10.x 使用mongo-spark-connector_2.10

5),Scala 2.11.x 使用mongo-spark-connector_2.11

代码语言:js
复制
<dependency>
  <groupId>org.mongodb.spark</groupId>
  <artifactId>mongo-spark-connector_2.10</artifactId>
  <version>1.1.0</version>
</dependency>

二,RDD操纵mongodb

1,导入Mongodb Connector依赖

为了SparkContext和RDD能使用Mongodb Connector特殊的函数和隐式转换,需要引入相关依赖。

import com.mongodb.spark._

2,链接到mongodb

当RDD需要读取或者写入数据到mongodb的时候,会自动创建链接。

3,写入数据到mongodb

将RDD数据写入到mongodb的时候,数据必须转化为BSON document。可以写个简单的map函数来实现将数据转化为Document或者BSONDocument或者DBObject

一些scala的类型是不被支持的,应该转化为相等的java类型。为了转化Scala类型到原生的类型,需要导入下面的包,然后使用.asJava方法:

import scala.collection.JavaConverters._

A),MongoSpark.save()

可以使用MongoSpark.save()方法将RDD数据写入到Mongodb,如下:

代码语言:js
复制
import org.bson.Document
val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(documents) // Uses the SparkConf for configuration

也可以指定WriteConfig。

代码语言:js
复制
import com.mongodb.spark.config._
val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)

B),RDD Save Helper Methods

RDD还有一个隐式的辅助函数,saveToMongoDB(),可以用来写数据到Mongodb,如下:

代码语言:js
复制
import com.mongodb.spark._  //
val docs = """
             |{"name": "Ucloud ues", "ip": "10.7.1.8"}
             |{"name": "Gandalf", "ip": "127.0.0.1"}
             |{"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq
MongoSpark.save(sc.parallelize(docs.map(Document.parse)))
sc.parallelize(docs.map(Document.parse)).saveToMongoDB()

也可以指定WriteConfig。

代码语言:js
复制
documents.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://example.com/database.collection")))

4,从mongodb读取数据分析

A),MongoSpark.load()

该方法主要是从mongodb里面捞取数据做RDD,。

代码语言:js
复制
val rdd = MongoSpark.load(sc)
println(rdd.count)
println(rdd.first.toJson)

也可以指定ReadConfig

代码语言:js
复制
import com.mongodb.spark.config._
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count)
println(customRdd.first.toJson)

B),SparkContext Load Helper Methods

SparkContext有一个隐式的辅助方法loadFromMongoDB,用来从Mongodb捞取数据。

sc.loadFromMongoDB()

也可以为其,指定配置ReadConfig

代码语言:js
复制
sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig

5,Aggregation

在某些情况下,使用aggregation pipeline 可能比直接使用RDD的filter性能更好。Filter过滤数据看似是一个简单的RDD操作,实际上性能很低。比如,通常我们会如下使用:

代码语言:js
复制
val filteredRdd = rdd.filter(doc => doc.getInteger("test") > 5)
println(filteredRdd.count)
println(filteredRdd.first.toJson)

MongodbRDD可以传入一个aggregation pipeline ,允许在mongodb中过滤数据,然后仅仅传入需要的数据给Spark。

代码语言:js
复制
val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { test : { $gt : 5 } } }")))
println(aggregatedRdd.count)
println(aggregatedRdd.first.toJson)

使用aggregation pipeline也提供了处理空值结果的好处,而过滤方法则没有。比如上面的例子中,假如filter没有任何数据,将会抛出异常如下:

代码语言:js
复制
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8) java.lang.NullPointerException.

6,MongoSpark.builder()

如果需要对配置进行精细控制,那么MongoSpark配套提供了一个builder() 方法,用于配置Mongo Spark Connector的所有方面。也提供了创建RDD,DataFrame,Dataset的API。

三,SparkSql操纵mongodb

1,引入依赖

与RDD操纵mongodb不同的是,以SparkSql的形式操纵mongodb还需要引入SqlContext相关的特定的方法和隐式转换。

代码语言:js
复制
import com.mongodb.spark._
import com.mongodb.spark.sql._

2,创建sqlContext

代码语言:js
复制
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)

3,DataFrames 和Datasets

Mongo Spark Connector提供了com.mongodb.spark.sql.DefaultSource类,可以通过它来从mongodb里创建DataFrame和Datasets。但是,为了方便创建一个DataFrame,该连接器提供了MongoSpark助手load(sqlContext)。 MongoSpark.load(sqlContext)是一个通过DataFrameReader读取数据的简单封装。

代码语言:js
复制
val df = MongoSpark.load(sqlContext) // Uses the SparkConf
df.printSchema()

4,显式指定schema

默认情况下,从SQLContext中的MongoDB读取通过从数据库中抽样文档来推测schema信息的。显示的声明schema信息,如下操作

代码语言:js
复制
case class Character(name: String, age: Int)
val explicitDF = MongoSpark.load[Character](sqlContext)
explicitDF.printSchema()

可以使用case class将DataFrame转化为Dataset

代码语言:js
复制
val dataset = explicitDF.as[Character]

RDD也可以转化为DataFrame和Dataset

代码语言:js
复制
val rdd = MongoSpark.load(sc)
val dfInferredSchema = rdd.toDF()
val dfExplicitSchema = rdd.toDF[Character]()
val ds = rdd.toDS[Character]()

5,更多创建DataFrame的方法

使用SQLContext的方法创建DataFrame

代码语言:js
复制
val df2 = sqlContext.loadFromMongoDB() // Uses the SparkConf for configuration

val df3 = sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig

val df4 = sqlContext.read.mongo()

sqlContext.read.format("com.mongodb.spark.sql").load()

// Set custom options
import com.mongodb.spark.config._

val customReadConfig = ReadConfig(Map("readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val df5 = sqlContext.read.mongo(customReadConfig)

val df6 = sqlContext.read.format("com.mongodb.spark.sql").options(customReadConfig.asOptions).load()

6,Filter

当DataFrame或者SParkSql使用filter的时候,MongoConnector会构建一个aggregation pipeline 将数据过滤下推到mongodb。

df.filter(df("age") < 100).show()

7,Save DataFrames to MongoDB

Mongodb Spark Connector还提供了将DataFrame持久化到mongodb的操作。

代码语言:js
复制
MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))

println("Reading from the 'hundredClub' collection:")
MongoSpark.load[Character](sqlContext, ReadConfig(Map("collection" -> "hundredClub"), Some(ReadConfig(sqlContext)))).show()

也可以使用下面的方法来保存数据。

代码语言:js
复制
centenarians.write.option("collection", "hundredClub").mode("overwrite").mongo()
centenarians.write.option("collection", "hundredClub").mode("overwrite").format("com.mongodb.spark.sql").save()

四,数据类型

Spark支持数量有限的数据类型,以确保所有BSON类型于Spark DataFrames / Datasets中的类型都可以相互转化。

为了更好的支持Dataset,已经创建好了下面的Scala的case class,(com.mongodb.spark.sql.fieldTypes)和JavaBean class (com.mongodb.spark.sql.fieldTypes.api.java)

五,配置

1,配置的方法

A),使用Spark配置

三种方式可以实现

a),sparkconf :使用SparkConf配置的时候,需要在配置项前面带上特定前缀。

b),--conf

c),spark-default.conf

B),使用ReadConfig和WriteConfig

该方式的配置会覆盖掉所有的SparkConf配置

C),使用Options Map

Spark的API有部分支持Map[String,String],比如DataFrameReader和DataFrameWriter。可以使用asOptions()方法,将自定义的ReadConfig或者WriteConfig转化为一个map。

D),使用System Property

Mongodb Spark Connector为MongoClient提供了cache机制,只能通过SystemProperty配置。

2,输入配置

如果通过SparkConf设置Connector,配置必须加的前缀是:spark.mongodb.input

属性名称

描述

uri

Required。格式:mongodb://host:port/

database

Required。要读的数据库名称

collection

Required。要读的collection

localThreshold

从多个mongodbserver中选取一个Server的阈值,默认15ms

readPreference.name

要使用的Read Preference。默认:Primary。

readPreference.tagSets

要用的ReadPreference TagSets。

readConcern.level

要使用的Read Concern 等级。

sampleSize

制作schema时的采样数据的条数:1000.

partitioner

分区的策略。MongoDefaultPartitioner,下面讲。

3,Partitioner 配置

Mongodb作为spark数据源,分区数据的策略有很多种。目前,提供以下几种分区策略。在通过sparkconf配置的时候需要使用spark.mongodb.input.partitionerOptions.做前缀

A),MongoDefaultPartitioner

默认的分区策略。实际上是封装了MongoSamplePartitioner。

B),MongoSamplePartitioner

要求mongodb版本是3.2+。用于所有部署的通用分区器。使用平均文档大小和集合的随机抽样来确定集合的合适分区。

属性名

描述

partitionKey

分割收集数据的字段。该字段应该被索引并且包含唯一的值。默认_id

partitionSizeMB

每个分区的大小(以MB为单位).默认 64 MB

samplesPerPartition

每个分区要采集的样本文档的数量。默认 10

C),MongoShardedPartitioner

针对分片集群的分区器。根据chunk数据集对collection进行分片。需要读取配置数据库。

属性名

描述

shardkey

分割collection数据的字段,该字段应该被索引并且包含唯一的值。默认_id

D),MongoSplitVectorPartitioner

独立或复制集的分区器。在standalone或primary 上使用splitVector命令来确定数据库的分区。需要运行splitVector命令的权限。

属性名

描述

partitionKey

默认:_id.分割collection数据的字段。该字段会被索引,必须包含唯一的值

partitionSizeMB

默认:64MB.每2个分区的大小,以MB为单位。

E),MongoPaginateByCountPartitioner

用于所有部署模式的缓慢的通用分区器。创建特定数量的分区。需要查询每个分区。

属性名

描述

partitionKey

默认:_id.分割collection数据的字段。该字段会被索引,值唯一

numberOfPartitions

分区数,默认64.

F),MongoPaginateBySizePartitioner

用于所有部署模式的缓慢的通用分区器。根据数据大小创建分区。需要查询每个分区。

属性名

描述

partitionKey

默认:_id.分割collection数据的字段。该字段会被索引,必须包含唯一的值

partitionSizeMB

默认:64MB.每2个分区的大小,以MB为单位。

4,uri配置设置

通过SparkConf配置的话,需要加上spark.mongodb.input.前缀。

代码语言:js
复制
spark.mongodb.input.uri=mongodb://127.0.0.1/databaseName.collectionName?readPreference=primaryPreferred

单独每个属性进行配置

代码语言:js
复制
spark.mongodb.input.uri=mongodb://127.0.0.1/
spark.mongodb.input.database=databaseName
spark.mongodb.input.collection=collectionName
spark.mongodb.input.readPreference.name=primaryPreferred

如果你既在uri里面指定了配置,也单独设置了配置,那么uri里的会覆盖单独的配置。

代码语言:js
复制
spark.mongodb.input.uri=mongodb://127.0.0.1/foobar
spark.mongodb.input.database=bar

真正的链接数据库就是foobar。

5,输出配置

spark.mongodb.output. 前缀。

属性名

描述

uri

Required。mongodb://host:port/

database

Required。

collection

Required。

localThreshold

The threshold (milliseconds) for choosing a server from multiple MongoDB servers.Default: 15 ms

writeConcern.w

The write concern w value.Default w: 1

writeConcern.journal

The write concern journal value.

writeConcern.wTimeoutMS

The write concern wTimeout value.

uri配置,前缀spark.mongodb.output.

代码语言:js
复制
spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection"

也可以进行独立配置

代码语言:js
复制
spark.mongodb.output.uri=mongodb://127.0.0.1/
spark.mongodb.output.database=test
spark.mongodb.output.collection=myCollection

Uri和单独都进行了配置,以uri里配置为准。如,下面最终就是foobar。

代码语言:js
复制
spark.mongodb.output.uri=mongodb://127.0.0.1/foobar
spark.mongodb.output.database=bar

6,Cache Configuration

MongoConnector的每个MongoClient包含了一个cache,所以可以实现多线程共享MongoClient。

由于cache的设置是在spark configuration配置生效之前,所以cache仅仅支持通过System Property设置。

属性名称

描述

spark.mongodb.keep_alive_ms

The length of time to keep a MongoClient available for sharing.Default: 5000

六,总结

通过连接器,使用Spark库可以访问所有MongoDB数据集:使用通过Dataset使用sql分析数据,这点收益与自动schema推断;Streaming;机器学习;图计算。

对于Spark读取外部数据封装RDD,实际上最终要的点就是计算分区。因为这决定者你任务的并发度和处理速度,完全理解数据,掌握数据在Spark应用中的流动过程,对做一个少bug的应用大有裨益。后面会出文章对这点,对多种数据源详细介绍,欢迎大家持续关注浪尖更新。

本文翻译自:https://docs.mongodb.com/spark-connector/v1.1/

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-08-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 MongoDB
腾讯云数据库 MongoDB(TencentDB for MongoDB)是腾讯云基于全球广受欢迎的 MongoDB 打造的高性能 NoSQL 数据库,100%完全兼容 MongoDB 协议,支持跨文档事务,提供稳定丰富的监控管理,弹性可扩展、自动容灾,适用于文档型数据库场景,您无需自建灾备体系及控制管理系统。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档