如果实时推荐继续采用离线推荐中的 ALS 算法,由于 ALS 算法运行时间巨大(好几分钟甚至好十几分钟),不具有实时得到新的推荐结果的能力;并且由于算法本身的使用的是用户评分表,用户本次评分后只更新了总评分表中的一项...(用于建立 redis 和 mongo 连接),并在 OnlineRecommender 中定义一些常量: src/main/scala/com.atguigu.online/OnlineRecommender.scala...import scala.collection.mutable.ArrayBuffer // 定义样例类 // 连接助手对象(用于建立 redis 和 mongo 的连接)并序列化 object ...// 因为 redis 操作返回的是 java 类,为了使用 map 操作需要引入转换类 import scala.collection.JavaConversions._ /** *...与 recentRecommends 有重复的商品 productId 时,recentRecommends 中 productId 的推荐优先级由于是上次实时推荐的结果,于是将作废,被替换成代表了更新后的
【实时推荐部分】 3、Flume 从综合业务服务的运行日志中读取日志更新,并将更新的日志实时推送到 Kafka 中;Kafka 在收到这些日志之后,通过 KafkaStream 程序对获取的日志信息进行过滤处理.../** * @param uri MongDB 的连接 * @param db MongDB 的 数据库 */ case class MongoConfig(uri: String, db...如果实时推荐继续采用离线推荐中的 ALS 算法,由于算法运行时间巨大,不具有实时得到新的推荐结果的能力;并且由于算法本身的使用的是评分表,用户本次评分后只更新了总评分表中的一项,使得算法运行后的推荐结果与用户本次评分之前的推荐结果基本没有多少差别...// 因为 redis 操作返回的是 java 类,为了使用 map 操作需要引入转换类 import scala.collection.JavaConversions._ /** *...与 recentRecommends 有重复的电影 mid 时,recentRecommends 中 mid 的推荐优先级由于是上次实时推荐的结果,于是将作废,被替换成代表了更新后的 updatedRecommends
示例: 该示例使用新的回调 API 来处理事务,它启动事务、执行指定的操作并提交(或在出错时中止)。...使用针对 MongoDB 部署版本更新的 MongoDB 驱动程序。...使用驱动程序时,事务中的每个操作必须与会话相关联(即将会话传递给每个操作)。 事务中的操作使用 事务级别的读关注,事务级别的写关注,和 事务级别的读偏好。...从 MongoDB 4.4 开始,你可以隐式或显式地在事务中创建集合。但是,你比须使用针对 4.4 更新的 MongoDB 驱动程序。...,使用为 MongoDB 4.2 更新的 MongoDB 驱动程序。
后台开发对每一个数据库表单使用统一的标准增添一套新的CRUD服务。希望如此能够提高开发效率,减少代码出错机会。 MongoDB是一种文件类型数据库,数据格式更加多样化。...客户端从后台下载时就需要把bytes转换成UTF8字符就可以恢复文件内容了。 首先,我们先从Model开始,在scala里用case class来表示。...Model是MongoDB Document的对应。在scala编程里我们是用case class 当作Document来操作的。...所以id字段名称是指定的,这点在设计表结构时要注意。 如何测试一个httpserver还是比较头痛的。用浏览器只能测试GET,其它POST,PUT,DELETE应该怎么测试?...() system.terminate() } } 下面是本次示范中的源代码: build.sbt name := "rest-mongo" version := "0.1" scalaVersion
另外 Mongo Scala Driver 的数据库操作默认返回 Observable 类型,如果你忘记了调用 toFuture 方法,或是没有消费返回数据,则数据库操作实际上并不会被执行,在开发中很容易引入一些...作为约定,Model 类使用 _id 字段作为唯一标识, 该字段同时也是 mongodb collection 的默认主键。...} 由于这些隐式的 Format 对象是在模型层的包对象(package object)中创建的,所以使用时无需显式导入,编译器会自动加载。...: Mongo) extends AbstractController(cc) {} 模型类和Collection 模型类使用 @Entity 注解标注, 一个模型类实例表示 mongodb collection...作为约定,模型类使用 _id 字段作为唯一标识, 该字段同时也是 mongodb collection 的默认主键。
看来这个系统需要MongoDB,rest-mongo和akka-cluster这几个组件。 我们先从前端需求开始:页面上每个商品有n个图片,客户端提出存入系统请求时提供商品编号、描述、默认尺寸及图片。...在上篇rest-mongo的基础上,针对新的系统需求做一些针对性的修改应该就行了。...如果客户在请求图片时没有提供就用数据库里客户端在提交存储时提供的默认宽高。...在编译时无法识别width,height。 好了,下面是Route部分的修改。...: MongoModel.scala package com.datatech.rest.mongo import org.mongodb.scala._ import com.datatech.sdp.mongo.engine
MongoDB提供了scala终端驱动mongo-scala-driver,我们就介绍一下MongoDB数据库和通过scala来进行数据操作编程。 ...因为MongoDB没有schema,所以Collection中的Document可以是不同形状格式的。...在用scala使用MongoDB之前必须先建立连接,scala-driver提供了多种连接方式: val client1 = MongoClient() val client2 = MongoClient...同样,db内的collection也是可以自动创建的,因为不需要预先设定字段格式(no-schema): val db: MongoDatabase = client.getDatabase("testdb...") val userCollection: MongoCollection[Document] = db.getCollection("users") collection中Document类的构建函数如下
实时推荐服务:项目采用 Spark Streaming 作为实时推荐系统,通过接收 Kafka 中缓存的数据,通过设计的推荐算法实现对实时推荐的数据处理,并将结构合并更新到 MongoDB 数据库。...【实时推荐部分】 2、Flume 从综合业务服务的运行日志中读取日志更新,并将更新的日志实时推送到 Kafka 中;Kafka 在收到这些日志之后,通过 kafkaStream 程序对获取的日志信息进行过滤处理...程序主体代码如下: DataLoader/src/main/scala/com.atguigu.recommerder/DataLoader.scala // 定义样例类 case class Product...,方便重复调用(当多次调用对 MongoDB 的存储或读写操作时) implicit val mongoConfig = MongoConfig(config("mongo.uri"), config...("mongo.db")) // 将数据保存到 MongoDB 中 storeDataInMongDB(productDF, ratingDF) // 关闭 Spark
不知怎么搞的,我尽然在这段代码中间使用了Await.result。从OOP角度分析这很容易理解,下一段程序需要上一段程序的结果来继续运行。...逻辑思路上没问题,不过这样的做法是典型的行令式编程模式。在函数式编程模式里,阶段性的运算结果是在包嵌在Monad中的。Monad本身只是一个运算计划,只有真正运算时才能获取结果。...修改后的源代码如下: MongoRepo.scala package com.datatech.rest.mongo import org.mongodb.scala._ import org.bson.conversions.Bson...import org.mongodb.scala.result._ import com.datatech.sdp.mongo.engine._ import MGOClasses._ import...._ import org.mongodb.scala.model.Filters._ import com.datatech.sdp.mongo.engine.MGOClasses._ import
但是在现实中理想总是不如人意,本来想在一个规模较小的公司展展拳脚,以为小公司会少点历史包袱,有利于全面技术改造。...但现实是:即使是小公司,一旦有个成熟的产品,那么进行全面的技术更新基本上是不可能的了,因为公司要生存,开发人员很难新旧技术之间随时切换。除非有狂热的热情,员工怠慢甚至抵制情绪不容易解决。...然后是数据库连接,下面是可以使用sqlserver的application.conf配置文件内容: # JDBC settings prod { db { h2 { driver...这个函数返回Source[R,Any],下面我们好好谈谈这个R:R是读的结果,通常是某个类或model,比如读取Person记录返回一组Person类的实例。这里有一种强类型的感觉。...最后是put:这是为批次型的事物处理设计的。接受一条或者多条无参数sql指令,多条指令会在一个事物中执行。
分布式数据库有一套与传统观念不同的数据模式,在设计库表结构时必须从满足各种数据抽取的需要为主要目的。...在使用MongoDB前我们必须熟悉它的数据模式和设计理念:在大数据时代的今天,数据的产生和使用发生了质的变化,传统关系数据库数据模式已经无法满足现代信息系统的要求。...比如,在设计个人信息表时要考虑有些人有两个地址,有些甚至没有地址,又有些有传真号,还有这个那个的其它特点等等。在关系数据库模式设计中我们必须作出取舍,牺牲一些属性。...但MongoDB的文件类数据库特点容许不同的数据格式,能实现完整的数据采集与储存。...再看看下面类型转换中的数据类型对应: case class PO ( ponum: String, podate: java.util.Date
所以使用集群客户端的机器必须在本机启动ClusterClient服务(运行这个actor),这是通讯桥梁的一端。...ClusterClient在启动时用预先配置的地址(contact points)与ClusterClientReceptionist连接,然后通过ClusterClientReceptionist发布的联络点清单来维护内部的对接点清单...,可以进行持久化,在发生系统重启时用这个名单来与集群连接。...在具体应用中要注意sender()的具体意义:从提供服务的actor方面看,sender()代表ClusterClientReceptionist。...客户端发送MongoDB指令的示范如下: //MongoDB 操作示范 import org.mongodb.scala._ import sdp.mongo.engine.MGOClasses
具体使用方式是在集群所有节点部署ClusterSingletonManager,由集群中的leader节点选定其中一个节点并指示上面的ClusterSingletonManager运行一个cluster...从应用场景来说cluster singleton应该是某种pull模式的应用:我们把singleton当作中央操作协调,比如说管理一个任务清单,多个ClusterSingletonProxy从任务清单中获取...如果需要实现push模式的任务派送:即由singleton主动通知集群里某种类型的actor执行任务,那么通过ClusterSingletonProxy沟通就不适用了,使用pub/sub方式是一个可行的解决方案...消息是由scalapb从.proto文件中自动产生的。...下面是这次讨论中的示范源代码: project/scalapb.sbt addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") libraryDependencies
下面是具体的功能实现代码:基本上是直接调用Mongo-scala的对应函数,需要注意的是java类型和scala类型之间的相互转换: object MGOEngine { import MGOContext...下面我们来试运行这些函数,不过先关注一些细节:关于MongoDB的Date,Blob,Array等类型在scala中的使用方法: type MGODate = java.util.Date def...None => println("no picture provided") } } 在上面的代码里我们使用了前面提供的MongoDB数据类型读取帮助函数。...下面我们测试对poCollection中的Document进行查询,示范包括projection,sort,filter等: import org.mongodb.scala.model.Projections...下面是这次示范的源代码: build.sbt name := "learn-mongo" version := "0.1" scalaVersion := "2.12.4" libraryDependencies
当查询一个不存在的 collection 时也不会出错,Mongo 会认为那是一个空的 collection。...;“_id”是系统保留的字段,但用户可以自己储存唯一性的数据在字段中。 客户端语法: show dbs // 列出所有数据库 use memo // 使用数据库 memo。...y from foo // 一些SQL不能做的,MongoDB也可以做: db.foo.find({"address.city":"gz"}) // 搜索嵌套文档address中city值为gz的记录...db.foo.find({likes:"math"}) // 搜索数组 db.foo.ensureIndex({"address.city":1}) // 在嵌套文档的字段上建索引 更新数据 db.foo.update...({},{}) //更新对象,第一个参数是查询对象,第二个是替代的,可以在第二个对象里指定更新哪些字段,要使用 set。"
Scala 的模式匹配是类似与正则匹配的的模式匹配,但是不仅仅如此,它还可以匹配对象的内在的构建形式....模式匹配就是反向的构造器,可以通过嵌套器来构造对象,在构造时提供一些参数 例如: val list = List(3,6) list: List[Int] = List(3, 6) scala> list...,找不到则出错。...类型模式 "hello" match { case _:String => println("ok")} ok 如果使用了泛型,它会被擦拭掉,如同java的做法,所以上面的 List[String] 里的...实际上上面的语句编译时就会给出警告,但并不出错。 通常对于泛型直接用通配符替代,上面的写为 case a : List[_] =>
在谈到restapi之前我在这篇讨论先介绍一下MongoDB数据库操作的scala编程,因为与传统的SQL数据库操作编程有比较大的差别。...在前面有关sdp (streaming-data-processor)系列的博文中有一段是关于MongoDBEngine的。刚好把这套工具的使用在这里介绍一下。...MongoDBEngine是基于mongodb-scala-driver上开发的一套MongoDB数据库CRUD Scala编程工具,其主要功能可以从下面这三个函数中反映出来: def mgoUpdate...scala.collection.JavaConverters._ import com.mongodb.client.model._ import com.datatech.sdp.mongo.engine..." %% "mongo-scala-driver" % "2.6.0", "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0
在对上一篇博文里我们把MongoDB的消息指令序列化单独挑出来讨论了一番,在这篇我们准备在一个MongoDB scala开发环境里通过streaming运算来示范这些protobuf消息的应用。 ...这两个函数的实现包含在文章后面提供的源代码中。...下面这段是本次示范的服务实现代码: package sdp.grpc.mongo.server import sdp.mongo.engine._ import MGOClasses._ import...._ import scala.concurrent._ import akka.stream.ActorMaterializer import sdp.mongo.engine.MGOProtoConversion.MGODocument...: MGOProtoConversion.scala package sdp.mongo.engine import org.mongodb.scala.bson.collection.immutable.Document
解释: 运行mongo启动shell shell会在启动时自动连接MongoDB服务器,默认连接test数据库,并将这个数据库连接赋值给全局变量db,这个变量是MongoDB的主要入口点。...当查询一个不存在的collection时也不会出错,Mongo会认为那是一个空的collection。... db.foo.find({"address.city":"gz"}) // 搜索嵌套文档address中city值为gz的记录 db.foo.find({likes:"math"}) ...// 搜索数组 db.foo.ensureIndex({"address.city":1}) // 在嵌套文档的字段上建索引 更新数据: db.foo.update({},{})更新对象...,第一个参数是查询对象,第二个是替代的,可以在第二个对象里指定更新哪些字段,要使用$set。
如果想创建一个数据库名称 use mydb 要检查当前选择的数据库使用命令: db 创建的数据库mydb 列表中是不存在的。...// no case // 即时加上了列筛选,_id也会返回;必须显式的阻止_id返回 db.users.find({"age" : {"$gte" : 18, "$lte" : 30}}) select...中,既包含"apple",又包含"banana"的纪录 db.food.find({"fruit.2" : "peach"}) // 对数组的查询, 字段fruit中,第3个(从0开始)元素是peach...的纪录 db.food.find({"fruit" : {"$size" : 3}}) // 对数组的查询, 查询数组元素个数是3的记录,$size前面无法和其他的操作符复合使用 db.users.findOne...db.blog.find({"comments" : {"$elemMatch" : {"author" : "joe", "score" : {"$gte" : 5}}}}) // 嵌套查询,仅当嵌套的元素是数组时使用
领取专属 10元无门槛券
手把手带您无忧上云