专栏首页函数式编程语言及工具restapi(3)- MongoDBEngine : MongoDB Scala编程工具库

restapi(3)- MongoDBEngine : MongoDB Scala编程工具库

最近刚好有同事在学习MongoDB,我们讨论过MongoDB应该置于服务器端然后通过web-service为客户端提供数据的上传下载服务。我们可以用上节讨论的respapi框架来实现针对MongoDB的CRUD操作。在谈到restapi之前我在这篇讨论先介绍一下MongoDB数据库操作的scala编程,因为与传统的SQL数据库操作编程有比较大的差别。

在前面有关sdp (streaming-data-processor)系列的博文中有一段是关于MongoDBEngine的。刚好把这套工具的使用在这里介绍一下。

MongoDBEngine是基于mongodb-scala-driver上开发的一套MongoDB数据库CRUD Scala编程工具,其主要功能可以从下面这三个函数中反映出来:

 def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] 


  // T => FindIterable  e.g List[Document]
 def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T]


 def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] 

其中: mgoUpdate功能包括:insert,update,delete,replace ...

mgoQuery: find,count,distinct ...

mgoAdmin: dropCollection, createCollection ...

首先需要注意的是它们的返回结果类型: DBOResult[T],实质上是 Future[Either[String,Option[T]]]

 type DBOError[A] = EitherT[Task,Throwable,A]
 type DBOResult[A] = OptionT[DBOError,A]

看起来很复杂,实际容易解释:设计这个类型的初衷是针对数据库操作的,所以:

1、异步操作,所以用Future (Task即Future, 如:Task.runToFuture)

2、返回结果可能为空,所以用Option

3、发生错误结果也为空,但需要知道空值是由错误产生的,所以用了Either

把所有返回结果类型统一成DBOResult[T]是为了方便进行函数组合,如:

for {
    a <- mgoQuery(...)
    _ <- mgoUpdate(a, ...)
    b <- mgoQuery(...)
} yield b

但另一方面也为写代码带来一些麻烦,如从结构中抽出运算结果值:

 mgoQuery[List[Document]](ctxFind).value.value.runToFuture {
      case Success(eold) => eold match {
        case Right(old) => old match {
          case Some(ld) => ld.map(toPO(_)).foreach(showPO)
          case None => println(s"Empty document found!")
        }
        case Left(err) => println(s"Error: ${err.getMessage}")
 }

是有些麻烦,不过能更详细的了解命令执行过程,而且是统一标准的写法(ctlr-c, ctlr-v 就可以了)。

上面三个函数都有一个同样的MGOContext类型的入参数,这是一个命令类型:

  case class MGOContext(
                         dbName: String,
                         collName: String,
                         actionType: MGO_ACTION_TYPE = MGO_QUERY,
                         action: Option[MGOCommands] = None,
                         actionOptions: Option[Any] = None,
                         actionTargets: Seq[String] = Nil
                       ) {
    ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)

    def setCollName(name: String): MGOContext = ctx.copy(collName = name)

    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)

    def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))

    def toSomeProto = MGOProtoConversion.ctxToProto(this)

  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOContext(db, coll)
    def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
      MGOProtoConversion.ctxFromProto(proto)
  }

可以看到MGOContext.action就是具体的操作命令。下面是一个mgoAdmin命令的示范:

  val ctx = MGOContext("testdb","po").setCommand(
    DropCollection("po"))

  import monix.execution.Scheduler.Implicits.global
  println(getResult(mgoAdmin(ctx).value.value.runToFuture))

mgoUpdate示范:

  val optInsert = new InsertManyOptions().ordered(true)
  val ctxInsert = ctx.setCommand(
    Insert(Seq(po1,po2),Some(optInsert))
  )
  println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))

我们选择MongoDB的主要目的是因为它分布式特性,适合大数据模式。但同时MongoDB具备强大的query功能,与传统SQL数据库模式相近,甚至还可以用索引。虽然MongoDB不支持数据关系,但对于我们这样的传统SQL老兵还是必然之选。MongoDB支持逗点查询组合,如:

    val resultDocType = FindIterable[Document]
    val resultOption = FindObservable(resultDocType)
      .maxScan(...)
    .limit(...)
    .sort(...)
    .project(...) 

比如对查询结果进行排序,同时又抽选几个返回的字段可以写成:FindObservable(...).sort(...).project(...)。MongoEngine提供了一个ResultOptions类型:

 case class ResultOptions(
                            optType: FOD_TYPE,
                            bson: Option[Bson] = None,
                            value: Int = 0 ){
     def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
      optType match {
        case  FOD_FIRST        => find
        case  FOD_FILTER       => find.filter(bson.get)
        case  FOD_LIMIT        => find.limit(value)
        case  FOD_SKIP         => find.skip(value)
        case  FOD_PROJECTION   => find.projection(bson.get)
        case  FOD_SORT         => find.sort(bson.get)
        case  FOD_PARTIAL      => find.partial(value != 0)
        case  FOD_CURSORTYPE   => find
        case  FOD_HINT         => find.hint(bson.get)
        case  FOD_MAX          => find.max(bson.get)
        case  FOD_MIN          => find.min(bson.get)
        case  FOD_RETURNKEY    => find.returnKey(value != 0)
        case  FOD_SHOWRECORDID => find.showRecordId(value != 0)

      }
    }

这个类型也是MGOContext类型的一个参数。 下面是一些用例:

  val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))


  val sort: Bson = (descending("ponum"))
  val proj: Bson = (and(include("ponum","podate")
                   ,include("vendor"),excludeId()))
  val resSort = ResultOptions(FOD_SORT,Some(sort))
  val resProj = ResultOptions(FOD_PROJECTION,Some(proj))
  val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))

  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))
  val ctxFindArrayItem = ctx.setCommand(
    Find(filter = Some(equal("podtl.qty",100)))
  )

下面是一个完整的例子:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.mongodb.scala._

import scala.collection.JavaConverters._
import com.mongodb.client.model._
import com.datatech.sdp.mongo.engine._
import MGOClasses._

import scala.util._

object TestMongoEngine extends App {
  import MGOEngine._
  import MGOHelpers._
  import MGOCommands._
  import MGOAdmins._

  // or provide custom MongoClientSettings
  val settings: MongoClientSettings = MongoClientSettings.builder()
    .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
    .build()
  implicit val client: MongoClient = MongoClient(settings)

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
 // implicit val ec = system.dispatcher

  val ctx = MGOContext("testdb","po").setCommand(
    DropCollection("po"))

  import monix.execution.Scheduler.Implicits.global
  println(getResult(mgoAdmin(ctx).value.value.runToFuture))

scala.io.StdIn.readLine()

  val pic = fileToMGOBlob("/users/tiger/cert/MeTiger.png")
  val po1 = Document (
    "ponum" -> "po18012301",
    "vendor" -> "The smartphone compay",
    "podate" -> mgoDate(2017,5,13),
    "remarks" -> "urgent, rush order",
    "handler" -> pic,
    "podtl" -> Seq(
      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
    )
  )

  val po2 = Document (
    "ponum" -> "po18022002",
    "vendor" -> "The Samsung compay",
    "podate" -> mgoDate(2015,11,6),
    "podtl" -> Seq(
      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
    )
  )

  val optInsert = new InsertManyOptions().ordered(true)
  val ctxInsert = ctx.setCommand(
    Insert(Seq(po1,po2),Some(optInsert))
  )
  println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))

  scala.io.StdIn.readLine()

  case class PO (
                  ponum: String,
                  podate: MGODate,
                  vendor: String,
                  remarks: Option[String],
                  podtl: Option[MGOArray],
                  handler: Option[MGOBlob]
                )
  def toPO(doc: Document): PO = {
    PO(
      ponum = doc.getString("ponum"),
      podate = doc.getDate("podate"),
      vendor = doc.getString("vendor"),
      remarks = mgoGetStringOrNone(doc,"remarks"),
      podtl = mgoGetArrayOrNone(doc,"podtl"),
      handler = mgoGetBlobOrNone(doc,"handler")
    )
  }

  case class PODTL(
                    item: String,
                    price: Double,
                    qty: Int,
                    packing: Option[String],
                    payTerm: Option[String]
                  )
  def toPODTL(podtl: Document): PODTL = {
    PODTL(
      item = podtl.getString("item"),
      price = podtl.getDouble("price"),
      qty = podtl.getInteger("qty"),
      packing = mgoGetStringOrNone(podtl,"packing"),
      payTerm = mgoGetStringOrNone(podtl,"payterm")
    )
  }

  def showPO(po: PO) = {
    println(s"po number: ${po.ponum}")
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
    println(s"vendor: ${po.vendor}")
    if (po.remarks != None)
      println(s"remarks: ${po.remarks.get}")
    po.podtl match {
      case Some(barr) =>
        mgoArrayToDocumentList(barr)
          .map { dc => toPODTL(dc)}
          .foreach { doc: PODTL =>
            print(s"==>Item: ${doc.item} ")
            print(s"price: ${doc.price} ")
            print(s"qty: ${doc.qty} ")
            doc.packing.foreach(pk => print(s"packing: ${pk} "))
            doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
            println("")
          }
      case _ =>
    }

    po.handler match {
      case Some(blob) =>
        val fileName = s"/users/tiger/${po.ponum}.png"
        mgoBlobToFile(blob,fileName)
        println(s"picture saved to ${fileName}")
      case None => println("no picture provided")
    }

  }

  import org.mongodb.scala.model.Projections._
  import org.mongodb.scala.model.Filters._
  import org.mongodb.scala.model.Sorts._
  import org.mongodb.scala.bson.conversions._
  import org.mongodb.scala.bson.Document


  val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))


  val sort: Bson = (descending("ponum"))
  val proj: Bson = (and(include("ponum","podate")
                   ,include("vendor"),excludeId()))
  val resSort = ResultOptions(FOD_TYPE.FOD_SORT,Some(sort))
  val resProj = ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(proj))
  val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))

  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))
  val ctxFindArrayItem = ctx.setCommand(
    Find(filter = Some(equal("podtl.qty",100)))
  )

  for {
    _ <- mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {
      case Success(eold) => eold match {
        case Right(old) => old match {
          case Some(ld) => ld.map(toPO(_)).foreach(showPO)
          case None => println(s"Empty document found!")
        }
        case Left(err) => println(s"Error: ${err.getMessage}")
      }
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoQuery[PO](ctxFindFirst,Some(toPO _)).value.value.runToFuture.andThen {
      case Success(eop) => eop match {
        case Right(op) => op match {
          case Some(p) => showPO(_)
          case None => println(s"Empty document found!")
        }
        case Left(err) => println(s"Error: ${err.getMessage}")
      }
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoQuery[List[PO]](ctxFindArrayItem,Some(toPO _)).value.value.runToFuture.andThen {
      case Success(eops) => eops match {
        case Right(ops) => ops match {
          case Some(lp) => lp.foreach(showPO)
          case None => println(s"Empty document found!")
        }
        case Left(err) => println(s"Error: ${err.getMessage}")
      }
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }
  } yield()


  scala.io.StdIn.readLine()


  system.terminate()
}

运行程序后结果如下:

Right(Some(The operation completed successfully))

Right(Some(The operation completed successfully))

po number: po18022002
po date: 2015-12-06
vendor: The Samsung compay
no picture provided
po number: po18012301
po date: 2017-06-13
vendor: The smartphone compay
no picture provided
-------------------------------
-------------------------------
po number: po18022002
po date: 2015-12-06
vendor: The Samsung compay
==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard 
==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days 
==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury 
no picture provided
-------------------------------

以下是本次讨论涉及的全部源代码:

build.sbt

name := "dt-dal"

version := "0.2"

scalaVersion := "2.12.8"

scalacOptions += "-Ypartial-unification"

val akkaVersion = "2.5.23"
val akkaHttpVersion = "10.1.8"

libraryDependencies := Seq(
  // for scalikejdbc
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
  "com.h2database"  %  "h2" % "1.4.199",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.3.2",
  //for cassandra 3.6.0
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",
  //for mongodb 4.0
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
  "io.monix" %% "monix" % "3.0.0-RC3",
  "org.typelevel" %% "cats-core" % "2.0.0-M4"
)

converters/DBOResultType.scala

package com.datatech.sdp.result

import cats._
import cats.data.EitherT
import cats.data.OptionT
import monix.eval.Task
import cats.implicits._

import scala.concurrent._

import scala.collection.TraversableOnce

object DBOResult {


  type DBOError[A] = EitherT[Task,Throwable,A]
  type DBOResult[A] = OptionT[DBOError,A]

  implicit def valueToDBOResult[A](a: A): DBOResult[A] =
         Applicative[DBOResult].pure(a)
  implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
         OptionT((o: Option[A]).pure[DBOError])
  implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
 //   val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
         OptionT.liftF(EitherT.fromEither[Task](e))
  }
  implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
       val task = Task.fromFuture[A](fut)
       val et = EitherT.liftF[Task,Throwable,A](task)
       OptionT.liftF(et)
  }

  implicit class DBOResultToTask[A](r: DBOResult[A]) {
    def toTask = r.value.value
  }

  implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {
    def someValue: Option[A] = r match {
      case Left(err) => (None: Option[A])
      case Right(oa) => oa
    }
  }

  def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
    if (coll.isEmpty)
      optionToDBOResult(None: Option[C[A]])
    else
      optionToDBOResult(Some(coll): Option[C[A]])
}

filestream/FileStreaming.scala

package com.datatech.sdp.file

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util._

import scala.concurrent.Await
import scala.concurrent.duration._

object Streaming {
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer):ByteBuffer = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    (Await.result(fut, timeOut)).toByteBuffer
  }


  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): Array[Byte] = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    (Await.result(fut, timeOut)).toArray
  }

  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): InputStream = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    val buf = (Await.result(fut, timeOut)).toArray
    new ByteArrayInputStream(buf)
  }

  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba,0,ba.length)
    val baInput = new ByteArrayInputStream(ba)
    val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) = {
    val bb = ByteBuffer.wrap(bytes)
    val baInput = new ByteArrayInputStream(bytes)
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

}

logging/Log.scala

package com.datatech.sdp.logging

import org.slf4j.Logger

/**
  * Logger which just wraps org.slf4j.Logger internally.
  *
  * @param logger logger
  */
class Log(logger: Logger) {

  // use var consciously to enable squeezing later
  var isDebugEnabled: Boolean = logger.isDebugEnabled
  var isInfoEnabled: Boolean = logger.isInfoEnabled
  var isWarnEnabled: Boolean = logger.isWarnEnabled
  var isErrorEnabled: Boolean = logger.isErrorEnabled
  var isTraceEnabled: Boolean = logger.isTraceEnabled

  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
    level match {
      case 'debug | 'DEBUG => debug(msg)
      case 'info | 'INFO => info(msg)
      case 'warn | 'WARN => warn(msg)
      case 'error | 'ERROR => error(msg)
      case 'trace | 'TRACE => trace(msg)
      case _ => // nothing to do
    }
  }

  var stepOn: Boolean = false

  def step(msg: => String): Unit = {
    if(stepOn)
      logger.info("\n****** {} ******\n",msg)
  }

  def trace(msg: => String): Unit = {
    if (isTraceEnabled && logger.isTraceEnabled) {
      logger.trace(msg)
    }
  }

  def trace(msg: => String, e: Throwable): Unit = {
    if (isTraceEnabled && logger.isTraceEnabled) {
      logger.trace(msg, e)
    }
  }

  def debug(msg: => String): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg)
    }
  }

  def debug(msg: => String, e: Throwable): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg, e)
    }
  }

  def info(msg: => String): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg)
    }
  }

  def info(msg: => String, e: Throwable): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg, e)
    }
  }

  def warn(msg: => String): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg)
    }
  }

  def warn(msg: => String, e: Throwable): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg, e)
    }
  }

  def error(msg: => String): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg)
    }
  }

  def error(msg: => String, e: Throwable): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg, e)
    }
  }

}

logging/LogSupport.scala

package com.datatech.sdp.logging

import org.slf4j.LoggerFactory

trait LogSupport {

  /**
    * Logger
    */
  protected val log = new Log(LoggerFactory.getLogger(this.getClass))

}

mgo/engine/ObservableToPublisher.scala

package com.datatech.sdp.mongo.engine

import java.util.concurrent.atomic.AtomicBoolean

import org.mongodb.{scala => mongoDB}
import org.{reactivestreams => rxStreams}

final case class ObservableToPublisher[T](observable: mongoDB.Observable[T])
  extends rxStreams.Publisher[T] {
  def subscribe(subscriber: rxStreams.Subscriber[_ >: T]): Unit =
    observable.subscribe(new mongoDB.Observer[T]() {
      override def onSubscribe(subscription: mongoDB.Subscription): Unit =
        subscriber.onSubscribe(new rxStreams.Subscription() {
          private final val cancelled: AtomicBoolean = new AtomicBoolean

          override def request(n: Long): Unit =
            if (!subscription.isUnsubscribed && !cancelled.get() && n < 1) {
              subscriber.onError(
                new IllegalArgumentException(
                  s"Demand from publisher should be a positive amount. Current amount is:$n"
                )
              )
            } else {
              subscription.request(n)
            }

          override def cancel(): Unit =
            if (!cancelled.getAndSet(true)) subscription.unsubscribe()
        })

      def onNext(result: T): Unit = subscriber.onNext(result)

      def onError(e: Throwable): Unit = subscriber.onError(e)

      def onComplete(): Unit = subscriber.onComplete()
    })
}

mgo/engine/MongoDBEngine.scala

package com.datatech.sdp.mongo.engine

import java.text.SimpleDateFormat
import java.util.Calendar

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Source}
import org.bson.conversions.Bson
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.bson.{BsonArray, BsonBinary}
import org.mongodb.scala.model._
import org.mongodb.scala.{MongoClient, _}
import com.datatech.sdp
import sdp.file.Streaming._
import sdp.logging.LogSupport

import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._

object MGOClasses {
  type MGO_ACTION_TYPE = Int
  object MGO_ACTION_TYPE {
    val MGO_QUERY = 0
    val MGO_UPDATE = 1
    val MGO_ADMIN = 2
  }

  /*  org.mongodb.scala.FindObservable
    import com.mongodb.async.client.FindIterable
    val resultDocType = FindIterable[Document]
    val resultOption = FindObservable(resultDocType)
      .maxScan(...)
    .limit(...)
    .sort(...)
    .project(...) */

  type FOD_TYPE = Int
  object FOD_TYPE {
    val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
    val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
    val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
    val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
    val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
    //Sets a document describing the fields to return for all matching documents
    val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
    val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
    //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
    val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
    //Sets the cursor type
    val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
    //Sets the hint for which index to use. A null value means no hint is set
    val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
    //Sets the exclusive upper bound for a specific index. A null value means no max is set
    val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
    //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
    val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
    //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
    val FOD_SHOWRECORDID = 12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
    //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
  }
  case class ResultOptions(
                            optType: FOD_TYPE,
                            bson: Option[Bson] = None,
                            value: Int = 0 ){
    import FOD_TYPE._
     def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
      optType match {
        case  FOD_FIRST        => find
        case  FOD_FILTER       => find.filter(bson.get)
        case  FOD_LIMIT        => find.limit(value)
        case  FOD_SKIP         => find.skip(value)
        case  FOD_PROJECTION   => find.projection(bson.get)
        case  FOD_SORT         => find.sort(bson.get)
        case  FOD_PARTIAL      => find.partial(value != 0)
        case  FOD_CURSORTYPE   => find
        case  FOD_HINT         => find.hint(bson.get)
        case  FOD_MAX          => find.max(bson.get)
        case  FOD_MIN          => find.min(bson.get)
        case  FOD_RETURNKEY    => find.returnKey(value != 0)
        case  FOD_SHOWRECORDID => find.showRecordId(value != 0)

      }
    }
  }

  trait MGOCommands

  object MGOCommands {

    case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands

    case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands

    /*  org.mongodb.scala.FindObservable
    import com.mongodb.async.client.FindIterable
    val resultDocType = FindIterable[Document]
    val resultOption = FindObservable(resultDocType)
      .maxScan(...)
    .limit(...)
    .sort(...)
    .project(...) */
    case class Find(filter: Option[Bson] = None,
                       andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
                       firstOnly: Boolean = false) extends MGOCommands

    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands


    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands

  }

  object MGOAdmins {

    case class DropCollection(collName: String) extends MGOCommands

    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

    case class ListCollection(dbName: String) extends MGOCommands

    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

  }

  case class MGOContext(
                         dbName: String,
                         collName: String,
                         actionType: MGO_ACTION_TYPE = MGO_ACTION_TYPE.MGO_QUERY,
                         action: Option[MGOCommands] = None,
                         actionOptions: Option[Any] = None,
                         actionTargets: Seq[String] = Nil
                       ) {
    ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)

    def setCollName(name: String): MGOContext = ctx.copy(collName = name)

    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)

    def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))

  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOContext(db, coll)
  }

  case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
    ctxs =>
    def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
    def appendContext(ctx: MGOContext): MGOBatContext =
      ctxs.copy(contexts = contexts :+ ctx)
  }

  object MGOBatContext {
    def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
  }

  type MGODate = java.util.Date
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
    ca.set(yyyy,mm,dd)
    ca.getTime()
  }
  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
    val ca = Calendar.getInstance()
    ca.set(yyyy,mm,dd,hr,min,sec)
    ca.getTime()
  }
  def mgoDateTimeNow: MGODate = {
    val ca = Calendar.getInstance()
    ca.getTime
  }


  def mgoDateToString(dt: MGODate, formatString: String): String = {
    val fmt= new SimpleDateFormat(formatString)
    fmt.format(dt)
  }

  type MGOBlob = BsonBinary
  type MGOArray = BsonArray

  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer) = FileToByteArray(fileName,timeOut)

  def mgoBlobToFile(blob: MGOBlob, fileName: String)(
    implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)

  def mgoGetStringOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getString(fieldName))
    else None
  }
  def mgoGetIntOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getInteger(fieldName))
    else None
  }
  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getLong(fieldName))
    else None
  }
  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getDouble(fieldName))
    else None
  }
  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getBoolean(fieldName))
    else None
  }
  def mgoGetDateOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getDate(fieldName))
    else None
  }
  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
    else None
  }
  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      doc.get(fieldName).asInstanceOf[Option[MGOArray]]
    else None
  }

  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
    (arr.getValues.asScala.toList)
      .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
  }

  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
}


object MGOEngine extends LogSupport {

  import MGOClasses._
  import MGOAdmins._
  import MGOCommands._
  import sdp.result.DBOResult._
  import com.mongodb.reactivestreams.client.MongoClients

  object TxUpdateMode {
    private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
              implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
      log.info(s"mgoTxUpdate> calling ...")
      observable.map(clientSession => {

        val transactionOptions =
          TransactionOptions.builder()
            .readConcern(ReadConcern.SNAPSHOT)
            .writeConcern(WriteConcern.MAJORITY).build()

        clientSession.startTransaction(transactionOptions)
/*
        val fut = Future.traverse(ctxs.contexts) { ctx =>
          mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
        }
        Await.ready(fut, 3 seconds) */

        ctxs.contexts.foreach { ctx =>
          mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
        }
        clientSession
      })
    }

    private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
      log.info(s"commitAndRetry> calling ...")
      observable.recoverWith({
        case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
          log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
          commitAndRetry(observable)
        }
        case e: Exception => {
          log.error(s"commitAndRetry> Exception during commit ...: $e")
          throw e
        }
      })
    }

    private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
      log.info(s"runTransactionAndRetry> calling ...")
      observable.recoverWith({
        case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
          log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
          runTransactionAndRetry(observable)
        }
      })
    }

    def mgoTxBatch(ctxs: MGOBatContext)(
            implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {

      log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")

      val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
      val commitTransactionObservable: SingleObservable[Completed] =
        updateObservable.flatMap(clientSession => clientSession.commitTransaction())
      val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)

      runTransactionAndRetry(commitAndRetryObservable)

      valueToDBOResult(Completed())

    }
  }


  def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
    log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")
    if (ctxs.tx) {
        TxUpdateMode.mgoTxBatch(ctxs)
      } else {
/*
        val fut = Future.traverse(ctxs.contexts) { ctx =>
          mgoUpdate[Completed](ctx).map(identity) }

        Await.ready(fut, 3 seconds)
        FastFastFuture.successful(new Completed) */
        ctxs.contexts.foreach { ctx =>
          mgoUpdate[Completed](ctx).map(identity) }

         valueToDBOResult(Completed())
      }

  }

  def mongoStream(ctx: MGOContext)(
    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {

    log.info(s"mongoStream>  MGOContext: ${ctx}")

    def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
      rts.foldRight(findObj)((a,b) => a.toFindObservable(b))

    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    if ( ctx.action == None) {
      log.error(s"mongoStream> uery action cannot be null!")
      throw new IllegalArgumentException("query action cannot be null!")
    }
    try {
      ctx.action.get match {
        case Find(None, Nil, false) => //FindObservable
          MongoSource(ObservableToPublisher(coll.find()))
        case Find(None, Nil, true) => //FindObservable
          MongoSource(ObservableToPublisher(coll.find().first()))
        case Find(Some(filter), Nil, false) => //FindObservable
          MongoSource(ObservableToPublisher(coll.find(filter)))
        case Find(Some(filter), Nil, true) => //FindObservable
          MongoSource(ObservableToPublisher(coll.find(filter).first()))
        case Find(None, sro, _) => //FindObservable
          val next = toResultOption(sro)
          MongoSource(ObservableToPublisher(next(coll.find[Document]())))
        case Find(Some(filter), sro, _) => //FindObservable
          val next = toResultOption(sro)
          MongoSource(ObservableToPublisher(next(coll.find[Document](filter))))
        case _ =>
          log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
          throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")

      }
    }
    catch { case e: Exception =>
      log.error(s"mongoStream> runtime error: ${e.getMessage}")
      throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
    }

  }


  // T => FindIterable  e.g List[Document]
  def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {
    log.info(s"mgoQuery>  MGOContext: ${ctx}")

    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)

    def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
      rts.foldRight(findObj)((a,b) => a.toFindObservable(b))


    if ( ctx.action == None) {
      log.error(s"mgoQuery> uery action cannot be null!")
      Left(new IllegalArgumentException("query action cannot be null!"))
    }
    try {
      ctx.action.get match {
        /* count */
        case Count(Some(filter), Some(opt)) => //SingleObservable
          coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
            .toFuture().asInstanceOf[Future[T]]
        case Count(Some(filter), None) => //SingleObservable
          coll.countDocuments(filter).toFuture()
            .asInstanceOf[Future[T]]
        case Count(None, None) => //SingleObservable
          coll.countDocuments().toFuture()
            .asInstanceOf[Future[T]]
        /* distinct */
        case Distict(field, Some(filter)) => //DistinctObservable
          coll.distinct(field, filter).toFuture()
            .asInstanceOf[Future[T]]
        case Distict(field, None) => //DistinctObservable
          coll.distinct((field)).toFuture()
            .asInstanceOf[Future[T]]
        /* find */
        case Find(None, Nil, false) => //FindObservable
          if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
          else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
        case Find(None, Nil, true) => //FindObservable
          if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
          else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
        case Find(Some(filter), Nil, false) => //FindObservable
          if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
          else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
        case Find(Some(filter), Nil, true) => //FindObservable
          if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
          else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
        case Find(None, sro, _) => //FindObservable
          val next = toResultOption(sro)
          if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
          else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
        case Find(Some(filter), sro, _) => //FindObservable
          val next = toResultOption(sro)
          if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
          else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
        /* aggregate AggregateObservable*/
        case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
        /* mapReduce MapReduceObservable*/
        case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
        /* list collection */
        case ListCollection(dbName) => //ListConllectionObservable
          client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]

      }
    }
    catch { case e: Exception =>
      log.error(s"mgoQuery> runtime error: ${e.getMessage}")
      Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))
    }
  }
  //T => Completed, result.UpdateResult, result.DeleteResult
  def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =
    try {
      mgoUpdateObservable[T](ctx).toFuture()
    }
    catch { case e: Exception =>
      log.error(s"mgoUpdate> runtime error: ${e.getMessage}")
      Left(new RuntimeException(s"mgoUpdate> Error: ${e.getMessage}"))
    }

  def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
    log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")

    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    if ( ctx.action == None) {
      log.error(s"mgoUpdateObservable> uery action cannot be null!")
      throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")
    }
    try {
      ctx.action.get match {
        /* insert */
        case Insert(docs, Some(opt)) => //SingleObservable[Completed]
          if (docs.size > 1)
            coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
          else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
        case Insert(docs, None) => //SingleObservable
          if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
          else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
        /* delete */
        case Delete(filter, None, onlyOne) => //SingleObservable
          if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
          else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
        case Delete(filter, Some(opt), onlyOne) => //SingleObservable
          if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
          else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
        /* replace */
        case Replace(filter, replacement, None) => //SingleObservable
          coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
        case Replace(filter, replacement, Some(opt)) => //SingleObservable
          coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]
        /* update */
        case Update(filter, update, None, onlyOne) => //SingleObservable
          if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
          else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
        case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
          if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
          else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
        /* bulkWrite */
        case BulkWrite(commands, None) => //SingleObservable
          coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
        case BulkWrite(commands, Some(opt)) => //SingleObservable
          coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
      }
    }
    catch { case e: Exception =>
      log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")
      throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")
    }
  }

  def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {
    log.info(s"mgoAdmin>  MGOContext: ${ctx}")

    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    if ( ctx.action == None) {
      log.error(s"mgoAdmin> uery action cannot be null!")
      Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))
    }
    try {
      ctx.action.get match {
        /* drop collection */
        case DropCollection(collName) => //SingleObservable
          val coll = db.getCollection(collName)
          coll.drop().toFuture()
        /* create collection */
        case CreateCollection(collName, None) => //SingleObservable
          db.createCollection(collName).toFuture()
        case CreateCollection(collName, Some(opt)) => //SingleObservable
          db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()
        /* list collection
      case ListCollection(dbName) =>   //ListConllectionObservable
        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
        */
        /* create view */
        case CreateView(viewName, viewOn, pline, None) => //SingleObservable
          db.createView(viewName, viewOn, pline).toFuture()
        case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
          db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()
        /* create index */
        case CreateIndex(key, None) => //SingleObservable
          coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] //   asInstanceOf[SingleObservable[Completed]]
        case CreateIndex(key, Some(opt)) => //SingleObservable
          coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
        /* drop index */
        case DropIndexByName(indexName, None) => //SingleObservable
          coll.dropIndex(indexName).toFuture()
        case DropIndexByName(indexName, Some(opt)) => //SingleObservable
          coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()
        case DropIndexByKey(key, None) => //SingleObservable
          coll.dropIndex(key).toFuture()
        case DropIndexByKey(key, Some(opt)) => //SingleObservable
          coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()
        case DropAllIndexes(None) => //SingleObservable
          coll.dropIndexes().toFuture()
        case DropAllIndexes(Some(opt)) => //SingleObservable
          coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()
      }
    }
    catch { case e: Exception =>
      log.error(s"mgoAdmin> runtime error: ${e.getMessage}")
      throw new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}")
    }

  }

}


object MongoActionStream {

  import MGOClasses._

  case class StreamingInsert[A](dbName: String,
                                collName: String,
                                converter: A => Document,
                                parallelism: Int = 1
                               ) extends MGOCommands

  case class StreamingDelete[A](dbName: String,
                                collName: String,
                                toFilter: A => Bson,
                                parallelism: Int = 1,
                                justOne: Boolean = false
                               ) extends MGOCommands

  case class StreamingUpdate[A](dbName: String,
                                collName: String,
                                toFilter: A => Bson,
                                toUpdate: A => Bson,
                                parallelism: Int = 1,
                                justOne: Boolean = false
                               ) extends MGOCommands


  case class InsertAction[A](ctx: StreamingInsert[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
      Flow[A].map(ctx.converter)
        .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
  }

  case class UpdateAction[A](ctx: StreamingUpdate[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
      if (ctx.justOne) {
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
      } else
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  }


  case class DeleteAction[A](ctx: StreamingDelete[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
      if (ctx.justOne) {
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
      } else
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
  }

}

object MGOHelpers {

  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: (Document) => String = (doc) => doc.toJson
  }

  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: (C) => String = (doc) => doc.toString
  }

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)

    def headResult() = Await.result(observable.head(), 10 seconds)

    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      results().foreach(res => println(converter(res)))
    }

    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  }

  def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
    Await.result(fut, timeOut)
  }

  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
    Await.result(fut, timeOut)
  }

  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global

  final class FutureToTask[A](x: => Future[A]) {
    def asTask: Task[A] = Task.deferFuture[A](x)
  }

  final class TaskToFuture[A](x: => Task[A]) {
    def asFuture: Future[A] = x.runToFuture
  }

}

TestMongoEngine.scala

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.mongodb.scala._

import scala.collection.JavaConverters._
import com.mongodb.client.model._
import com.datatech.sdp.mongo.engine._
import MGOClasses._

import scala.util._

object TestMongoEngine extends App {
  import MGOEngine._
  import MGOHelpers._
  import MGOCommands._
  import MGOAdmins._

  // or provide custom MongoClientSettings
  val settings: MongoClientSettings = MongoClientSettings.builder()
    .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
    .build()
  implicit val client: MongoClient = MongoClient(settings)

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
 // implicit val ec = system.dispatcher

  val ctx = MGOContext("testdb","po").setCommand(
    DropCollection("po"))

  import monix.execution.Scheduler.Implicits.global
  println(getResult(mgoAdmin(ctx).value.value.runToFuture))

scala.io.StdIn.readLine()

  val pic = fileToMGOBlob("/users/tiger/cert/MeTiger.png")
  val po1 = Document (
    "ponum" -> "po18012301",
    "vendor" -> "The smartphone compay",
    "podate" -> mgoDate(2017,5,13),
    "remarks" -> "urgent, rush order",
    "handler" -> pic,
    "podtl" -> Seq(
      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
    )
  )

  val po2 = Document (
    "ponum" -> "po18022002",
    "vendor" -> "The Samsung compay",
    "podate" -> mgoDate(2015,11,6),
    "podtl" -> Seq(
      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
    )
  )

  val optInsert = new InsertManyOptions().ordered(true)
  val ctxInsert = ctx.setCommand(
    Insert(Seq(po1,po2),Some(optInsert))
  )
  println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))

  scala.io.StdIn.readLine()

  case class PO (
                  ponum: String,
                  podate: MGODate,
                  vendor: String,
                  remarks: Option[String],
                  podtl: Option[MGOArray],
                  handler: Option[MGOBlob]
                )
  def toPO(doc: Document): PO = {
    PO(
      ponum = doc.getString("ponum"),
      podate = doc.getDate("podate"),
      vendor = doc.getString("vendor"),
      remarks = mgoGetStringOrNone(doc,"remarks"),
      podtl = mgoGetArrayOrNone(doc,"podtl"),
      handler = mgoGetBlobOrNone(doc,"handler")
    )
  }

  case class PODTL(
                    item: String,
                    price: Double,
                    qty: Int,
                    packing: Option[String],
                    payTerm: Option[String]
                  )
  def toPODTL(podtl: Document): PODTL = {
    PODTL(
      item = podtl.getString("item"),
      price = podtl.getDouble("price"),
      qty = podtl.getInteger("qty"),
      packing = mgoGetStringOrNone(podtl,"packing"),
      payTerm = mgoGetStringOrNone(podtl,"payterm")
    )
  }

  def showPO(po: PO) = {
    println(s"po number: ${po.ponum}")
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
    println(s"vendor: ${po.vendor}")
    if (po.remarks != None)
      println(s"remarks: ${po.remarks.get}")
    po.podtl match {
      case Some(barr) =>
        mgoArrayToDocumentList(barr)
          .map { dc => toPODTL(dc)}
          .foreach { doc: PODTL =>
            print(s"==>Item: ${doc.item} ")
            print(s"price: ${doc.price} ")
            print(s"qty: ${doc.qty} ")
            doc.packing.foreach(pk => print(s"packing: ${pk} "))
            doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
            println("")
          }
      case _ =>
    }

    po.handler match {
      case Some(blob) =>
        val fileName = s"/users/tiger/${po.ponum}.png"
        mgoBlobToFile(blob,fileName)
        println(s"picture saved to ${fileName}")
      case None => println("no picture provided")
    }

  }

  import org.mongodb.scala.model.Projections._
  import org.mongodb.scala.model.Filters._
  import org.mongodb.scala.model.Sorts._
  import org.mongodb.scala.bson.conversions._
  import org.mongodb.scala.bson.Document


  val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))


  val sort: Bson = (descending("ponum"))
  val proj: Bson = (and(include("ponum","podate")
                   ,include("vendor"),excludeId()))
  val resSort = ResultOptions(FOD_TYPE.FOD_SORT,Some(sort))
  val resProj = ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(proj))
  val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))

  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))
  val ctxFindArrayItem = ctx.setCommand(
    Find(filter = Some(equal("podtl.qty",100)))
  )

  for {
    _ <- mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {
      case Success(eold) => eold match {
        case Right(old) => old match {
          case Some(ld) => ld.map(toPO(_)).foreach(showPO)
          case None => println(s"Empty document found!")
        }
        case Left(err) => println(s"Error: ${err.getMessage}")
      }
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoQuery[PO](ctxFindFirst,Some(toPO _)).value.value.runToFuture.andThen {
      case Success(eop) => eop match {
        case Right(op) => op match {
          case Some(p) => showPO(_)
          case None => println(s"Empty document found!")
        }
        case Left(err) => println(s"Error: ${err.getMessage}")
      }
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoQuery[List[PO]](ctxFindArrayItem,Some(toPO _)).value.value.runToFuture.andThen {
      case Success(eops) => eops match {
        case Right(ops) => ops match {
          case Some(lp) => lp.foreach(showPO)
          case None => println(s"Empty document found!")
        }
        case Left(err) => println(s"Error: ${err.getMessage}")
      }
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }
  } yield()


  scala.io.StdIn.readLine()


  system.terminate()
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Akka-CQRS(8)- CQRS Reader Actor 应用实例

    前面我们已经讨论了CQRS-Reader-Actor的基本工作原理,现在是时候在之前那个POS例子里进行实际的应用示范了。

    用户1150956
  • SDP(12): MongoDB-Engine - Streaming

       在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个M...

    用户1150956
  • restapi(1)- 文件上传下载服务

    上次对restapi开了个头,设计了一个包括了身份验证和使用权限的restful服务开发框架。这是一个通用框架,开发人员只要直接往里面加新功能就行了。虽然这次...

    用户1150956
  • Akka-CQRS(8)- CQRS Reader Actor 应用实例

    前面我们已经讨论了CQRS-Reader-Actor的基本工作原理,现在是时候在之前那个POS例子里进行实际的应用示范了。

    用户1150956
  • SDP(12): MongoDB-Engine - Streaming

       在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个M...

    用户1150956
  • Akka(37): Http:客户端操作模式

       Akka-http的客户端连接模式除Connection-Level和Host-Level之外还有一种非常便利的模式:Request-Level-Api。...

    用户1150956
  • PICE(3):CassandraStreaming - gRPC-CQL Service

      在上一篇博文里我们介绍了通过gRPC实现JDBC数据库的streaming,这篇我们介绍关于cassandra的streaming实现方式。如果我们需要从一...

    用户1150956
  • 在Spark上用LDA计算文本主题模型

    在新闻推荐中,由于新闻主要为文本的特性,基于内容的推荐(Content-based Recommendation)一直是主要的推荐策略。基于内容的策略主要思路是...

    星回
  • 聊聊flink Table的select操作

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    codecraft
  • 聊聊flink Table的select操作

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    codecraft

扫码关注云+社区

领取腾讯云代金券