00:00
呃,有了这些东西之后,接下来我们就进入到呃,在这个对象里边去做一些基本的操作了,首先还是会去定义一些常量。常量和表名,大家想一下,我们在这个程序里面用到哪些表呢?跟offline去做一个对照的话,还是啊,他这里边先要的是RA。我们这里边要不要。呃,现在想的话,好像应该是都不需要对吧。呃,所以我们就干脆这个RA就不要了,然后下边还定义了一些,这是要往man购里边写的那个推荐列表和这个product res,是不是那个相似度列表啊,相似度矩阵,所以这两部分大家会想到我们最后是不是推荐也得有一个写入到一个一个表里边去,只不过不应该还叫user X了,要不然的话你是不是就把之前的那个离线推荐的结果覆盖了啊,所以我们应该重新定义一个,但是模式其实是差不多的啊,所以说我们这里可以借鉴一下,那最后这里边我们还定义了一个user max recommendation,那这里边也可以借鉴,就是对我们最多推荐多少个啊,那这里边我们可以甚至可以定义更多的这个常量,比方说我可以要求。
01:29
最后呃,大大家会想到这里是最后推荐的那个列表是多少个,我这里面甚至可以在之前就把这个限定好,比方说我要求从相似度矩阵里边选出来的那个备选列表是多长,哎,是不是就可以定义一个max长度啊,那最后是不是一定不会超过这个备选列表啊,这个就直接就限限定好了,然后另外我还可以去要求,比方说在这个RA里边,可能最近的评分有很多个用户,可能这个我们的那个短时间内对吧,做了很多次评分,那可能这个长度也是非常长的,也可以给限制一个最长,我从re里边去选取多少个啊,这这个都是可以去做要求的啊好,那么我这里就先把这两个copy过来。
02:20
呃,这里就不应该叫user raps了,比方说我们这里就叫呃,Stream in,呃,我们叫online,或者叫stream,就是流逝的这个推荐都可以啊,这里我叫STEM吧啊,为什么叫stream ras呢?是因为我们实时推荐就是在这个后边的业务系统里边,他查询的那张表名字叫stream res,所以这里大家最好还是保持一致,要不然的话,我们后面就得改这个业务系统里面的代码了。或者说呃,就是要改里面的那个配置了,那那些常数常数配置好常数定义啊,所以这里面我们定义一个叫stream ras这样的一张表,来保存最后的这个实时推荐的结果,另外我们还定义后面这个product ras,这就是我们从里面去读取相似度矩阵了,另外我们还要定义对应的。
03:22
那个最大长度,比方说这里可以定义一个max,呃,User RA,就是我们从那个里边选取评分,最大选取多少对吧?呃,这里我还是统一先给一个20吧,大家只要知道是什么意思就可以,然后后边我再给一个候选列表最大的长度,比方说我这个叫s product就是。Product就是相似的,这个商品最多选取多少个对吧,那个candidate选取多少个候选人。我们也给一个20,先把这两个定义出来,然后接下来就可以去写main函数了,入口方法里边,首先大家会想到是不是还应该有一堆啊配置项啊,应该有那个conig,对不对,我们先把那个con copy过来。
04:21
我们一个一个一项一项来看Spark这个需要保留吗?啊,这个肯定的啊,因为我们一开始肯定是有这个SPA session的定义,对吧?Spark con定义,那么mongo UI mongo DB这两项需要吗?啊,这个当然也是需要的,我们肯定跟mongo相关的一些操作也是用从这个con这个这个地方直接定义好,我们再把它包装成前面定义的这个mango con对象对不对?呃,就是这样的一个过程,然后大家会想到还需要别的东西吗?呃,对,那大家会想到应该有red相关的一些配置,哎,我们这里边red比较简单,就没有别的东西了,这里边定义了客户端之后,是不是直接用就好了,所以没有涉及到更多的东西,如果有的话,我们也应该直接配置在这里,或者大家可以,呃,我们在上边这里调用的时候,还是把把这一部分都都定义在统一的一个地方,对不对?呃,也是可以的,另外我们这里还需要定义什么呢?
05:24
对,是不是还有卡夫卡相关的一些东西也需要定义出来啊,呃,所以大家可以看到在这个代码里边参考,大家可以参考一下代码文档啊,我们定义了一个叫做卡夫卡topic的一个,呃,这样的一个键啊,它的值,我们这个topic就叫做recommend,这是我们定义好的,在这个呃,就是我们的这个卡,这个Spark streaming流失处理的过程当中应该就会去。啊,就相当于要去消费这个recommend topic下面的数据,对不对啊,就要去创建这样的一个消费者,所以卡卡点topic。
06:08
叫做recommend。哦,这里面拼错了啊,卡夫卡。呃,当然这里边如果是这个本身在con里面map定义的话,这个倒无所谓,在后面前面错后边错到一起也就没关系啊,但是最好还是按照比较好理解的这种方式,大家把这个定义对好,那接下来的过程大家会想到是不是跟之前应该是非常接近啊,呃,接下来是不是还是应该创建Spark。呃,这里边我们就去定义一个Spark config。你有Spark。呃,那这里边大家注意,就是后边我们还是一样的啊set对set master这里边是用的config里边的。
07:06
是不是spark.cos啊?然后后边我们set APP name这个注意不要,如果我们直接抄那边的话,容易忘记改这里啊,大家还是注意一下这里边我们直接敲这里就改成了叫做online recommend。好,呃,那接下来我们同样还是把这个Spark session创建出来。Spark session.builder对吧?呃,然后后边我们把这个con传进来,最后呃,来一个get or create,呃,如果有的话直接用,没有就创建,诶这里边大家要注意。对,我们这是这个要做这个Spark streaming流式计算,那SPA session里边有没有这个SC呢?这个streaming的contact呢。
08:09
Spark2.0里边其实是有没有把这个streaming contact包进去的,对吧?这个大家知道的吧,对,他们俩是分开的,所以大家要注意啊,就是在Spark1.0的时候,呃,本身是所有的东西都分开,就是比方说有这个呃SPA contact sc对吧,有这个cql contact,有have contact,所有的都是分开,我们都要分别去创建,然后所有的不同的模块有不同的切接入点,那么到了Spark2.0的时候呢,相当于就把它们封装在了一起,给我们统提供了一个统一的接入点,这个就是所谓的Spark session,大家知道在SPA session里面是不是就有sa,呃,就有这个Spark contact,呃,就有这个呃SQ contact,但是它里边没有streaming contact,所以我们是不是还得去重新对,重新去拗一个呃这个streaming contact,那这里边我还是把。
09:10
SC先定义出来吧,这个SC就是在对Spark session里边对直接拿到这个Spark contact就可以,那大家知道在创建这个对我们创建一个SSC需要去new一个对streaming context,它里边是不是需要对需要把SC传进去啊,呃,就是直接写这个SPA contact就可以对吧,我们这里是前面定义出来了,所以把这个sa传进去,呃,另外后边是不是对指定一下这个be duration,那么比方说这里边我用这个。Second。给一个给个两秒钟吧,呃,这里大家会知道,一般这个best是要要给足够大的时间,保证我们这个对计算能够完成,这个时间间隔内能够把这个计算都都能够完成,所以一般是给500毫秒以上,所以我们这里边给一个两秒钟就肯定没有问题。
10:21
好,那接下来当然了,就是常规的那些东西啊,我们该引入的这个包还是要引入Spark。影视的这个包对吧?因为我们可能要做data set和这个data frame相关的一些处理计算,所以这个包肯定是需要的。另外我们再来参考一下离线这一部分哦,那么这个mongo con是不是也是一样可以定义出来啊?呃,这个就直接抄过来就好了,呃,这里边我们不要用offline里边的,因为这里自己已经定义了嘛,用上面定义好的这个样例列就可以,呃,这样的话我们就把前面的呃,就是这些定义都已经定义完了,接下来对我们就需要去先加载,诶我们在创建那些卡普卡那些连接之前,我们先加载一下数据吧,先把该要的数据先拿到对不对?加载数据,呃,另外这里大家注意我们这里的数据主要的需要的是什么呢?从mango里面加载的数据是什么呢?Red是直接就是来了那个评评分数据之后,我们实时的去读的,对吧。
11:31
啊,去里面实时读的,这里我们提前要加载数据,加载什么呢?对,主要是要加载相似度矩阵,那么这个相似度矩阵大家会想到实际应用的时候,这个矩阵应该是比较大的,对吧。所以这里边我们做一个考量,呃,就是为性能考虑,可以把它做一个对,做一个广播广播出去,呃,那么如果广播出去的话,大家知道就相当于我们就变成每一个exe保存一个副本,对吧,就不需要每一次用到的时候给每一个这个呃计算的这个task都都去分发这个一个副本了,这个就会提升一些性能。
12:19
好,所以那接下来我们加载数据的时候,我定义一个叫做呃,就是它的矩阵,相似度矩阵嘛,我就叫一个same product maricx吧,Product maricx,商品的相似度矩阵啊,那么它应该等于什么呢?我们用Spark session的read方法,是不是直接可以从。Mango的表里面去读出来,呃,所以后边我们会用到option。呃,这里还是一些常规的定义啊,Uri这个从哪里取呢?Mongo con里边有U对吧?然后接下来对我们还得定义collection这个从哪里去取。
13:07
前面我们定义了是不是在啪从从哪个举呃那个表里面去取呢?是不是就是从这个相似度矩阵里边去取啊,叫做product ras,所以这里我们从product res里边把它取出来,呃,那接下来可以去对format一下com.mango db.spark.cq接下来点load把它加载进来,对吧?呃,这样就可以把数据读进来了,呃,然后接下来我们可能还得做一些转换,那么这个数据首先读进来之后,我们先把它包装成就是按照我们定义好的这个product先做一个转换,对不对?呃,然后接下来我们会把它在广播出去之前,我们可能会有一些考量,就是把它转换成什么样的形式转。
14:08
转播出去呢,为了后边我们做查询方便,大家会想到我最好是把它转换成一个。之前的这个形式,呃,我们加载进来之后,这相当于都是什么呢。这相当于还都是data set对吧?呃,但大家会想到这个load,他这里边拿到的啊,这里都是那个date frame,对不对,拿到的都是都是这个DF,那么data frame如果我们要去访问的时候,那你相当于还得用一些查询条件去做访问,对吧?啊,这个过程可能就会又麻烦一点,那最简单的访问方式是不是把对应的当时我们这个结构不就是一个product ID,然后对应着它的一组那个相似度的列表吗?我们直观的一个想法是不是把它保存成map的形式,然后是不是直接从这个变量里面,你按kv k value的这种形式直接把它选取出来就可以了,直接就能找到对应的那个相似度列表。呃,那大家就会想到我们最后应用的时候,可能是要查当前的这个商品跟另外某一个商品之间的相似度。
15:24
那是不是在那个列表里边还要找到某一个product ID,然后找到对应的那个相似度啊,诶,所以我们最好是把外面也包装成一个map,把里边也包装成一个map,转换成一个map,是不是这个查询起来就最方便,那到时候我就根据外层的product ID,诶,找他的那个对应的value,找到一个列表,然后再根据第二个product ID在里边去找,找到对应的那个值,是不是就是他们俩之间的相似度啊,这就是基本的这个想法,所以如果我们基于这样的考虑的话,那其实后边应该把它整个都转换成一个,大家会想到啊,这是,呃,我们的一个相当于是一个推荐列表这样的模式啊,我最后是希望把它转换成一个什么呢?
16:18
转换成一个。Res里边它前面是不是应该一开始是一个product ID啊,诶这里边大家看我这里还不能直接去转换,那我先去把它做一个对转成RDD。然后我接下来把它里边的内容就可以去拿出来了,呃。诶,这里边哦,大家看我是把它转成了product了,这个product我们前面有定义吗?对,其实没有定义,我应该把它转换成什么呢?对,Product product对吧,应该把它转换成product,然后接下来我们这里边要去用它里边的什么东西呢?是不是前边是一个product ID,后边应该把它的那个列表,呃,对,拿出来Rex的Rex啊,这个我们换一个名字吧,不要叫Rex了,就叫他的item吧。
17:27
就是每一个item。后边的这个ras大家会想到是不是应该把这个里边的每一个元素对都转换成k value那样的一个形式啊,所以相当于我们是想把这个ras是不是转换成k value,呃,转换成一个map啊,所以我们看看它能不能直接to map呢?诶直接可以把它做一个to map操作,这里边大家注意就是呃,这个里边本身它应该是一个什么形式呢?
18:07
对,应该我们得要求它里边是一个元组形式才可以,如果说这里大家不放心这个rax里边的形式,呃,之前我们定义的它是一个sequence,对不对,而且这里边是一个什么呢?是一个recommendation,那这是一个我们自己定义的样类,那这个里边你直接去to map可能不太好使,所以我这里边需要去做一个什么操作。啊,当然就是说对我我是不是应该把这个它里边的这个东西每一个都做一个转换啊,对吧,先拼出来每一个元素拼成一个元组,然后再去图map是不是就可以了,那么它里边的每一个元素。是不是用下划线就可以。呃,这里边它本身还是一个样例类,所以我们要用它的这个,呃,对应的这个名称去选取,对不对,它里边的元素的属性,这个属性名称去选取,那么另外还有是它的score对不对?把它包装成这样的一个元组,然后to map就可以把它转换成一个map了,这是里边把它转成map对吧?呃,这里给大家一句。
19:27
那个注释啊,就是。为了后续查询相似度方便,呃,把。数据转换成map形式啊,就是转换成k value的形式,对吧,到时候我们直接根据K去查对应的这个value就好了,那最后得到的这个结果我是不是可以。
20:03
还应该再把它转转成一次map啊,大家想一下,是不是我应该把整个这个RDD去转换成一个map,所以这里边RDD会有一个叫做CLA的as map方法,呃,用这个方法就可以把前面的这种,呃,根据前面这个作为K,后边这个做value,是不是就可以把它转换成一个map了?呃,这也就是为什么我们这里要把它转换成RDD,如果是data frame的话,是不是就没有直接collect map这样的方法了啊?所以这是我们做的一些前期的这个数据加载和预处理转换,那接下来我们要把它广播出去,对不对?所以去定义一个广播变量,那么叫做same product product metrics。呃,然后我们叫对BC吧,Broadcast对吧?呃,那么它就应该等于大家注意到broadcast是不是要用到SC,呃,他点broadcast里边传进去我们前面这个数据。
21:15
Maricx,这样就可以把它做一个广播,那个定义广播变量,好呃,这就是我们前面的这些数据,准备好。
我来说两句