00:00
接下来我们已经把预先想要定义的东西定义好的话,呃,接下来就要拿到我们的数据了,我们这里的数据是从哪里拿的呢?我们现在需要的最核心的数据是什么数据?是不是电影的相似度列表数据啊,或者我们叫相似度矩阵对吧?相似的相似度矩阵数据,那我们加载进来之后,大家会想到这个相似度矩阵可能会很大,对吧?然后接下来我们在做流失处理的时候,这个计算可能会比较比较多,比较复杂,那我们处理这样的这个大数据,比较大的这个数据模块的时候,做流失处理的时候,是不是往往会把这样的数据做一个。广播啊,大家之前讲过这个广播变量对吧?对,所以我们这里边再把这个加进来给大家讲一下,所以呃,把它广播出去,呃,这当然就是呃为了这个我们性能的考虑,那大家知道这个广播变量它的特点是什么呢?广播出去之后有什么好处呢?啊对,它就相当于是一个上面会留一份副本对不对,我们就不需要每一个任务都去呃保存这么一份副本了,对吧?啊就你就是针对那个X xcuor去保存副本就可以啊,这样的话我们就会节省很多这样的,呃内存资源,我们的空间资源啊,那这里大家会想到我定义一个叫相似度矩阵,那就是。
01:45
Same movie对吧?Matrix吧,相似度矩阵,它等于什么呢?我们是不是还是用这个SPA session的read方法先把那个读进来对吧?那这个对读的过程当中当然会有一些option啊,首先我们是不是要定义UI?
02:06
这里面都是之前大家已经比较习惯的这些操作了,Monggo con里边的uri对不对,然后option option,还有这个collection对不对?嗯,哎,怎么了?呃,电影相似度矩阵,我们之前是把它放到monggo里了,对吧,没有写到里面对不对,我们的程序里面没有把它实现,当然大家是不是可以把这个数据也全放到release里面去啊,啊,对,但是如果全存到里面去的话,这个数据其实还挺大的对不对?呃,大家可以想到啊,就是我们如果要是有足够大的空间,这个red就是大家知道这个成本比较高嘛,你如果要是说钱多的话,够用的话,当然是这样效果会更好,如果要是不够用的话,那我是不是大家会想到这其实我们在这个呃,Driver上面直接先把它广播出去。
03:06
这个是不是跟我们后边的那个实时计算是就是不会影响到实时计算的速率速度啊,啊对,所以说这个其实是不太影响我们实时计算的过程的,所以其实这个也没什么关系啊,就放当然更好,这里的话我们定义这个collection啊collection name我们是叫什么,从哪个表里边去取呢?是不是叫movie res collection啊,就是第一个对吧,这里边的这一个表明,然后接下来,呃,大家习惯啊,Read的话就没有没有mode了,对不对啊,Writer才有mode的方,这个方法我们这里就直接form com.mongo DB spark.cq。然后哎,哪里写错了哦,这里是不是少了一个C啊,啊大家看的很仔细,Load把数据加载进来,然后我们要把它转成什么呢?诶,是不是我们先按照定义好的这个数据格式把它转成movie Rex啊,然后最后我们想要什么东西呢?哎,我们还是把它转成RDD,然后再做转换。
04:20
当然了,这个做转换,大家看我这里边做的转换,可能相对来讲就是大家可能在这里还不知道他要干什么事情啊,我做的事情是比方说我对每一个movie ranks,这里面的每一个都是一个movie ras,对不对,我把它做一个操作,做一个什么操作呢?转换成。呃,大家会想到本身movie ras里边的值应该是什么样的一个格式,是不是就是一个mid,然后后边一个列表啊,但是其实我们之后会怎么用这个相似度,呃,这个列表呢?我们是不是想要给两个mid直接查出他们之间的那个相似度啊,我们后面是不是最希望做的事情是这样的一个事情啊?
05:18
所以如果要实现这样的一个功能的话,我们之前的这个存储可能就有一点有一点不太好查了,对不对啊,你查到之后,后面这个sequence是不是还得去遍历一下啊,然后找到对应的那个元素对不对?那大家想到什么样的方式查询速度最快呢?什么样的数据存储方式查询最快呢?对,是不是KV这种类型的,我们把它变成一个map是不是更好一些啊,啊,所以这里我们希望把后面这一部分转成一个map,另外大家就会想到前面这一部分,哎,我们这是一个样例类,对吧?那么它的一个对象,如果我们还要根据这一个前面的mid去做筛选的话,是不是也还得去去做一步啊,另外做一步啊,那我是不是把外面这一层也转成一个map,是不是根据这个mid直接找到这个列表,然后再根据里边的这个m mid又是一个map,是不是直接就找找到那个相似度评分了,诶,这是我们的一个想法,就是想把里边都转成map,所以为了。
06:20
查询相似度方便。转换成map啊,那我们这个怎么转呢?啊,前面这个部分还是不要变对吧,还是movie ras的mid,后边这部分我们就需要把movie ras的它有一个参助,是不是叫Rex啊,这本来是不是一个recommendation类型的一个sequence啊,那现在我们要把它转换成什么呢?呃,大家会想到我们是不是要把里边的每一个元素都转换成map,对不对,里边是要做一个操作,外边是不是也要有一个操作啊?
07:09
呃,这这里面我们就就就还是不要这个括号了,对吧,我们直接把它做这样的一个操作就可以。放到这里来。好,那大家会想到我在做完这样的操作之后,那接下来应该做什么事情呢?哎,其实不是啊,我们最后这个map过程当中,其实就是要把它最后转换成这样的一个数据结构,对吧,所以这个括号还是需要的。我们其实是前面这一部分,我们就应该是定义的是一个操作,而不是定义成一个一个方法了,对不对,Ma里边不应该是一个方法。
08:06
好,呃,那么我们相当于是对里边的每一个元素我们都要做这样的一个处理,那么大家会想到做完这个map操作之后,所有的整个的这个movie ras,我们是不是想要让它按照mid这个作为key,然后后边所有的这个元素作为它的值啊,还是要把它转成一个map对不对?对,那整个的这个,这相当于还是一个RDD对不对?那大家会想到它可以直接直接转成map对不对,我当然我们这里边也可以调用它的这个。是不是可以把它collect as map啊,我们这里用这个collect as map把它转换成对应的map,那当然前面的这一步里边,我们的这个相似相似度列表里边的每一个元素是不是对应的,还要做一个转换啊,转换成一个map对不对?那大家会想到这里边的每一个元素,它里边是不是应该拿出它的key是什么呢?是不是mid,然后它的value是什么呢?
09:13
是不是score啊啊,当时的那个相似度评分我们就是叫score存进去的对吧?所以还是score,那这个元组我们是不是把它是不是转成一个map呀?哦,大家会看到这里我们直接不能调用这个to map方法,呃,那大家会想到我们可以怎么去做呢?诶,这里其实不是这个元组要去做这个图map方法,对吧?大家会看到我是不是可以把这个map完了之后,这是它里边的每一个元素的做法对不对?我是不是可以只在后边去做这个图map方法啊,对吧?把我们当已经做过操作的这个ras做图map就可以了,那大家可能会想到,那我中间为什么要加这么一个map操作呢?我我直接这么做可以吗?
10:06
如果直接这么做的话,会有问题,大家会想到本身的这个是一个什么类型,它是一个我们在哪里,在这里,对吧,它是一个迭代的类型,对不对,对吧,里边是recommendation是一个。我们定义好的样例类这样的一个对象,对不对?对应的这个recommendation对象,那么我直接如果把它要转成map的话,可以直接转吗?对,转出来是不是有问题啊,所以大家注意,我们是不是还是应该把里边的元素不要用这个对象,而是要转成我们这里边的这样的一个元组啊,对吧,每个元素转成这样的一个元组,呃,所以中间我们要插这样一波啊。后边再把它collect as map转成这样的一个,呃,我们想要的一个map就可以了,这是我们的数据准备的过程,当然了,先把这个,我们只是把这个数据准备好了,接下来是不是定义一个广播电变量,把它广播出去啊,比方说我们这个就叫same movie matrix。
11:21
呃,我们叫broadcast对吧,它就等于SC,呃,我们应该是sc.broadcast对不对?里边传入的就是前面定义好的same movie matrix啊,这就把这个很大的这个数据作为广播数据定义好了,广播出去,然后呃,大家会想到接下来是不是就是我们要真正的做流失处理了,对吧?那在做流失处理之前是不是先要我们是不是先要跟卡夫卡做连接啊,我们是要基于卡夫卡去创建一个input stream对吧?啊,那这里边我们先要去,呃这个定义卡夫卡的连接项,连接配置对不对?卡夫卡参数配置参数连接参数。
12:21
呃,那这里边的话,其实是一些常规的一些定义了啊,大家看卡卡,呃,我们就叫per吧,等于是不是还是定义一个Mac啊,大家知道在后边我们创建那个stream的时候,是不是有一个参数就应该是传这个卡夫卡perter啊,Per对吧?呃,那那里的话,在这里我们提前定义好,然后直接把这个卡夫卡permeter传进去就可以了,这里需要什么呢?我们还是直接看这个文档吧,我们需要的内容。大家回忆一下是不是有这个P,这是什么?对,这是我们起的那个卡普卡集群对不对,默认端口是9092,我这里边的话直接都是logo host,所以比较简单,因为我的虚机就是直接在这个logo host上创建出来的啊,所以都是一样的。然后这里下一个是key,它的这个disizer,这是什么啊?对,反序列,反序列化这个工具对不对?然后我们这个是在消费者里边完成的,对吧?然后下面同样的key的反序列化,还有value的反序列化对不对?我们一般定义的就是这个class of string,呃,R,对吧,这个都是常规的一些写法,然后下边还有group ID,这个我们定义一个叫recommender,对不对?给一个group ID,最后还有一个all two offet reet,这是什么呢?对,便宜量的一个初始设置对不对,我们一般是不是都给一个latest就可以了呀。
13:54
对,就是最新的,好,那我这个就不详细去敲了啊这个配置下,诶,我们把它CTRL一下CTRLC直接copy过来,那当然这里面这个东西是需要引入的,对不对。
14:11
嗯,没有引入啊,那这里面大家看我们选哪个呢?是不是肯定要选这个卡夫卡common下面的这个stringr啊,对吧,我们把这个引入。好,定义了这个啊,卡夫卡perter的话,接下来是不是通过卡夫卡创建一个stream啊,对吧,我们的一个input stream对不对,真正的创建这个,呃,输入数据流。挖一个卡夫卡,我们就叫卡夫卡吧,卡夫卡诶呃夫卡迪stream对吧,没关系啊,卡夫卡stream都一样,对不对,然后那么大家创建的时候一般用什么方法创建呢?是不是我们把卡呃卡夫卡的那个U里边是不是有一个对最简单的方式是不是可以create direct stream呀,创建一呃就是直接的这个stream啊。
15:19
然后里边我们一般创建的时候,可能会指定一下它的这个泛型啊,String string皮外都是string类型,对不对啊,这跟我们前面这个,呃,就是反序列化这个定义是一样,要匹配上,然后里边的参数是什么呢。啊,大家会想到这里边要传的参数是不是首先应该有SSC啊,啊,然后还有什么呢?呃,SSC这个比较短,我们就放在最最上面吧,然后第二个参数是不是它应该有一个location,呃的那个策略啊,对吧,Strategies对不对,Location strategies,那我们一般在这里是不是给的是一个呃,Prefer consistent就可以了,对不对,偏向连续的,这偏向连续的策略,Prefer consistent。
16:19
呃,然后后边我们还要定义这个消费者的策略,对不对,Consumer strategies,对吧,它是不是我们要给一个subscribe呀,是不是这样写的,大家之前应该用过对不对?那么这个subscribe方法,大家看前边第一个是不是应该传入的是什么呢?是不是应该传入的是我们卡夫卡的那个topic列表啊,刚才大家看到那个参数了啊,这里现在又不显示,我们这里一般也是会给他一个这个类型的定义string,那后边我们是不是先要把这个topic列表传传输进进来,那这里边我们只有一个列表,那把它还是转成一个啊。
17:09
里边从哪里去取,是不是config里边去拿,Con里边我们已经有叫卡夫卡点,呃,Topic对不对,定义好的那个recommend recommend对不对,这里对吧?好,呃第这是subscribe的第一个参数,那么它第二个参数还应该传什么?是不是还应该传入一个卡夫卡的这个parameter配置项,对不对,我们把这个传入的话,这个在呃,我们调用这个create direct stream这个方法的时候,是不是就直接会根据我们的这个parameter创建出一个消费者啊,出一个对不对,然后就订阅了我们对应的这个,那么到时候如果卡普卡那边有了数据来了数的话,我们这里是不是直接就可以获取到了,诶这样的话就把这个数据流就打通了啊,这部分大家应该是之前都讲过啊,如果大家有一些忘记的话,那还需要再好好的回忆一下。
18:16
然后接下来我们是不是就可以去把这一个流去做一个处理了,对吧?呃。转,把原始数据转换成平分六,那我们想要的一个数据结构是什么样子呢?大家想到我这里边来的这个数据,呃,应该是一个什么样的状态呢?我们默认啊,就是来的数据是这样的一个一个状态啊,就是都是用竖线分割的一个状态,一个评分数据,大家可能想到我们的那个,呃,本身的评分数据是包含一个UID,一个mid,后边是不是还有一个score啊,Score最后是不是还有一个time step呀,那这是我们当时的一条评分所有的数据对不对?所以我们收集日志的时候,是不是就希望收集成这个样子啊,那现在我们拿到。
19:25
数据是不是还应该把它拆开,然后保存成我们想要的那种处理的数据结构,是不是这样的一个一个做法,所以我们先做一个预处理啊呃,这个不要放在后边,放在后边大家容易误解这个是我们本身的原始数据,对不对?那这里我们再定义一个叫做RA stream吧。呃,那大家会想到它应该是不是基于前面定义好的卡夫卡stream,就是我们的这个stream对吧,去做一个操作对不对?我们可以直接在这个stream上面加,呃,使用这个map算子,那么是不是可以对这个流进行一个转换操作啊,这就相当于我们定义了一个流计算的过程,对不对?那么这里边的map我们首先是做什么呢?呃,其实就是说每来一条message,是不是要把它做一个转换啊,我们转换成什么呢?
20:29
大家会想到我定义一个就是要把这个message里边所有的东西是不是要拆出来啊,那肯定还是要message split,对对不对,Split。呃,这个message好像不能直接split,因为大家想到这里边我们拿到的是一个,呃,就是评分流里边的这个value对不对?呃,我们要拿它的值的话,还需要去value对不对,Value,然后再split啊,那大家想到这里边我们又要去传入这个,呃,就是我们对应的那个字符,我们现现在是不是想要用竖线去做分割啊?呃,然后在这里我们是不是还要做一个转移,因为大家知道在正则的表达里边是不是竖线,还有这个像我们之前的这个上间号啊,呃,还有一些像这个星号加号是不是都需要做转移啊,所以这里边还是做了。
21:28
另外双反斜线也要转移,是不是双重反斜线,双反斜线对吧?先把这个提取出来,我们的这个attribute提取出来,最后我们要得到的数据结构是什么样子呢?是不是其实就是这里边的u ID midd,还有SC,这是不是就就完事了呀?Time STEM去掉对不对?那这里边我解析出来之后全是string,我们是不是应该把u ID mid转成对,呃,我们我们应该把它是不是转成int啊,所以ATTRIBUTE0,然后我们应该把它to int调一个to int方法。
22:10
然后ATTRIBUTE1,应该把它同样to对不对,然后ATTRIBUTE2,对,这个时候是不是应该to double好,这是我们的一个呃计算的方式。呃,那当然了,这个这个时候大家如果要是说还想把呃,我们后边的这个就是这个这个时间也保存下来,时间戳也完整的保存下来的话,我们这里就只做一个切分也是可以的,对不对,大家想愿意做的话,那我们就把H93也拿下来啊点那这个是不是也是int啊,因为我们知道呃一个INT32就可以搞定它,然后接下来其实就是我们的继续做处理,对不对啊。
23:05
继续做。流失处理,那这里其实就是我们的核心部分了,对不对啊,核心算法部分,核心实时算法部分,那这里边我们要做什么操作,是不是对这个rating stream stream应该去做一个操作了,大家习惯的这种,呃,就是对它这个呃,转换输出的这个操作用的是什么?是不是一般我们有有一个算这个操作符叫做for each r DD啊这个大家应该还记得吧,大家回忆一下之前我们的这个用法啊,因为大家知道我们这里边拿到了这个re,相当于是什么东西呢?大家可以想象的到,我们d stream这里边一次一次进来的东西是不是都是一个时间窗口内的一组数据,对不对啊,所以是一组RDD,所以这个for r DD我们这里拿到的是什么呀?我们这里拿到的其实是,呃,所有就是在在这一次处理过程当中对应的这个RDD1组DD对不对,所以我们相当于拿到的是这样一个东西,我们要对它做一个处理,对吧?那然后大家想到这个这一组RDD,我们是不是还是继续可以去做一个对for each对吧?呃,可以做一个便利,然后里边是不是拿到的就是对应的这个RDD的,呃,具体的内容啊,那当然我们接下来就可以做模式匹配了,模式匹配这里边到底拿到的是什么呢?是不是就是前面我们。
24:57
你好的,Uidd m mid score啊,当然了,上面我们三还有那就是time step对吧,把这个都写出来,接下来我们就可以真正去做,呃,就是具体的操作了,对不对啊,这里就不用不用弄起来了,因为大家会发现接下来是不是就是一步一步的流市走,就是流市处理的步骤啊,我们接下来首先进来之后啊,大家会想到进入这样的一个地方来,我们是不是应该有一去控制台输出,大家看起来会明确一点,知道这里边真正的来一些来了一条数据对不对,要不然我们这个控制台没数据的话,我们都不知道试验的成功不成功,好那么我们这里比方说就叫rating data coming吧,好呃,大家为了更明显的话,我们多加几个这个这个箭头对吧,可以看到的这个比较明显一些,然后接下来我们就分步做计算了,做什。
25:57
怎么处理呢?是不是首先应该步骤来了啊,首先应该做什么呢?是不是从red里获取什么获取当前用户最近的K4评分,对吧?啊,这个数据是不是我们需要的呀,然后我们把它保存一下,就保存成一个一个数组吧,Array,那么这个array里边大家想想需要有什么样的数据结构呢?每一个元素应该是什么样的呢?对应的评分,那是不是应该有mid啊,对应的是不是还应该有它的一个评分值啊,哎,所以最后我们其实要要存的是这样的一个结构,好,我们先放在这里啊。
26:53
好,这里我们还是把这个括起来吧,要不然的话。
27:00
要不然的话,我们在这里好像他这个总是对不齐的样子,第二步我们做什么做什么处理。接下来是不是应该要拿到备选电影啊?我们什么是备选电影呢?是不是要从对相似度矩阵中?取出当前电影最相似的,最相似的N个电影对吧,作为备选列表,好,那这是我们想要做的一件事情,呃,那大家会想到就是在在这一步里边,我们拿到这个备选列表,它的数据结构应该是什么样呢?那是不是列表的话也用一个数组比较好表达啊,那里边是什么东西?备选列表这个我们是不是就不关心它的评分了啊,在这里就完全没关心评分了,因为大家知道这个这个相似度它的那个评分是什么,其实是一个相似相似度评分对不对?所以这部分我们当时都选的是大于0.6的,我们就不需要再考虑,只要它符合这个标准,现在就不需要再去做判断了,那这里它里边是不是就是只有一个mid就可以啊。
28:28
好,这是我们一开始需要的基本的数据,这两步,然后接下来第三步我们做什么事情呢?嗯,计第三步是不是就是对每个备选电影是不是要计算推荐优先级啊对,按照公式了,对吧,优先级。然后最后我们是不是要得到一个当前用户的实时推荐列表啊,对吧?啊,当然了,这里如果大家不想太复杂的操作的话,我们也可以就把它保存成一个array就可以,然后后面我们要保存到mango的时候,再把它做一些转换是不是就可以了啊,这里还是返回一个array吧,大家想这个推荐列表这应该包含什么,这是不是就就是我们这里定义的这个recommendition这样的一个列表,是不是就是这个ras啊,对吧,其实我们要的就是这个东西,所以这里边其实我们最后拿到的是不是里边应该有一个mid,有一个score啊,所以还是一样的一个数据类型,这里把它写出来,Mid score,最后一步。
29:51
是不是把推荐数据保存到mango DB中对吧?这就是我们的主要的四个步骤,呃,那大家可以想到就是这几个步骤,如果呃,我们按照这个都都做完的话,那接下来其实就就是我们的核心的这个流失处理就已经搞定了,对不对,这就已经完全的定义好了,那接下来还有什么事情要做呢?
30:25
接下来是不是就是我们常规的这个SSC,应该要是不是要启动要start啊,所以这里边其实就是开始接收和处理数据。呃,当然了,然后大家习惯应该后面还会跟一个叫什么wait termin termination对不对,表示我们正常来讲的话,就一直在这里处理,一直等着,对不对,除非出现这个我们的手动结束,或者说是这个异常状况的时候才会去停止,呃,那当然了,这里大家可能会想到有一个问题啊,就是我们在后面去跑的时候,这个过程当中是不是会不停的有iner输出啊。
31:12
所以我们看这些消息的时候,可能会有很多的iner,导致我们想看的数据都看不到,对不对,那即使这里边我有这么一个明显的显示,那这里面如果要是简单一些的话,我是不是这里可以去打印一条数据啊。比方说我这里叫啊,就叫streaming started对吧,然后有了这种保证之后,我们是不是这里就可以不用把它的英分全输出了。我们是不是可以让他输出的简洁一些,只要是这个警告以上才才输出就可以了,对吧,这个是可以的啊好,呃,那大家看到这其实就是我们整体的这个代码框架啊,这是这个main函数,我们看一下大括号没问题吧,好。
我来说两句