00:00
现在已经把这个数据做了加载,而且已经定义了广播变变量,把它广播出去了,那接下来还需要做什么初始的这些操作呢?呃,大家会想到我们需要的数据应该是从卡夫卡那边拿来消费的,所以还需要去定义卡夫卡相关的那些配置参数,对不对?所以这里边我们需要创建。对,卡夫卡。呃,配置参数,呃,这里大家直接看一眼吧,其实我们在文档里边已经有比较详细的,这个就就全部都写在这里了,我们就直接用一个这个卡夫卡parameter定义出来,用一个map来表示,那里边用到什么呢?呃,BOO STEM service,呃,Local host的9092,默认端口9092对吧?大家如果自己起的卡夫卡这个呃,端口什么的不一样的话,把这个要做一个更改,然后后边呃大家会看到其实这些东西我其实都可以定义在前面的那个,呃,就是conig里边,然后在这里边把它包装成一个卡夫卡per meter也可以,对不对?所以这些东西大家都可以灵活调整啊。另外就是还要定义这些反序列化的这个,呃,这个这个类对不对?呃,这个key的反序列化和value的反序列化,这里边我们用到的都是string,呃,R,然后后边有group ID。
01:30
叫做recommend,还有一个这是什么?Auto offet reet,呃,就是我们的那个偏移量的那个设置,对不对?呃,Offet就是每一次从卡卡里边消费数据的时候,都是通过u keepper里边的偏移量来来去找的,对吧?所以我们这里边一般就定义成latest就可以了啊,这一部分大家还是复习一下卡夫卡相关的内容,我们这里就直接把这个copy过来了。好,呃,在这里需要把它。
02:05
大家注意要用这个卡夫卡common里边的,呃,Ization对吧,这里面的这一个方法好接下来。我们有了卡夫卡的配置参数,那是不是还需要?呃,接下来就可以创建一个input了,是不是啊,所以这里边我们去定义一个。Kafka。Stream,一个stream创建出来,那么它怎么样去创建呢?需要用到卡夫卡,对YouTube下边的create,呃,对direct stream,我们就直接创建一个stream啊,呃,这里边大家注意它是后面要跟上这个泛行的是不是,所以这里边我们就它的k value啊,我们用这个string都是string对吧?里边需要传什么参数呢?首先是不是需要传入。
03:06
啊对,要传入SSC,对stream context对吧?呃,然后接下来还有什么东西,呃,我这里边直接放到后边来写啊还需要什么呢?对,还需要location对吧,这个。策略strategy。呃,一般情况我们这里边用的是这个,呃,Prefer对吧,偏向这个连连续,然后接下来还有一个什么东西呢?对,是不是还应该有一个消费者的一个配置,对吧,Consumer。Strategy,呃,这个配置策略我们需要让他去订阅,订阅什么东西啊,这里还是有泛型。
04:01
String这个KY我们都定义成string的类型的啊,然后里边的这一部分是不是就是要定呃,定义它的那个订阅的topic啊啊,所以这里边需要是一个对,需要用一个array来表示。这里边topic是不是直接从con里边可以去拿出来啊,在这里边是定义好的啊,诶这里边卡夫卡点topic好,然后我们的这里边大家看这里还在报错啊,就是后边我们还少参数,还少什么东西呢?我们已经定义了这个topic,后边是不是还应该把对应的这个对参数perter啊,卡夫卡perter传进去。嗯。好,那么这传进来之后,这里就不报错了,所以大家会想到前面的这些配置参数,其实就是配置我们的这个consumer用的啊,那么如果已经有了这些配置之后,Driver就会直接根据这个呃,这里写的这些配置项subscribe去呃这里写的这个subscribe订阅好的这个主题topic,然后去创建对应的,大家看到这就是基本的这个创建stream的过程创建。
05:25
一个。好,呃,然后有了这样的一个stream之后,呃,我们接下来是不是要通过它去做一些处理,做一些,然后做一些计算啊,啊,我们先做一个也算是一个预处理吧,那这里边大家注意啊,对。呃,卡夫卡STEM进行处理产生。
06:03
评分流。啊,那这里面大家会想到就是,呃,在这个过程当中,我们得知道你进来的那个数据是什么样的形式,那本来进来的这个数据应该是什么形式呢?我们定义好它必须要包含四个元素,就是前面我们说的要有user ID,谁评分的对吧?其实就是跟我们这里边的这个RA非常类似的一种方式,就是谁评分的给谁的评分,这个product ID后边还有具体的评分值,有一个score,最后还有一个time STEM,一个时间戳啊,就是这样的部分,那我们具体产生进来的时候呢,会把它包装成一个用竖线分割的形式,所以这里边是什么什么状态呢?就是这样啊,User ID,然后product ID。后边是呃,Score,最后是time STEM,所以呃,根据这样的一个格式,我们拿到这个评分数据的时候,是不是应该把它做转换,把对应的这个呃,我们的这个user ID product ID每一个值提取出来啊,啊,做分词提取出来,所以这里我们就可以进一步定义一个RA stream。
07:29
评分流,那么卡夫卡卡夫卡stream好,呃,然后我们定义一些转换操作,这里边肯定要去map一下,呃,这里边可以,呃就是这里边拿到的应该就是我们当成一个message对吧?呃,去做一个这样的转换B,这里边我们需要去从message里边对。这个对象里边去拿到它的value,大家看到有这个value方法,然后去做切分,这个value我们定义好的就是一个string类型,所以呃,按照我们之前定义好的模式,它就是以竖线分割的一组属性,那么这里边我们是不是就要按照竖线去做切分啊,Split竖线切分,注意一下,诶,要做转移对吧?这前面我们也做过类似的这个操作,好,那接下来我们最后返回的是什么呢?
08:31
那就是切分好的东西了,ATTRIBUTE0,大家注意,本身这个切分好之后都是string,我们要的这个第一个属性是user ID,那是不是应该tot啊,后边ATTRIBUTE1是product ID是不是也应该to?第三个是score attribute2。那应该to double,对,最后还有一个time stamp attribute3,我们定义的时候应该也是T对吧?所以还是把它转换成int就可以了,好,那有了这个平分流的定义之后,接下来我们是不是就应该去定义?
09:18
真正的流失计算流失处理的过程了,对吧?呃,这里是核心算法部分。定义。评分流的处理流程,呃,那大家其实应该之前学过Spark streaming的话,这部分应该也比较熟悉,我们这里边的典型的一个应用就是直接RA stream.for each r DD,是不是就可以对它定义计算啊,这里边我们直接按照这个来做一个RDDS拿出来把它做一个处理啊,那是不是就是每一个,因为大家想到我们这里边的for each r DD拿到的是什么呢?其实是一个时间间隔内的一组RDD对不对?这里的for each r DD其实是一组RDD,所以接下来我们对这个对再去做一个for each,每一个做一个计算。
10:24
那这里边我们就可以做模式匹配了,配一下做一个这个元组的匹配啊,那大家会想到我拿到的应该是什么呢?这里进来的是不是就是这样的一个格式,所以匹配出来就是user ID product ID后边是score,还有一个time。好,呃,那么对应后边就是我们的操作过程了,呃,当然这里边为了我们后边做这个测试的时候方便啊,看到它的这个处理流程,我这里可以先打印一条数据出来啊,Print line,比方说我这里边就说这个RA data coming。
11:09
嗯,那这里可以我们加一个符号,看起来比较明显一点,对吧,接下来我们的处理流程大家会想到这里就todo啊。核心。算法处理部分,核心算法部分。流程,呃,那接下来的这一部分我们首先要做什么事情呢?是不是首先是我们还是给先先把这个注释写出来,首先是第一步做什么,是不是应该从我们有两个数据来源,一个是red里边的最近case评分,另外一个是mango里边的相似度矩阵,呃,对吧,那么相似度矩阵的话,我们已经放在前面的那个广播变量里边了,那后边我们是不是应该就从广播变量里边直接读出来当前。
12:13
评分的那个物品,它相似的物品啊,找到它的那个备选列表就可以了啊,这是我们的前两步啊,所以第一步我们先选这个red里边,从red里取出。当前。用户的最近。评评分,呃,然后我们希望把它保存成一个数组吧,呃,那么当然了,这个数组大家会想到应该是里边是一个什么什么样的结构呢?里边应该是我们应该保存一个元组是不是。
13:01
呃,这个这个数组里边每一个元素都应该是一个评分,那这个评分是不是应该包括一个product ID,然后还包括一个具体的评分值啊,所以我们应该是保存这样一个二元组的一个数组啊好,这是第一步,然后接下来第二步。按照我们的那个算法流程,就应该获取。呃,从相似度矩阵中获取。当前商品最相似的商品列表对吧?呃,然后这个就作为是不是作为我们的备选列表了。这是第二步,然后第三步还要做什么呢?
14:06
第三步做的是计算。每个。备选商品的。相呃,推荐评得分或者叫推荐优先级,对吧,就按照我们的那个模型来算。啊。得到当前用户的实时推荐列表,大家看,其实我们主要就是这样的三步啊,当然了后面还有第四步就是把。推荐列表保存到mango。DB,这就是我们的主体的四步,那这里边我们可以直接先把这个做一个,就是就都包装成一个函数吧,那这里边比方说我先去定义一个叫做user recently。
15:16
这样的一个变量,那么我们定义一个函数,比方说叫get user。RA。Ratings啊呃,那么这里面大家想想需要传进什么参数呢?我们最后要保保存成一个数组,那这个到底选取多少个,这里边我们前面是有定义的,对吧,有一个max的个数是不是就代表了到底要选取多少个评分啊,所以。这里边首先有一个叫做maps user number,我们把这个传进来,另外还有就是哎,对当前用户的最近评分嘛,你当然要把user ID要传进去。
16:05
另外还有什么呢?啊,那对应的是不是就还应该得把ras相关的这个客户端传过去啊,啊,所以这一部分啊,我们从。连接助手里边把JD传进去,这就是第一步,然后看第二步。第二步,我们要获取到一个备选列表,那么这里边我们就定义一个叫做candidate product这样的一个变量,那么它应该得到get top,我们还是定义一个函数啊,Top SIM products,选择它最相似的一个列表出来作为备选列表,那么这里边呃,我们应该传什么呢?同样还有一个最大的那个长度,对吧?这里是定义成叫same product number,然后后面还需要传什么呢?对,Product ID,好,呃,这里大家注意我们这个备选列表里边是不是还应该做一些预处理,做一个筛选啊。
17:16
因为跟当前商品最相似的列表里边是不是有可能。哎,是已经评过分的商品啊,所以我还得把当前用户评过分的是不是得筛掉。啊,所以这一部分我们是不是应该还得传一个什么参数进去。是不是得把user ID也传进去啊,要不然我怎么查当前用户苹果分的商品呢?哎,所以如果看到这一步的话,大家就会想到我们前面可能少了一个什么东西,因为当前用户评过分的商品,我们到哪里去查呢?啊。对,大家会想到,诶,我要不就是从red里边去查,但是red是一个缓存,它是有时效性的,不可能是所有的这些评分数据都保存在里边,那我们到哪里去查呢?对,是不是就得到RA里面去查了,所以前边我们是不是少定义了一个东西啊哦,在这里边表名是不是还应该定义一个。
18:20
我们应该把这个RA还是要定义回来,对吧。好,所以后边我们可能要对这个表进行操作,从里边要选取用户已经评过分的商品。然后大家想一下,这里边如果要选取最相似的商品列表的话,是不是应该把相似度矩阵也得传进去啊,如果我们包装成一个函数的话,呃,所以后边还要传相似度矩阵,这里边就是same product,是不是得把这个broadcast的对value得拿出来传进去,这就是这一步啊。
19:01
然后下边在做这个商品优先级计算的时候,我们可以最后得到一个,哎,这里大家注意一下前面这个备选列表,我们保存成一个什么样的形式呢?把它保存成还是一个数组就成了一个数组,那么这个数组大家会想到。里边每一个元素是什么数据类型呢?是不是只要有product ID就可以了啊,就是备选列表对吧?啊,这里他的那个具体的相似度我们已经不重要了,我们后边是跟那个最近评分的那些商品去做比对的,所以这里边只需要一个product ID就可以,然后这里边我们要得到当前用户的实时推荐列表的话,我们这里保存成还是保存成一个数组,大家会想到诶,这里为什么不保存成那个recommendation的那个列表呢?
20:01
因为最后我们要保存的过程当中,就不是用data frame writer去往里写数了,相当于我们是不是还要把里边的每一个元素包装成mango object mango DB object包装成这样一个对象,然后具体是插入到那个列表里边去啊。呃,对应的这个用户ID插入到他的那一行记录里面去,所以我们这里边只要用一个一个数组就可以了,当然这个数组里边的形式的话,还应该是什么呢。哎,是不是还应该是product ID。这是某一个商品,然后对应着一个对一个评分或者一个优先级,对吧,我们统一都叫score就可以了,所以它的这个格式还是一样的,那这里我们把这个就定义的叫做stream ras吧,最后我们要存到那个stream ras表里面去啊,Compute计算,呃,我们就叫计算product。
21:11
吧。它的优先级得分计算一个评分score出来,这里边我们需要传什么参数进去呢?首先是不是对每个备选商品算优先级,那我应该把这个candidate整个的这个列表传进去,对不对?然后我们计算的过程当中,是不是还要结合最近的K4评分去算啊,那个模型里面是用到了这个,所以前面两步的计算结果这一步都要用到,呃,然后接下来还需要什么呢?我们在计算的过程当中,是不是需要考察每一个备选商品和之前评分商品之间的相似度,那么这个相似度我是不是还应该去从相似度矩阵里边去去直接取出来啊,呃,所以这里边我还把之前的那个相似度矩阵的那个广播变量的value传进去啊,这就是我们的这三步,这三步做完最后把它存进去啊,这就不需要去再去定义变量了,直接我们定义一个,比方说save data。
22:20
To mango DB这里面的参数,大家会想到这里面需要什么参数呢?哎,对,大家注意,这里其实是需要user ID的,为什么需要user ID呢?对,你得知道我要插,现在是插一条数据,不是说直接把一个data frame直接完整的写进去就好了,所以我还得先查到那条数据,对它做更新,对吧,或者说我查掉这条数据,把它删掉,重新写一写一条,所以是这样的一个操作,那这里边。除了这条数据,呃,这个用户ID之外,我们就还要把对stream ras要写的具体的这个推荐列表写进去,那最后我们的操作其实就是把这四步对应的这四个函数实现,就完成了我们整个的这个过程,嗯,好,那接下来我们还是先把这个主体流程先跑完啊,那这里边定义了我们整体的这个实时计算流处理的过程,最后我们是不是应该还得有什么SSC是不是应该start啊,对吧,启动。
23:33
呃,这个。Stream。当然了,一般情况我们后边是不是还应该写一个wait termination对吧?呃,就是说呃,等待这个,呃用户的终止,或者说一些异常的终止,一般情况我们不会手动去终止的啊,就写这个就好了,呃,为了方便这个我们看清楚他是不是起来了,也可以在这里加一句打印输出,这里面我们说这个。
24:03
Streaming started。啊,这就是我们整个的函数的程序的处理流程啊。
我来说两句