前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SDP(11):MongoDB-Engine功能实现

SDP(11):MongoDB-Engine功能实现

作者头像
用户1150956
发布2018-03-16 16:31:45
1K0
发布2018-03-16 16:31:45
举报

  根据上篇关于MongoDB-Engine的功能设计方案,我们将在这篇讨论里进行功能实现和测试。下面是具体的功能实现代码:基本上是直接调用Mongo-scala的对应函数,需要注意的是java类型和scala类型之间的相互转换:

/** Interprets an `MGOContext` by dispatching its command to the MongoDB Scala driver. */
object MGOEngine {
  import MGOContext._
  import MGOCommands._
  import MGOAdmins._

  /** Runs the command held in `ctx.action` against `ctx.dbName` / `ctx.collName`.
    *
    * Every branch casts the driver's future to `Future[T]`. The cast is unchecked, so the
    * caller has to pick a `T` matching what the selected driver call yields; a wrong `T`
    * only shows up later, where the value is used.
    *
    * @param ctx    database name, collection name and the command to run
    * @param client driver client used to look up the database and collection
    * @tparam T     result type the caller expects from the command
    * @return the driver's result, or a failed future when the command (including a `null`
    *         action) is not handled by this engine
    */
  def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    ctx.action match {
        /* count */
      case Count(Some(filter),Some(opt)) =>
        coll.count(filter,opt.asInstanceOf[CountOptions])
          .toFuture().asInstanceOf[Future[T]]
      case Count(Some(filter),None) =>
        coll.count(filter).toFuture()
          .asInstanceOf[Future[T]]
      case Count(None,Some(opt)) =>
        // options without a filter: count with an empty (match-all) filter document
        coll.count(Document(),opt.asInstanceOf[CountOptions])
          .toFuture().asInstanceOf[Future[T]]
      case Count(None,None) =>
        coll.count().toFuture()
          .asInstanceOf[Future[T]]
        /* distinct */
      case Distict(field,Some(filter)) =>
        coll.distinct(field,filter).toFuture()
          .asInstanceOf[Future[T]]
      case Distict(field,None) =>
        coll.distinct((field)).toFuture()
          .asInstanceOf[Future[T]]
      /* find: without andThen, firstOnly selects between the whole result and its first element */
      case Find(None,None,optConv,false) =>
        if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
        else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
      case Find(None,None,optConv,true) =>
        if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
        else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
      case Find(Some(filter),None,optConv,false) =>
        if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
        else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
      case Find(Some(filter),None,optConv,true) =>
        if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
        else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
      // with an andThen transformation the firstOnly flag is not consulted
      case Find(None,Some(next),optConv,_) =>
        if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
        else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
      case Find(Some(filter),Some(next),optConv,_) =>
        if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
        else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
        /* aggregate */
      case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
        /* mapReduce */
      case MapReduce(mf,rf) => coll.mapReduce(mf,rf).toFuture().asInstanceOf[Future[T]]
       /* insert: more than one document goes through insertMany, otherwise insertOne;
          the options object must be of the matching driver type */
      case Insert(docs,Some(opt)) =>
        if (docs.size > 1) coll.insertMany(docs,opt.asInstanceOf[InsertManyOptions]).toFuture()
          .asInstanceOf[Future[T]]
        else coll.insertOne(docs.head,opt.asInstanceOf[InsertOneOptions]).toFuture()
          .asInstanceOf[Future[T]]
      case Insert(docs,None) =>
        if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
        else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
        /* delete */
      case Delete(filter,None,onlyOne) =>
         if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
         else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
      case Delete(filter,Some(opt),onlyOne) =>
        if (onlyOne) coll.deleteOne(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.deleteMany(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
        /* replace */
      case Replace(filter,replacement,None) =>
         coll.replaceOne(filter,replacement).toFuture().asInstanceOf[Future[T]]
      case Replace(filter,replacement,Some(opt)) =>
        coll.replaceOne(filter,replacement,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        /* update */
      case Update(filter,update,None,onlyOne) =>
        if (onlyOne) coll.updateOne(filter,update).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter,update).toFuture().asInstanceOf[Future[T]]
      case Update(filter,update,Some(opt),onlyOne) =>
        if (onlyOne) coll.updateOne(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        /* bulkWrite */
      case BulkWrite(commands,None) =>
         coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
      case BulkWrite(commands,Some(opt)) =>
        coll.bulkWrite(commands,opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]

        /* drop collection: targets the collection named in the command, not ctx.collName */
      case DropCollection(collName) =>
        val target = db.getCollection(collName)
        target.drop().toFuture().asInstanceOf[Future[T]]
        /* create collection */
      case CreateCollection(collName,None) =>
        db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
      case CreateCollection(collName,Some(opt)) =>
        db.createCollection(collName,opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
        /* list collection: uses the database named in the command, not ctx.dbName */
      case ListCollection(dbName) =>
        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
        /* create view */
      case CreateView(viewName,viewOn,pline,None) =>
        db.createView(viewName,viewOn,pline).toFuture().asInstanceOf[Future[T]]
      case CreateView(viewName,viewOn,pline,Some(opt)) =>
        db.createView(viewName,viewOn,pline,opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
        /* create index */
      case CreateIndex(key,None) =>
        coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
      case CreateIndex(key,Some(opt)) =>
        coll.createIndex(key,opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
        /* drop index */
      case DropIndexByName(indexName, None) =>
        coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
      case DropIndexByName(indexName, Some(opt)) =>
        coll.dropIndex(indexName,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropIndexByKey(key,None) =>
        coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
      case DropIndexByKey(key,Some(opt)) =>
        coll.dropIndex(key,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropAllIndexes(None) =>
        coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
      case DropAllIndexes(Some(opt)) =>
        coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
        /* anything else (a null action or a command added later): report through the
           future instead of throwing MatchError at the call site */
      case other =>
        Future.failed[T](new IllegalArgumentException(s"unsupported MongoDB command: $other"))
    }

  }

}

注意:mgoExecute的所有分支都返回Future[T]结果。下面我们来试运行这个函数,不过先关注一些细节:关于MongoDB的Date,Blob,Array等类型在scala中的使用方法:

 /** Date type used for MongoDB date fields; an alias for `java.util.Date`. */
 type MGODate = java.util.Date

  /** Builds a date at 00:00:00.000 in the default time zone.
    *
    * @param yyyy calendar year
    * @param mm   month exactly as `java.util.Calendar.set` expects it, i.e. zero-based
    *             (0 = January, 11 = December)
    * @param dd   day of month
    */
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
    // getInstance() starts at "now"; clear it so no current time-of-day leaks into the date
    ca.clear()
    ca.set(yyyy,mm,dd)
    ca.getTime()
  }

  /** Builds a date-time with zero milliseconds in the default time zone.
    *
    * @param mm month as `java.util.Calendar.set` expects it, i.e. zero-based (0 = January)
    */
  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
    val ca = Calendar.getInstance()
    // set(...) leaves MILLISECOND untouched, so wipe the "now" value first
    ca.clear()
    ca.set(yyyy,mm,dd,hr,min,sec)
    ca.getTime()
  }

  /** Returns the current instant as reported by a fresh `Calendar`. */
  def mgoDateTimeNow: MGODate = {
    val ca = Calendar.getInstance()
    ca.getTime
  }


  /** Formats `dt` with a `SimpleDateFormat` built from `formatString`. */
  def mgoDateToString(dt: MGODate, formatString: String): String = {
    val fmt= new SimpleDateFormat(formatString)
    fmt.format(dt)
  }

  /** Binary field type; an alias for `BsonBinary`. */
  type MGOBlob = BsonBinary
  /** Array field type; an alias for `BsonArray`. */
  type MGOArray = BsonArray

  /** Loads the content of `fileName` by delegating to `FileToByteArray`.
    *
    * @param timeOut passed through to `FileToByteArray`
    */
  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer) = {
    FileToByteArray(fileName, timeOut)
  }

  /** Hands the blob's bytes to `ByteArrayToFile`, targeting `fileName`. */
  def mgoBlobToFile(blob: MGOBlob, fileName: String)(
    implicit mat: Materializer) = {
    val bytes = blob.getData
    ByteArrayToFile(bytes, fileName)
  }

然后就是MongoDB数据类型的读取帮助函数:

 /** Reads `fieldName` as a string; `None` when the field is absent or the getter yields null. */
 def mgoGetStringOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getString(fieldName)) // Option(...) so a null value does not become Some(null)
    else None
  }
  /** Reads `fieldName` via `getInteger`; `None` when absent or null. */
  def mgoGetIntOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getInteger(fieldName))
    else None
  }
  /** Reads `fieldName` via `getLong`; `None` when absent or null.
    * The misspelled name is kept for existing callers; see `mgoGetLongOrNone`.
    */
  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getLong(fieldName))
    else None
  }
  /** Correctly spelled alias of `mgoGetLonggOrNone`. */
  def mgoGetLongOrNone(doc: Document, fieldName: String) =
    mgoGetLonggOrNone(doc, fieldName)
  /** Reads `fieldName` via `getDouble`; `None` when absent or null. */
  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getDouble(fieldName))
    else None
  }
  /** Reads `fieldName` via `getBoolean`; `None` when absent or null. */
  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getBoolean(fieldName))
    else None
  }
  /** Reads `fieldName` via `getDate`; `None` when absent or null. */
  def mgoGetDateOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getDate(fieldName))
    else None
  }
  /** Returns the field as a blob; `None` when absent or when the value is not a `MGOBlob`. */
  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
    // a type-checked match replaces the erased cast, which never verified the element type
    doc.get(fieldName).collect { case blob: MGOBlob => blob }
  }
  /** Returns the field as an array; `None` when absent or when the value is not a `MGOArray`. */
  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
    doc.get(fieldName).collect { case arr: MGOArray => arr }
  }

  /** Converts the elements of `arr` to BSON documents.
    *
    * Each element is converted with `asDocument()`, so an element that is not a document
    * is rejected here by the BSON library rather than surfacing later as a
    * `ClassCastException` at whichever call site first touches it.
    */
  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
    arr.getValues.asScala.map(_.asDocument()).toList
  }

  /** A function from one `FindObservable[Document]` to another, e.g. one that adds a sort or a projection. */
  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]

下面我们就开始设置试运行环境,从一个全新的collection开始:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.mongodb.scala._
import org.mongodb.scala.connection._

import scala.collection.JavaConverters._
import com.mongodb.client.model._

import scala.util._

object MongoEngineTest extends App {
import MGOContext._
import MGOEngine._
import MGOHelpers._
import MGOCommands._
import MGOAdmins._

  // Connect to a single MongoDB server on localhost:27017.
  val clusterSettings = ClusterSettings.builder()
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
  // Implicit values carry explicit types so implicit resolution does not depend on inference.
  implicit val client: MongoClient = MongoClient(clientSettings)

  // Akka runtime: the materializer is needed by the file helpers, the dispatcher by Future combinators.
  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

  // Start from a clean slate: drop the "po" collection of "testdb".
  val ctx = MGOContext("testdb","po").setCommand(
    DropCollection("po"))
  // The type argument is explicit: left to inference it becomes Nothing, and the erased
  // cast inside mgoExecute would then be checked against Nothing at this call site.
  println(getResult(mgoExecute[Any](ctx)))

测试运行了DropCollection指令。下面我们试着insert两个document:

  // Picture bytes stored in the "handler" field of the first purchase order.
  val pic = fileToMGOBlob("/users/tiger/nobody.png")
  // First purchase order: has remarks, a picture and two detail lines.
  // NOTE(review): mgoDate forwards the month to java.util.Calendar.set, where months are
  // zero-based, so 5 means June here — confirm that is intended.
  val po1 = Document (
    "ponum" -> "po18012301",
    "vendor" -> "The smartphone compay",
    "podate" -> mgoDate(2017,5,13),
    "remarks" -> "urgent, rush order",
    "handler" -> pic,
    "podtl" -> Seq(
      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
    )
  )

  // Second purchase order: no remarks and no picture, three detail lines.
  val po2 = Document (
    "ponum" -> "po18022002",
    "vendor" -> "The Samsung compay",
    "podate" -> mgoDate(2015,11,6),
    "podtl" -> Seq(
      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
    )
  )

  // Two documents are inserted, so the engine takes its insertMany path and the options
  // object has to be an InsertManyOptions.
  val optInsert = new InsertManyOptions().ordered(true)
  val ctxInsert = ctx.setCommand(
    Insert(Seq(po1,po2),Some(optInsert))
  )
  // Separate statement (it was fused onto the closing parenthesis above), with an explicit
  // type argument so the result type is not inferred as Nothing.
  println(getResult(mgoExecute[Any](ctxInsert)))

注意InsertManyOptions的具体设定方式。 为了配合更方便准确的强类型操作,我们需要进行Document类型到具体应用类型之间的对应转换:

  /** Typed view of a purchase-order document; the three optional members may be missing. */
  case class PO (
    ponum: String,
    podate: MGODate,
    vendor: String,
    remarks: Option[String],
    podtl: Option[MGOArray],
    handler: Option[MGOBlob]
  )

  /** Builds a `PO` from `doc`, reading the fields in declaration order. */
  def toPO(doc: Document): PO = {
    val number = doc.getString("ponum")
    val orderDate = doc.getDate("podate")
    val supplier = doc.getString("vendor")
    val note = mgoGetStringOrNone(doc,"remarks")
    val details = mgoGetArrayOrNone(doc,"podtl")
    val picture = mgoGetBlobOrNone(doc,"handler")
    PO(number, orderDate, supplier, note, details, picture)
  }

  /** Typed view of one purchase-order detail line; `packing` and `payTerm` may be missing. */
  case class PODTL(
    item: String,
    price: Double,
    qty: Int,
    packing: Option[String],
    payTerm: Option[String]
  )

  /** Builds a `PODTL` from a detail document; `payTerm` is read from the "payterm" key. */
  def toPODTL(podtl: Document): PODTL = {
    val itemName = podtl.getString("item")
    val unitPrice: Double = podtl.getDouble("price")
    val quantity: Int = podtl.getInteger("qty")
    val packingKind = mgoGetStringOrNone(podtl,"packing")
    val paymentTerm = mgoGetStringOrNone(podtl,"payterm")
    PODTL(itemName, unitPrice, quantity, packingKind, paymentTerm)
  }

  /** Prints a purchase order, its detail lines, and hands any picture to `mgoBlobToFile`.
    *
    * The value returned by `mgoBlobToFile` is not inspected before the
    * "picture saved" message is printed.
    */
  def showPO(po: PO) = {
    println(s"po number: ${po.ponum}")
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
    println(s"vendor: ${po.vendor}")
    po.remarks.foreach(note => println(s"remarks: ${note}"))

    // detail lines: convert them all first, then print one per row
    po.podtl.foreach { lines =>
      val details = mgoArrayToDocumentList(lines).map(dc => toPODTL(dc))
      for (line <- details) {
        print(s"==>Item: ${line.item} ")
        print(s"price: ${line.price} ")
        print(s"qty: ${line.qty} ")
        line.packing.foreach(pk => print(s"packing: ${pk} "))
        line.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
        println("")
      }
    }

    po.handler.fold(println("no picture provided")) { blob =>
      val fileName = s"/users/tiger/${po.ponum}.png"
      mgoBlobToFile(blob,fileName)
      println(s"picture saved to ${fileName}")
    }

  }

在上面的代码里我们使用了前面提供的MongoDB数据类型读取帮助函数。下面我们测试对poCollection中的Document进行查询,示范包括projection,sort,filter等:

  import org.mongodb.scala.model.Projections._
  import org.mongodb.scala.model.Filters._
  import org.mongodb.scala.model.Sorts._
  // Reusable FindObservable transformations.
  val sort: MGOFilterResult = find => find.sort(descending("ponum"))
  // Projections are combined with Projections.fields; `and` resolves to Filters.and,
  // which is a query-filter combinator rather than a projection combinator.
  val proj: MGOFilterResult = find => find.projection(fields(include("ponum","podate"),include("vendor"),excludeId()))
  // All documents, projected to ponum/podate/vendor.
  val ctxFind = ctx.setCommand(Find(andThen=Some(proj)))
  // Only the first document, converted to PO.
  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true,converter = Some(toPO _)))
  // Documents having a detail line with qty == 100, converted to PO.
  val ctxFindArrayItem = ctx.setCommand(
    Find(filter = Some(equal("podtl.qty",100)), converter = Some(toPO _))
  )
  
  // Run the three queries one after another; each step prints its result or the failure message.
  // Results are requested as Seq rather than List: mgoExecute's cast is unchecked, so asking
  // for a narrower collection type than the driver produces would only fail when it is used.
  // NOTE: the Future produced by this for-comprehension is not awaited here.
  for {
    _ <- mgoExecute[Seq[Document]](ctxFind).andThen {
      case Success(docs) => docs.map(toPO).foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoExecute[PO](ctxFindFirst).andThen {
      case Success(doc) => showPO(doc)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoExecute[Seq[PO]](ctxFindArrayItem).andThen {
      case Success(docs) => docs.foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }
  } yield ()

因为mgoExecute返回Future结果,所以我们可以用for-comprehension对几个运算进行串联运行。

下面是这次示范的源代码:

build.sbt

// sbt build definition for the sample project.
name := "learn-mongo"

version := "0.1"

scalaVersion := "2.12.4"

// `:=` assigns the complete dependency list (it does not append to an existing one).
// NOTE(review): the alpakka-mongodb artifact is presumably what brings Akka Streams onto the
// classpath for the file helpers — verify before removing it.
libraryDependencies := Seq(
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17"
)

MGOHelpers.scala

import org.mongodb.scala._
import scala.concurrent._
import scala.concurrent.duration._

/** Blocking conveniences for driver `Observable`s and for `Future`s. */
object MGOHelpers {

  /** Adds blocking "fetch" and "print" operations to an `Observable`.
    * Both fetch operations wait at most 10 seconds.
    */
  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    /** Waits for every element of the observable. */
    def results(): Seq[C] = {
      val all = observable.toFuture()
      Await.result(all, 10.seconds)
    }

    /** Waits for the head of the observable. */
    def headResult() = {
      val first = observable.head()
      Await.result(first, 10.seconds)
    }

    /** Prints `initial` (when non-empty) followed by one converted element per line. */
    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      for (res <- results()) println(converter(res))
    }

    /** Prints `initial` immediately followed by the converted head element. */
    def printHeadResult(initial: String = ""): Unit = {
      val rendered = converter(headResult())
      println(s"${initial}${rendered}")
    }
  }

  /** Renders documents as JSON. */
  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: (Document) => String = _.toJson
  }

  /** Renders any other element with `toString`. */
  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: (C) => String = _.toString
  }

  /** Blocks until `fut` completes or `timeOut` (default one second) elapses. */
  def getResult[T](fut: Future[T], timeOut: Duration = 1.second): T =
    Await.result(fut, timeOut)

  /** Same as `getResult`, for futures of collections. */
  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1.second): Iterable[T] =
    Await.result(fut, timeOut)

}

FileStreaming.scala

import java.io.{InputStream, ByteArrayInputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.{Materializer}
import akka.stream.scaladsl.{FileIO, StreamConverters}

import scala.concurrent.{Await}
import akka.util._
import scala.concurrent.duration._

/** Moves file content between disk and in-memory representations using Akka Streams. */
object FileStreaming {

  /** Streams the whole file into one `ByteString`, blocking the caller for at most `timeOut`. */
  private def readAll(fileName: String, timeOut: FiniteDuration)(
    implicit mat: Materializer): ByteString = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    Await.result(fut, timeOut)
  }

  /** Runs a stream that copies everything readable from `is` into `fileName`. */
  private def writeFrom(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  /** Reads the file into a `ByteBuffer`; blocks for at most `timeOut`. */
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer):ByteBuffer =
    readAll(fileName, timeOut).toByteBuffer

  /** Reads the file into a byte array; blocks for at most `timeOut`. */
  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer): Array[Byte] =
    readAll(fileName, timeOut).toArray

  /** Reads the file fully, then exposes the bytes as an in-memory `InputStream`. */
  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer): InputStream =
    new ByteArrayInputStream(readAll(fileName, timeOut).toArray)

  /** Writes the remaining bytes of `byteBuf` to `fileName`; the buffer's position is advanced.
    * Returns the materialized value of the write stream without waiting for it.
    */
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba,0,ba.length)
    writeFrom(new ByteArrayInputStream(ba), fileName)
  }

  /** Writes `bytes` to `fileName`; returns the write stream's materialized value without waiting. */
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) =
    writeFrom(new ByteArrayInputStream(bytes), fileName)

  /** Copies `is` to `fileName`; returns the write stream's materialized value without waiting. */
  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) =
    writeFrom(is, fileName)

}

MongoEngine.scala

import java.text.SimpleDateFormat

import org.bson.conversions.Bson
import org.mongodb.scala._
import org.mongodb.scala.model._
import java.util.Calendar
import scala.collection.JavaConverters._
import FileStreaming._
import akka.stream.Materializer
import org.mongodb.scala.bson.{BsonArray, BsonBinary}

import scala.concurrent._
import scala.concurrent.duration._

object MGOContext {

  /** Marker trait for every command that can be placed in an `MGOContext`. */
  trait MGOCommands

  /** Data commands. `options` fields are typed `Option[Any]`; `MGOEngine` casts them to the
    * driver's option class for the operation it ends up calling.
    */
  object MGOCommands {

    /** Counts documents, optionally restricted by `filter`; `options` is cast to `CountOptions`. */
    case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

    /** Distinct values of `fieldName`, optionally restricted by `filter`. */
    case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

  /*  Author's sketch of how a FindObservable can be refined before it is run:
  import com.mongodb.async.client.FindIterable
  val resultDocType = FindIterable[Document]
  val resultOption = FindObservable(resultDocType)
    .maxScan(...)
  .limit(...)
  .sort(...)
  .project(...) */
    /** Query command.
      *
      * @param filter    optional query filter
      * @param andThen   optional transformation applied to the `FindObservable` before it runs
      * @param converter optional mapping applied to each returned document
      * @param firstOnly asks for the first document only; `MGOEngine` consults it only when
      *                  `andThen` is empty
      */
    case class Find[M](filter: Option[Bson] = None,
                    andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                    converter: Option[Document => M] = None,
                    firstOnly: Boolean = false) extends MGOCommands

    /** Aggregation over the given pipeline stages. */
    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

    /** Map-reduce with the given map and reduce function sources. */
    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

    /** Inserts `newdocs`. `MGOEngine` casts `options` to `InsertManyOptions` when more than one
      * document is supplied and to `InsertOneOptions` otherwise.
      */
    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

    /** Deletes matching documents: one when `onlyOne` is set, otherwise many. */
    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    /** Replaces one matching document; `options` is cast to `UpdateOptions`. */
    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

    /** Updates matching documents: one when `onlyOne` is set, otherwise many. */
    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    /** Bulk write of the given write models; `options` is cast to `BulkWriteOptions`. */
    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands

  }

  /** Administrative commands (collections, views, indexes). `options` fields are cast by
    * `MGOEngine` to the matching driver option class.
    */
  object MGOAdmins {

    /** Drops the collection named `collName` (the context's own collection name is not used). */
    case class DropCollection(collName: String) extends MGOCommands

    /** Creates a collection; `options` is cast to `CreateCollectionOptions`. */
    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

    /** Lists the collections of database `dbName` (the context's own database name is not used). */
    case class ListCollection(dbName: String) extends MGOCommands

    /** Creates view `viewName` over `viewOn` with the given pipeline; `options` is cast to `CreateViewOptions`. */
    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

    /** Creates an index on `key`; `options` is cast to `IndexOptions`. */
    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

    /** Drops the index with the given name; `options` is cast to `DropIndexOptions`. */
    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

    /** Drops the index defined by `key`; `options` is cast to `DropIndexOptions`. */
    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

    /** Drops all indexes of the context's collection; `options` is cast to `DropIndexOptions`. */
    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

  }

  /** Immutable description of one engine call: target database, target collection and command.
    * `action` defaults to `null` until a command is set; each setter returns a modified copy.
    */
  case class MGOContext(
                         dbName: String,
                         collName: String,
                         action: MGOCommands = null
                       ) {
    /** Copy of this context pointing at another database. */
    def setDbName(name: String): MGOContext = copy(dbName = name)

    /** Copy of this context pointing at another collection. */
    def setCollName(name: String): MGOContext = copy(collName = name)

    /** Copy of this context carrying `cmd` as its command. */
    def setCommand(cmd: MGOCommands): MGOContext = copy(action = cmd)
  }

  /** Factory methods for `MGOContext`. */
  object MGOContext {
    /** Context without a command (the action stays at its `null` default). */
    def apply(db: String, coll: String) = {
      new MGOContext(db, coll)
    }

    /** Context with a command already attached. */
    def apply(db: String, coll: String, command: MGOCommands) = {
      new MGOContext(db, coll, command)
    }

  }

  /** Date type used for MongoDB date fields; an alias for `java.util.Date`. */
  type MGODate = java.util.Date

  /** Builds a date at 00:00:00.000 in the default time zone.
    *
    * @param yyyy calendar year
    * @param mm   month exactly as `java.util.Calendar.set` expects it, i.e. zero-based
    *             (0 = January, 11 = December)
    * @param dd   day of month
    */
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
    // getInstance() starts at "now"; clear it so no current time-of-day leaks into the date
    ca.clear()
    ca.set(yyyy,mm,dd)
    ca.getTime()
  }

  /** Builds a date-time with zero milliseconds in the default time zone.
    *
    * @param mm month as `java.util.Calendar.set` expects it, i.e. zero-based (0 = January)
    */
  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
    val ca = Calendar.getInstance()
    // set(...) leaves MILLISECOND untouched, so wipe the "now" value first
    ca.clear()
    ca.set(yyyy,mm,dd,hr,min,sec)
    ca.getTime()
  }

  /** Returns the current instant as reported by a fresh `Calendar`. */
  def mgoDateTimeNow: MGODate = {
    val ca = Calendar.getInstance()
    ca.getTime
  }


  /** Formats `dt` with a `SimpleDateFormat` built from `formatString`. */
  def mgoDateToString(dt: MGODate, formatString: String): String = {
    val fmt= new SimpleDateFormat(formatString)
    fmt.format(dt)
  }

  /** Binary field type; an alias for `BsonBinary`. */
  type MGOBlob = BsonBinary
  /** Array field type; an alias for `BsonArray`. */
  type MGOArray = BsonArray

  /** Loads the content of `fileName` by delegating to `FileToByteArray`.
    *
    * @param timeOut passed through to `FileToByteArray`
    */
  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer) = {
    FileToByteArray(fileName, timeOut)
  }

  /** Hands the blob's bytes to `ByteArrayToFile`, targeting `fileName`. */
  def mgoBlobToFile(blob: MGOBlob, fileName: String)(
    implicit mat: Materializer) = {
    val bytes = blob.getData
    ByteArrayToFile(bytes, fileName)
  }

  /** Reads `fieldName` as a string; `None` when the field is absent or the getter yields null. */
  def mgoGetStringOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getString(fieldName)) // Option(...) so a null value does not become Some(null)
    else None
  }
  /** Reads `fieldName` via `getInteger`; `None` when absent or null. */
  def mgoGetIntOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getInteger(fieldName))
    else None
  }
  /** Reads `fieldName` via `getLong`; `None` when absent or null.
    * The misspelled name is kept for existing callers; see `mgoGetLongOrNone`.
    */
  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getLong(fieldName))
    else None
  }
  /** Correctly spelled alias of `mgoGetLonggOrNone`. */
  def mgoGetLongOrNone(doc: Document, fieldName: String) =
    mgoGetLonggOrNone(doc, fieldName)
  /** Reads `fieldName` via `getDouble`; `None` when absent or null. */
  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getDouble(fieldName))
    else None
  }
  /** Reads `fieldName` via `getBoolean`; `None` when absent or null. */
  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getBoolean(fieldName))
    else None
  }
  /** Reads `fieldName` via `getDate`; `None` when absent or null. */
  def mgoGetDateOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getDate(fieldName))
    else None
  }
  /** Returns the field as a blob; `None` when absent or when the value is not a `MGOBlob`. */
  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
    // a type-checked match replaces the erased cast, which never verified the element type
    doc.get(fieldName).collect { case blob: MGOBlob => blob }
  }
  /** Returns the field as an array; `None` when absent or when the value is not a `MGOArray`. */
  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
    doc.get(fieldName).collect { case arr: MGOArray => arr }
  }

  /** Converts the elements of `arr` to BSON documents.
    *
    * Each element is converted with `asDocument()`, so an element that is not a document
    * is rejected here by the BSON library rather than surfacing later as a
    * `ClassCastException` at whichever call site first touches it.
    */
  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
    arr.getValues.asScala.map(_.asDocument()).toList
  }

  /** A function from one `FindObservable[Document]` to another; usable as `Find.andThen`. */
  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
}
/** Interprets an `MGOContext` by dispatching its command to the MongoDB Scala driver. */
object MGOEngine {
  import MGOContext._
  import MGOCommands._
  import MGOAdmins._

  /** Runs the command held in `ctx.action` against `ctx.dbName` / `ctx.collName`.
    *
    * Every branch casts the driver's future to `Future[T]`. The cast is unchecked, so the
    * caller has to pick a `T` matching what the selected driver call yields; a wrong `T`
    * only shows up later, where the value is used.
    *
    * @param ctx    database name, collection name and the command to run
    * @param client driver client used to look up the database and collection
    * @tparam T     result type the caller expects from the command
    * @return the driver's result, or a failed future when the command (including a `null`
    *         action) is not handled by this engine
    */
  def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    ctx.action match {
        /* count */
      case Count(Some(filter),Some(opt)) =>
        coll.count(filter,opt.asInstanceOf[CountOptions])
          .toFuture().asInstanceOf[Future[T]]
      case Count(Some(filter),None) =>
        coll.count(filter).toFuture()
          .asInstanceOf[Future[T]]
      case Count(None,Some(opt)) =>
        // options without a filter: count with an empty (match-all) filter document
        coll.count(Document(),opt.asInstanceOf[CountOptions])
          .toFuture().asInstanceOf[Future[T]]
      case Count(None,None) =>
        coll.count().toFuture()
          .asInstanceOf[Future[T]]
        /* distinct */
      case Distict(field,Some(filter)) =>
        coll.distinct(field,filter).toFuture()
          .asInstanceOf[Future[T]]
      case Distict(field,None) =>
        coll.distinct((field)).toFuture()
          .asInstanceOf[Future[T]]
      /* find: without andThen, firstOnly selects between the whole result and its first element */
      case Find(None,None,optConv,false) =>
        if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
        else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
      case Find(None,None,optConv,true) =>
        if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
        else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
      case Find(Some(filter),None,optConv,false) =>
        if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
        else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
      case Find(Some(filter),None,optConv,true) =>
        if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
        else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
      // with an andThen transformation the firstOnly flag is not consulted
      case Find(None,Some(next),optConv,_) =>
        if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
        else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
      case Find(Some(filter),Some(next),optConv,_) =>
        if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
        else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
        /* aggregate */
      case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
        /* mapReduce */
      case MapReduce(mf,rf) => coll.mapReduce(mf,rf).toFuture().asInstanceOf[Future[T]]
       /* insert: more than one document goes through insertMany, otherwise insertOne;
          the options object must be of the matching driver type */
      case Insert(docs,Some(opt)) =>
        if (docs.size > 1) coll.insertMany(docs,opt.asInstanceOf[InsertManyOptions]).toFuture()
          .asInstanceOf[Future[T]]
        else coll.insertOne(docs.head,opt.asInstanceOf[InsertOneOptions]).toFuture()
          .asInstanceOf[Future[T]]
      case Insert(docs,None) =>
        if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
        else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
        /* delete */
      case Delete(filter,None,onlyOne) =>
         if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
         else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
      case Delete(filter,Some(opt),onlyOne) =>
        if (onlyOne) coll.deleteOne(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.deleteMany(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
        /* replace */
      case Replace(filter,replacement,None) =>
         coll.replaceOne(filter,replacement).toFuture().asInstanceOf[Future[T]]
      case Replace(filter,replacement,Some(opt)) =>
        coll.replaceOne(filter,replacement,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        /* update */
      case Update(filter,update,None,onlyOne) =>
        if (onlyOne) coll.updateOne(filter,update).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter,update).toFuture().asInstanceOf[Future[T]]
      case Update(filter,update,Some(opt),onlyOne) =>
        if (onlyOne) coll.updateOne(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        /* bulkWrite */
      case BulkWrite(commands,None) =>
         coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
      case BulkWrite(commands,Some(opt)) =>
        coll.bulkWrite(commands,opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]

        /* drop collection: targets the collection named in the command, not ctx.collName */
      case DropCollection(collName) =>
        val target = db.getCollection(collName)
        target.drop().toFuture().asInstanceOf[Future[T]]
        /* create collection */
      case CreateCollection(collName,None) =>
        db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
      case CreateCollection(collName,Some(opt)) =>
        db.createCollection(collName,opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
        /* list collection: uses the database named in the command, not ctx.dbName */
      case ListCollection(dbName) =>
        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
        /* create view */
      case CreateView(viewName,viewOn,pline,None) =>
        db.createView(viewName,viewOn,pline).toFuture().asInstanceOf[Future[T]]
      case CreateView(viewName,viewOn,pline,Some(opt)) =>
        db.createView(viewName,viewOn,pline,opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
        /* create index */
      case CreateIndex(key,None) =>
        coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
      case CreateIndex(key,Some(opt)) =>
        coll.createIndex(key,opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
        /* drop index */
      case DropIndexByName(indexName, None) =>
        coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
      case DropIndexByName(indexName, Some(opt)) =>
        coll.dropIndex(indexName,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropIndexByKey(key,None) =>
        coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
      case DropIndexByKey(key,Some(opt)) =>
        coll.dropIndex(key,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropAllIndexes(None) =>
        coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
      case DropAllIndexes(Some(opt)) =>
        coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
        /* anything else (a null action or a command added later): report through the
           future instead of throwing MatchError at the call site */
      case other =>
        Future.failed[T](new IllegalArgumentException(s"unsupported MongoDB command: $other"))
    }

  }

}

MongoEngineTest.scala

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.mongodb.scala._
import org.mongodb.scala.connection._

import scala.collection.JavaConverters._
import com.mongodb.client.model._

import scala.util._

object MongoEngineTest extends App {
import MGOContext._
import MGOEngine._
import MGOHelpers._
import MGOCommands._
import MGOAdmins._

  // Connection setup: a cluster description containing the single host
  // "localhost:27017", wrapped into client settings and used to create the client.
  val clusterSettings = ClusterSettings.builder()
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
  // Implicit so that mgoExecute, which takes `(implicit client: MongoClient)`, picks it up.
  implicit val client = MongoClient(clientSettings)

  // Akka runtime. `ec` is the execution context used by the Future combinators
  // (andThen / for-comprehension) further down in this object.
  // NOTE(review): `system` and `mat` are not referenced directly in this object except
  // for `system.dispatcher` and `system.terminate()`; presumably the imported helpers
  // need them implicitly — confirm.
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  // Base context for database "testdb" / collection "po". It is created carrying a
  // DropCollection("po") command and is reused below via ctx.setCommand(...) to build
  // the insert and find contexts.
  val ctx = MGOContext("testdb","po").setCommand(
    DropCollection("po"))
  // Runs the drop and prints whatever getResult yields for the returned Future.
  // NOTE(review): getResult comes from the wildcard imports; presumably it waits for
  // the Future to complete — confirm.
  println(getResult(mgoExecute(ctx)))


  // Sample data. `pic` is loaded from a fixed local path and stored under "handler" in po1.
  val pic = fileToMGOBlob("/users/tiger/nobody.png")
  // po1 carries every field that toPO reads, including the optional "remarks" and
  // "handler". The detail lines use "packing" or "payterm" (lower case), which are the
  // keys toPODTL looks up.
  val po1 = Document (
    "ponum" -> "po18012301",
    "vendor" -> "The smartphone compay",
    "podate" -> mgoDate(2017,5,13),
    "remarks" -> "urgent, rush order",
    "handler" -> pic,
    "podtl" -> Seq(
      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
    )
  )

  // po2 has no "remarks" and no "handler"; two of its detail lines have qty 100,
  // the value matched by the "podtl.qty" filter used in ctxFindArrayItem.
  val po2 = Document (
    "ponum" -> "po18022002",
    "vendor" -> "The Samsung compay",
    "podate" -> mgoDate(2015,11,6),
    "podtl" -> Seq(
      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
    )
  )

  // Insert command for both documents, with the `ordered(true)` option set.
  val optInsert = new InsertManyOptions().ordered(true)
  val ctxInsert = ctx.setCommand(
    Insert(Seq(po1,po2),Some(optInsert))
  )
  // The insert itself is disabled: as written, a run drops the "po" collection above
  // and then queries it without inserting po1/po2. Uncomment to load the sample data.
 // println(getResult(mgoExecute(ctxInsert)))

  /** Typed view of one purchase-order document.
    *
    * `ponum`, `podate` and `vendor` are plain values; `remarks`, `podtl` (the embedded
    * detail array) and `handler` (a binary attachment) are optional.
    */
  case class PO(
    ponum: String,
    podate: MGODate,
    vendor: String,
    remarks: Option[String],
    podtl: Option[MGOArray],
    handler: Option[MGOBlob]
  )
  /** Builds a [[PO]] from a purchase-order document.
    *
    * "ponum", "podate" and "vendor" are read with the document's direct getters;
    * "remarks", "podtl" and "handler" go through the `mgoGet…OrNone` helpers.
    * Fields are read in the same order as the [[PO]] constructor parameters.
    */
  def toPO(doc: Document): PO = {
    val number: String = doc.getString("ponum")
    val orderDate: MGODate = doc.getDate("podate")
    val supplier: String = doc.getString("vendor")
    val note: Option[String] = mgoGetStringOrNone(doc,"remarks")
    val details: Option[MGOArray] = mgoGetArrayOrNone(doc,"podtl")
    val picture: Option[MGOBlob] = mgoGetBlobOrNone(doc,"handler")
    PO(number, orderDate, supplier, note, details, picture)
  }

  /** Typed view of one purchase-order detail line (an element of the "podtl" array).
    *
    * `packing` and `payTerm` are optional; the sample data in this file sets one or
    * the other per line.
    */
  case class PODTL(
    item: String,
    price: Double,
    qty: Int,
    packing: Option[String],
    payTerm: Option[String]
  )
  /** Builds a [[PODTL]] from one embedded detail document.
    *
    * "item", "price" and "qty" are read with the document's direct getters;
    * "packing" and "payterm" (lower-case key) go through `mgoGetStringOrNone`.
    * Fields are read in the same order as the [[PODTL]] constructor parameters.
    */
  def toPODTL(podtl: Document): PODTL = {
    val itemName: String = podtl.getString("item")
    val unitPrice: Double = podtl.getDouble("price")
    val quantity: Int = podtl.getInteger("qty")
    val packingNote: Option[String] = mgoGetStringOrNone(podtl,"packing")
    val paymentTerm: Option[String] = mgoGetStringOrNone(podtl,"payterm")
    PODTL(itemName, unitPrice, quantity, packingNote, paymentTerm)
  }

  /** Prints a purchase order to stdout.
    *
    * Header fields come first, then one console line per detail entry when `podtl`
    * is present. When a `handler` blob is present it is written to
    * `/users/tiger/<ponum>.png` via `mgoBlobToFile`; otherwise a notice is printed.
    */
  def showPO(po: PO) = {
    println(s"po number: ${po.ponum}")
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
    println(s"vendor: ${po.vendor}")
    // Remarks are optional: print the line only when a value exists.
    po.remarks.foreach(text => println(s"remarks: ${text}"))

    // Detail lines: convert each embedded document and print it on a single line.
    po.podtl.foreach { details =>
      mgoArrayToDocumentList(details)
        .map(raw => toPODTL(raw))
        .foreach { line: PODTL =>
          print(s"==>Item: ${line.item} ")
          print(s"price: ${line.price} ")
          print(s"qty: ${line.qty} ")
          line.packing.foreach(pk => print(s"packing: ${pk} "))
          line.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
          println("")
        }
    }

    // Attachment: save it next to the other pictures, or report that there is none.
    po.handler.fold(println("no picture provided")) { blob =>
      val fileName = s"/users/tiger/${po.ponum}.png"
      mgoBlobToFile(blob,fileName)
      println(s"picture saved to ${fileName}")
    }

  }

  import org.mongodb.scala.model.Projections._
  import org.mongodb.scala.model.Filters._
  import org.mongodb.scala.model.Sorts._
  // Sort by "ponum" descending. Defined for reference; none of the contexts below use it.
  val sort: MGOFilterResult = find => find.sort(descending("ponum"))
  // Keep only ponum, podate and vendor and drop _id. The individual projections are
  // combined with Projections.fields: Filters.and is a query-filter combinator and only
  // yields a usable projection document when the driver happens to flatten it.
  val proj: MGOFilterResult =
    find => find.projection(fields(include("ponum","podate"),include("vendor"),excludeId()))
  // All documents, post-processed by `proj`, returned as raw documents (no converter).
  val ctxFind = ctx.setCommand(Find(andThen=Some(proj)))
  // First document only, converted to PO.
  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true,converter = Some(toPO _)))
  // Documents having at least one detail line with qty == 100, converted to PO.
  val ctxFindArrayItem = ctx.setCommand(
    Find(filter = Some(equal("podtl.qty",100)), converter = Some(toPO _))
  )

  // Runs the three find commands one after another: each generator is only evaluated
  // once the previous Future has completed successfully, so a failed query prints its
  // message and the remaining queries are not started.
  // The type argument given to mgoExecute is not checked; mgoExecute casts its result
  // with asInstanceOf, so it must match what the command actually produces.
  for {
    // 1) projected raw documents -> convert with toPO, then print each one
    _ <- mgoExecute[List[Document]](ctxFind).andThen {
      case Success(docs) => docs.map(toPO).foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    // 2) first document, already converted to PO by the command's converter
    _ <- mgoExecute[PO](ctxFindFirst).andThen {
      case Success(doc) => showPO(doc)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    // 3) documents matching the "podtl.qty" filter, already converted to PO
    _ <- mgoExecute[List[PO]](ctxFindArrayItem).andThen {
      case Success(docs) => docs.foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }
  } yield()


  // Block the main thread until Enter is pressed so the asynchronous queries started
  // above have time to complete and print their output.
  scala.io.StdIn.readLine()

  // Close the MongoClient so its connections are released, then stop the actor system.
  client.close()
  system.terminate()
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-03-12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 MongoDB
腾讯云数据库 MongoDB(TencentDB for MongoDB)是腾讯云基于全球广受欢迎的 MongoDB 打造的高性能 NoSQL 数据库,100%完全兼容 MongoDB 协议,支持跨文档事务,提供稳定丰富的监控管理,弹性可扩展、自动容灾,适用于文档型数据库场景,您无需自建灾备体系及控制管理系统。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档