SDP(8): Document Databases - Basic MongoDB Operations in Scala

Author: 用户1150956
Published: 2018-03-16 16:35:45

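  MongoDB is a document-oriented database. Its biggest difference from a traditional relational database is that it imposes no fixed format on the data, i.e. it has no schema, which makes it well suited to handling the varied, polymorphic data produced by today's Internet-driven businesses. MongoDB is also a distributed database, built for big-data workloads and high availability. MongoDB provides a Scala client driver, mongo-scala-driver. This post introduces MongoDB and shows how to program data operations against it from Scala.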

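  Much like a relational database, MongoDB is organized as Database -> Collection -> Document. A Collection corresponds to a Table and a Document to a Row. Because MongoDB has no schema, the Documents in one Collection may have different shapes. Before using MongoDB from Scala we must first establish a connection, and the Scala driver offers several ways to do so: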

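  // Connect to the default server at localhost:27017
  val client1 = MongoClient()
  // Connect using a connection string URI
  val client2 = MongoClient("mongodb://localhost:27017")

  // Connect using explicit settings (asJava requires scala.collection.JavaConverters._)
  val clusterSettings = ClusterSettings.builder()
         .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
  val client = MongoClient(clientSettings)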

Here are the corresponding MongoClient factory methods:

  /**
   * Create a default MongoClient at localhost:27017
   *
   * @return MongoClient
   */
  def apply(): MongoClient = apply("mongodb://localhost:27017")

  /**
   * Create a MongoClient instance from a connection string uri
   *
   * @param uri the connection string
   * @return MongoClient
   */
  def apply(uri: String): MongoClient = MongoClient(uri, None)

  /**
   * Create a MongoClient instance from a connection string uri
   *
   * @param uri the connection string
   * @param mongoDriverInformation any driver information to associate with the MongoClient
   * @return MongoClient
   * @note the `mongoDriverInformation` is intended for driver and library authors to associate extra driver metadata with the connections.
   */
  def apply(uri: String, mongoDriverInformation: Option[MongoDriverInformation]): MongoClient = {...}
  /**
   * Create a MongoClient instance from the MongoClientSettings
   *
   * @param clientSettings MongoClientSettings to use for the MongoClient
   * @return MongoClient
   */
  def apply(clientSettings: MongoClientSettings): MongoClient = MongoClient(clientSettings, None)

  /**
   * Create a MongoClient instance from the MongoClientSettings
   *
   * @param clientSettings MongoClientSettings to use for the MongoClient
   * @param mongoDriverInformation any driver information to associate with the MongoClient
   * @return MongoClient
   * @note the `mongoDriverInformation` is intended for driver and library authors to associate extra driver metadata with the connections.
   */
  def apply(clientSettings: MongoClientSettings, mongoDriverInformation: Option[MongoDriverInformation]): MongoClient = {...}

Once connected to MongoDB, we can select a Database:

 val db = client.getDatabase("testdb")

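Because there is no format constraint, testdb does not have to be created in advance. Like a directory in a file system that is created on demand, it is created automatically if it does not exist (MongoDB creates it the first time data is stored in it). Likewise, a collection inside db is created automatically, since no field layout has to be declared up front (no schema):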

val db: MongoDatabase = client.getDatabase("testdb")
val userCollection: MongoCollection[Document] = db.getCollection("users")

The Document class stored in a collection has the following factory method:

 /**
   * Create a new document from the elems
   * @param elems   the key/value pairs that make up the Document. This can be any valid `(String, BsonValue)` pair that can be
   *                transformed into a [[BsonElement]] via [[BsonMagnets.CanBeBsonElement]] implicits and any [[BsonTransformer]]s that
   *                are in scope.
   * @return        a new Document consisting key/value pairs given by `elems`.
   */
  def apply(elems: CanBeBsonElement*): Document = {
    val underlying = new BsonDocument()
    elems.foreach(elem => underlying.put(elem.key, elem.value))
    new Document(underlying)
  }

A Document can be built from CanBeBsonElement values. CanBeBsonElement is a key/value structure:

 /**
   * Represents a single [[BsonElement]]
   *
   * This is essentially a `(String, BsonValue)` key value pair. Any pair of `(String, T)` where type `T` has a [[BsonTransformer]] in
   * scope into a [[BsonValue]] is also a valid pair.
   */
  sealed trait CanBeBsonElement {
    val bsonElement: BsonElement

    /**
     * The key of the [[BsonElement]]
     * @return the key
     */
    def key: String = bsonElement.getName

    /**
     * The value of the [[BsonElement]]
     * @return the BsonValue
     */
    def value: BsonValue = bsonElement.getValue
  }

  /**
   * Implicitly converts key/value tuple of type (String, T) into a `CanBeBsonElement`
   *
   * @param kv the key value pair
   * @param transformer the implicit [[BsonTransformer]] for the value
   * @tparam T the type of the value
   * @return a CanBeBsonElement representing the key/value pair
   */
  implicit def tupleToCanBeBsonElement[T](kv: (String, T))(implicit transformer: BsonTransformer[T]): CanBeBsonElement = {
    new CanBeBsonElement {
      override val bsonElement: BsonElement = BsonElement(kv._1, transformer(kv._2))
    }
  }
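
To make the mechanism easier to see in isolation, here is a minimal sketch of the same pattern using only the Scala standard library. All names in it (`Value`, `Transformer`, `Element`, `tupleToElement`, `document`) are hypothetical stand-ins for `BsonValue`, `BsonTransformer`, `CanBeBsonElement`, `tupleToCanBeBsonElement` and `Document.apply`; it is not the driver's code, only an illustration of how an implicit conversion plus a type class lets plain tuples be passed where elements are expected.

```scala
import scala.language.implicitConversions

object MagnetSketch {
  // Stand-in for BsonValue: a tiny closed set of value types.
  sealed trait Value
  final case class IntValue(i: Int) extends Value
  final case class StrValue(s: String) extends Value

  // Stand-in for BsonTransformer[T]: knows how to turn a T into a Value.
  trait Transformer[T] { def apply(t: T): Value }
  implicit val intTransformer: Transformer[Int] =
    new Transformer[Int] { def apply(t: Int): Value = IntValue(t) }
  implicit val strTransformer: Transformer[String] =
    new Transformer[String] { def apply(t: String): Value = StrValue(t) }

  // Stand-in for CanBeBsonElement: a key paired with an already converted value.
  final case class Element(key: String, value: Value)

  // A (String, T) tuple becomes an Element only if a Transformer[T] is in scope.
  implicit def tupleToElement[T](kv: (String, T))(implicit tr: Transformer[T]): Element =
    Element(kv._1, tr(kv._2))

  // Stand-in for Document.apply(elems: CanBeBsonElement*).
  def document(elems: Element*): Map[String, Value] =
    elems.map(e => e.key -> e.value).toMap

  def main(args: Array[String]): Unit =
    println(document("name" -> "alice", "age" -> 24))
}
```

In this sketch a call such as `document("x" -> 1.5)` would be rejected at compile time, because no `Transformer[Double]` has been defined. That is the point of the pattern: only value types with a known conversion can be used to build a document.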

With the tupleToCanBeBsonElement implicit conversion, a Document can be built like this:

  val doc: Document = Document("_id" -> 0, "name" -> "MongoDB", "type" -> "database",
    "count" -> 1, "info" -> Document("x" -> 203, "y" -> 102))

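These key/value pairs correspond to the column names and column values of an ordinary database table. Next we build two Documents with different shapes and add them to the same collection: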

  val alice = Document("_id" -> 1, "name" -> "alice wong", "age" -> 24)
  val tiger = Document("first" -> "tiger", "last" -> "chan", "name" -> "tiger chan", "age" -> "unavailable")

  val addAlice: Observable[Completed] = userCollection.insertOne(alice)
  val addTiger: Observable[Completed] = userCollection.insertOne(tiger)

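The example above demonstrates MongoDB's schemaless nature. The insert methods return a result of type Observable. Like a Future, an Observable represents a result that arrives asynchronously, but it is only a description of the computation: nothing is executed until subscribe is called, which actually runs the operation and delivers the result: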

  addAlice.subscribe(new Observer[Completed] {
    override def onComplete(): Unit = println("insert alice completed.")
    override def onNext(result: Completed): Unit = println("insert alice successful.")
    override def onError(e: Throwable): Unit = println(s"insert error: ${e.getMessage}")
  })
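
The "description of a computation" idea can be illustrated without a database. The sketch below uses only the standard library; `Cold` and `insertOne` are hypothetical stand-ins, not driver classes, and the counter simulates the server-side effect. It assumes, as the driver documentation describes for its Observables, that no work starts before subscription and that each subscription starts the work again.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ColdSketch {
  // Hypothetical stand-in for a cold Observable: it only stores how to start the work.
  final class Cold[T](start: () => Future[T]) {
    // Each call starts the underlying operation again.
    def subscribe(): Future[T] = start()
  }

  // Simulated insert: running it increments the counter and returns the new count.
  def insertOne(counter: AtomicInteger): Cold[Int] =
    new Cold[Int](() => Future(counter.incrementAndGet()))

  def main(args: Array[String]): Unit = {
    val inserts = new AtomicInteger(0)
    val addAlice = insertOne(inserts)
    println(s"after building the operation: ${inserts.get()} inserts ran")
    Await.result(addAlice.subscribe(), 2.seconds)
    println(s"after subscribing: ${inserts.get()} inserts ran")
  }
}
```

One practical consequence, if the same holds for the real driver: subscribing twice to the same insert Observable would attempt the insert twice, so an operation should normally be subscribed to (or converted to a Future) exactly once.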

Alternatively, convert the Observable to a Future and obtain the result with Future tools such as Await:

  def headResult(observable: Observable[Completed]) = Await.result(observable.head(), 2 seconds)
  val r1 = headResult(addTiger)

mongo-scala-driver provides functions for converting an Observable into a Future:

   /**
     * Collects the [[Observable]] results and converts to a [[scala.concurrent.Future]].
     *
     * Automatically subscribes to the `Observable` and uses the [[collect]] method to aggregate the results.
     *
     * @note If the Observable is large then this will consume lots of memory!
     *       If the underlying Observable is infinite this Observable will never complete.
     * @return a future representation of the whole Observable
     */
    def toFuture(): Future[Seq[T]] = {
      val promise = Promise[Seq[T]]()
      collect().subscribe((l: Seq[T]) => promise.success(l), (t: Throwable) => promise.failure(t))
      promise.future
    }

    /**
     * Returns the head of the [[Observable]] in a [[scala.concurrent.Future]].
     *
     * @return the head result of the [[Observable]].
     */
    def head(): Future[T] = {
      import scala.concurrent.ExecutionContext.Implicits.global
      headOption().map {
        case Some(result) => result
        case None         => null.asInstanceOf[T] // scalastyle:ignore null
      }
    }
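
The Promise-based bridging used by toFuture can be tried out on its own. The following is a minimal sketch with only the standard library: `Source` and `fromSeq` are hypothetical stand-ins for an Observable and a data source, and the source here emits synchronously on the calling thread, which a real driver does not. The `headOption` variant returns an `Option` rather than mapping an empty result to `null` as the `head()` shown above does.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}

object CallbackToFuture {
  // Hypothetical stand-in for an Observable: pushes items to callbacks when subscribed.
  trait Source[T] {
    def subscribe(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit
  }

  // A source that emits the given items synchronously, then completes.
  def fromSeq[T](items: Seq[T]): Source[T] = new Source[T] {
    def subscribe(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit =
      try { items.foreach(onNext); onComplete() }
      catch { case e: Exception => onError(e) }
  }

  // Collect every item, completing the promise on completion or failing it on error.
  def toFuture[T](source: Source[T]): Future[Seq[T]] = {
    val promise = Promise[Seq[T]]()
    val buffer = ListBuffer.empty[T] // safe only because this sketch is single-threaded
    source.subscribe(
      item => { buffer += item; () },
      error => { promise.tryFailure(error); () },
      () => { promise.trySuccess(buffer.toList); () }
    )
    promise.future
  }

  // First item as an Option, so an empty source yields None instead of null.
  def headOption[T](source: Source[T]): Future[Option[T]] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    toFuture(source).map(_.headOption)
  }
}
```

For example, `toFuture(fromSeq(Seq(1, 2, 3)))` completes with the three items, and `headOption(fromSeq(Seq.empty[Int]))` completes with `None`. When working with the real driver, the `null` returned by `head()` on an empty result is worth keeping in mind before using the value.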

insertMany can be used to add Documents in a batch:

  val peter = Document("_id" -> 3, "first" -> "peter", "age" -> "old")
  val chan = Document("last" -> "chan", "family" -> "chan's")
  val addMany = userCollection.insertMany(List(peter,chan))
  val r2 = headResult(addMany)

Now we can use count to get the number of Documents in userCollection and find to print all of them:

  userCollection.count.head.onComplete {
    case Success(c) => println(s"$c documents in users collection")
    case Failure(e) => println(s"count() error: ${e.getMessage}")
  }
  userCollection.find().toFuture().onComplete {
    case Success(users) => users.foreach(println)
    case Failure(e) => println(s"find error: ${e.getMessage}")
  }
  scala.io.StdIn.readLine()
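
The readLine() call at the end keeps the program alive until the asynchronous onComplete callbacks have had a chance to print. A possible alternative, sketched here with only the standard library, is to fold the message into the Future itself and wait for that. `report` is a hypothetical helper, and `Future(4L)` stands in for `userCollection.count.head`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object WaitSketch {
  // Turn the asynchronous count (or its failure) into the line to print.
  def report(count: Future[Long]): Future[String] =
    count
      .map(c => s"$c documents in users collection")
      .recover { case e => s"count() error: ${e.getMessage}" }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for userCollection.count.head
    val count: Future[Long] = Future(4L)
    // Block until the message is ready instead of waiting for a key press.
    println(Await.result(report(count), 2.seconds))
  }
}
```

Blocking with Await is acceptable in a small demo like this one; in a larger application the Future would more likely be composed further rather than awaited.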

Running the code up to this point prints:

insert alice successful.
insert alice completed.
4 documents in users collection
Document((_id,BsonInt32{value=1}), (name,BsonString{value='alice wong'}), (age,BsonInt32{value=24}))
Document((_id,BsonObjectId{value=5a96641aa83f2923ab437602}), (first,BsonString{value='tiger'}), (last,BsonString{value='chan'}), (name,BsonString{value='tiger chan'}), (age,BsonString{value='unavailable'}))
Document((_id,BsonInt32{value=3}), (first,BsonString{value='peter'}), (age,BsonString{value='old'}))
Document((_id,BsonObjectId{value=5a96641aa83f2923ab437603}), (last,BsonString{value='chan'}), (family,BsonString{value='chan's'}))

The BsonString wrappers are an eyesore. We can use implicit conversions to print each Document as a JSON string instead:

object Helpers {

  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: (Document) => String = (doc) => doc.toJson
  }

  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: (C) => String = (doc) => doc.toString
  }

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
    def headResult() = Await.result(observable.head(), 10 seconds)
    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      results().foreach(res => println(converter(res)))
    }
    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  }

}

Now print again:

  userCollection.find().printResults("all documents:")

all documents:{ "_id" : 1, "name" : "alice wong", "age" : 24 }
{ "_id" : { "$oid" : "5a9665cea83f29243ccacbd2" }, "first" : "tiger", "last" : "chan", "name" : "tiger chan", "age" : "unavailable" }
{ "_id" : 3, "first" : "peter", "age" : "old" }
{ "_id" : { "$oid" : "5a9665cea83f29243ccacbd3" }, "last" : "chan", "family" : "chan's" }

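This is much more readable. find() with no arguments selects every Document. Through the Filters object, mongo-scala-driver provides a full set of functions for building query conditions, such as equal: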

 /**
   * Creates a filter that matches all documents where the value of the field name equals the specified value. Note that this does not
   * actually generate a `\$eq` operator, as the query language doesn't require it.
   *
   * A friendly alias for the `eq` method.
   *
   * @param fieldName the field name
   * @param value     the value
   * @tparam TItem  the value type
   * @return the filter
   * @see [[http://docs.mongodb.org/manual/reference/operator/query/eq \$eq]]
   */
  def equal[TItem](fieldName: String, value: TItem): Bson = eq(fieldName, value)

equal returns a Bson, and several Bson values can be combined into a more complex query condition:

userCollection.find(and(gte("age",24),exists("name",true)))

Now we can try out various query conditions:

  userCollection.find(notEqual("_id",3)).printResults("id != 3:")
  userCollection.find(equal("last", "chan")).printResults("last = chan:")
  userCollection.find(and(gte("age",24),exists("name",true))).printResults("age >= 24")
  userCollection.find(or(gte("age",24),equal("first","tiger"))).printResults("first = tiger")

Output:

id != 3:{ "_id" : 1, "name" : "alice wong", "age" : 24 }
{ "_id" : { "$oid" : "5a9665cea83f29243ccacbd2" }, "first" : "tiger", "last" : "chan", "name" : "tiger chan", "age" : "unavailable" }
{ "_id" : { "$oid" : "5a9665cea83f29243ccacbd3" }, "last" : "chan", "family" : "chan's" }
last = chan:{ "_id" : { "$oid" : "5a9665cea83f29243ccacbd2" }, "first" : "tiger", "last" : "chan", "name" : "tiger chan", "age" : "unavailable" }
{ "_id" : { "$oid" : "5a9665cea83f29243ccacbd3" }, "last" : "chan", "family" : "chan's" }
age >= 24{ "_id" : 1, "name" : "alice wong", "age" : 24 }
first = tiger{ "_id" : 1, "name" : "alice wong", "age" : 24 }
{ "_id" : { "$oid" : "5a9665cea83f29243ccacbd2" }, "first" : "tiger", "last" : "chan", "name" : "tiger chan", "age" : "unavailable" }
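
One way to read these results is to treat each filter as a predicate over a document and `and`/`or` as ways of combining predicates. The sketch below models that with plain Scala functions over `Map[String, Any]`. Everything in it is a hypothetical, simplified stand-in, not the driver's Filters API. In particular, `gte` here matches only integer fields, which mirrors MongoDB's rule that comparison operators only match values of a comparable type; that is why the string ages "unavailable" and "old" never satisfy `age >= 24`. The generated ObjectId `_id` fields are left out of the sample data.

```scala
object FilterSketch {
  type Doc = Map[String, Any]
  type Filter = Doc => Boolean

  def equal(field: String, value: Any): Filter = doc => doc.get(field).contains(value)
  def notEqual(field: String, value: Any): Filter = doc => !doc.get(field).contains(value)
  def exists(field: String): Filter = doc => doc.contains(field)

  // Only integer fields are compared; a string such as "old" never matches.
  def gte(field: String, bound: Int): Filter = doc =>
    doc.get(field) match {
      case Some(n: Int) => n >= bound
      case _            => false
    }

  def and(filters: Filter*): Filter = doc => filters.forall(f => f(doc))
  def or(filters: Filter*): Filter = doc => filters.exists(f => f(doc))

  // The four documents from the article, without generated ObjectIds.
  val users: Seq[Doc] = Seq(
    Map("_id" -> 1, "name" -> "alice wong", "age" -> 24),
    Map("first" -> "tiger", "last" -> "chan", "name" -> "tiger chan", "age" -> "unavailable"),
    Map("_id" -> 3, "first" -> "peter", "age" -> "old"),
    Map("last" -> "chan", "family" -> "chan's")
  )

  def main(args: Array[String]): Unit =
    users.filter(and(gte("age", 24), exists("name"))).foreach(println)
}
```

Read this way, the last query in the article, `or(gte("age",24), equal("first","tiger"))`, selects documents with a numeric age of at least 24 or with first equal to "tiger", which is why both alice and tiger appear under the "first = tiger" label.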

Here is the complete source code for this demonstration:

build.sbt

name := "learn-mongo"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies := Seq(
    "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
    "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17"
)

MongoScala101.scala

import org.mongodb.scala._
import scala.collection.JavaConverters._
import org.mongodb.scala.connection.ClusterSettings
import scala.concurrent._
import scala.concurrent.duration._
import scala.util._
import org.mongodb.scala.model.Filters._
object MongoScala101 extends App {
  import scala.concurrent.ExecutionContext.Implicits.global
//  val client1 = MongoClient()
//  val client2 = MongoClient("mongodb://localhost:27017")

  val clusterSettings = ClusterSettings.builder()
         .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
  val client = MongoClient(clientSettings)

  val db: MongoDatabase = client.getDatabase("testdb")
  val userCollection: MongoCollection[Document] = db.getCollection("users")
  val deleteAll = userCollection.deleteMany(notEqual("_id", 3))
  deleteAll.head.onComplete {
    case Success(c) => println(s"delete successful $c")
    case Failure(e) => println(s"delete error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
  val delete3 = userCollection.deleteMany(equal("_id", 3))
  delete3.head.onComplete {
    case Success(c) => println(s"delete successful $c")
    case Failure(e) => println(s"delete error: ${e.getMessage}")
  }
  scala.io.StdIn.readLine()

  val doc: Document = Document("_id" -> 0, "name" -> "MongoDB", "type" -> "database",
    "count" -> 1, "info" -> Document("x" -> 203, "y" -> 102))

  val alice = Document("_id" -> 1, "name" -> "alice wong", "age" -> 24)
  val tiger = Document("first" -> "tiger", "last" -> "chan", "name" -> "tiger chan", "age" -> "unavailable")

  val addAlice: Observable[Completed] = userCollection.insertOne(alice)
  val addTiger: Observable[Completed] = userCollection.insertOne(tiger)

  addAlice.subscribe(new Observer[Completed] {
    override def onComplete(): Unit = println("insert alice completed.")
    override def onNext(result: Completed): Unit = println("insert alice successful.")
    override def onError(e: Throwable): Unit = println(s"insert error: ${e.getMessage}")
  })

  def headResult(observable: Observable[Completed]) = Await.result(observable.head(), 2 seconds)
  val r1 = headResult(addTiger)

  val peter = Document("_id" -> 3, "first" -> "peter", "age" -> "old")
  val chan = Document("last" -> "chan", "family" -> "chan's")
  val addMany = userCollection.insertMany(List(peter,chan))
  val r2 = headResult(addMany)


  import Helpers._
  userCollection.count.head.onComplete {
    case Success(c) => println(s"$c documents in users collection")
    case Failure(e) => println(s"count() error: ${e.getMessage}")
  }
  userCollection.find().toFuture().onComplete {
    case Success(users) => users.foreach(println)
    case Failure(e) => println(s"find error: ${e.getMessage}")
  }
  scala.io.StdIn.readLine()


  userCollection.find().printResults("all documents:")
  userCollection.find(notEqual("_id",3)).printResults("id != 3:")
  userCollection.find(equal("last", "chan")).printResults("last = chan:")
  userCollection.find(and(gte("age",24),exists("name",true))).printResults("age >= 24")
  userCollection.find(or(gte("age",24),equal("first","tiger"))).printResults("first = tiger")



  client.close()

  println("end!!!")

}

object Helpers {

  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: (Document) => String = (doc) => doc.toJson
  }

  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: (C) => String = (doc) => doc.toString
  }

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
    def headResult() = Await.result(observable.head(), 10 seconds)
    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      results().foreach(res => println(converter(res)))
    }
    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  }

}
Originally published 2018-02-28 on the author's personal site/blog.