00:00
呃,然后我们接下来要做的也简单,就是根据这个一项一项写就可以了,对吧?呃,首先我们第一个是历史热门统计。大家会想一下,就是这个历史热门统计,我们在做的时候,其实是不是就是一条CQ就搞定了呀,我们从哪个表里边去选数据呢?我们是不是要从ratings里边去选数据啊,最后选出来的数据结构是一个什么样的结构,是不是就是一个mid,然后对是不是还有一个count啊,对吧?所以大家会想到呃,最后我们其实就是一句SPA,直接把它筛出来是不是就完事啊,所以我们定义一个比方说这个叫呃,它本来这个叫more movies,我们叫RI more movies df吧。
01:01
因为大家知道这个Spark CQ返回的应该默认是一个DF,对不对,好,那么然后大家把这个Spark CQ写出来,里边应该怎么写呢?Select。Mid对吧,然后还有什么呢?对,大家会想到还有这个count,这个count是不是得针对,哎,我们是不是要按照那个评分的数量,按照每一个mid,我们是不是对它的评分数量要做一个group by啊,然后统计它的数量对不对,所以我们这里是不是count't谁的数量呢?啊,平分个数啊,当然我们可以去统计平分个数,其实我们这里统直接统计mid的个数是不是也是一样啊,是不是一样啊,这里其实就是说只统计个数,跟具体的评分没关系,对不对啊,所以这里是一样的啊,Count mids count,因为我们最后返回的字段就叫count,对不对?然后这里边from ratings对不对,然后还要做一个什么操作呢?不要忘记group by mid对吧?所以就这么一句,直接把它搞定就好了,如果我们把这个data frame已经拿到的话,接下来是不是应该把它写入到mongo对应的表里边去啊把。
02:28
结果写入对应的mongo DB表中,呃,那这个过程大家会想到,呃,我当然可以就是说按照这么一串,就我们之前写的那个就是呃,Data frame它的那个writer那那种方法对吧,写一串,但是大家想到后边我们是不是每一次写入的时候都是这么一串的话,是不是有点有点冗长了呀,我们是不是想把这个代码做一个提取,所以我们定义一个函数吧,还是所以比方说我定义就像上次一样,我们定义一个day。
03:05
呃,Store df in mongo DB定义这样一个函数,那么这里边大家会想到我是不是应该把rate啊,应该把得到的这个data frame要写进去,对不对?然后还要传什么参数?哦,诶,非常好,大家想到配置是要传的,但我们前面定义了那个影视参数,我们是不是用影视的方式传啊,这里调用的时候就不用传了,对吧?那是不是还需要有表名啊,因为大家想到我们既然要复用这个函数的话,下边是不是每一次调用的时候数据不一样,表名也应该不一样啊,哎,所以这个我们是需要去传的。好,那这里边它的表名应该是哪个呢?我们定义的时候就把它定义成一样的这个名字了,所以说这个好找,对吧?More movies,哎,这样就可以了,那这里我们要把它做一个实现了,对吧,Df store dfin,呃。
04:06
当然了,这里他直接把这个弄出来了,我们还是把这个DF,它是不是直接就用这个对no直接就用data frame就可以,然后后边这个我们还是名字给的明确一点啊,叫collection name表明对不对,这是一个string,然后大家会想到后边是不是还应该有影视的配置参数啊,啊,那这个配置参数我们给的叫monggo config啊把这个传入。没有返回值对吧,直接写入就可以了,那这里边的写入是不是就跟我们在一开始这个data loader里边的写入是一样的啊,是不是调这个data frame writer这个方法对不对?返回这个这个data frame writer.write,然后点option,那么这里边我们用到的首先uri这个不变,从conig里边拿对不对,Conig,然后继续option,我们是不是要指定collection啊,Collection用什么?是不是得用传进来的这个collection内,哎,这个大家知道是怎么做的就可以了,然后我们写入的时候是不是定义一个mod是overri。
05:29
然后后边是不是要做form了,那这里边就还是啊,com.mongo db.spark.cq最后对C搞定,这是不是就搞定了,接下来我们每一次想要往里存的时候,是不是直接一行就把它存进去完事啊,哎,所以这样我们就可以复用这个代码了,这样就舒服一点,好那接下来我们继续做,那接下来是最近的热门统计,对不对?我们的思路是什么呢?大家回忆一下,我们在这个PPT里面,其实把这个思路都已经给大家做过介绍。
06:09
我们的思路是。景琦啊,我们是不是先要定义一个udf,就是用户自定义函数对吧?我们先把这个time step是不是要转成想要的那个格式啊,我们转成什么样子了?是不是要转成YYYYMM这样的年月的格式啊,最后我们是不是按照这个格式去做group y选出来的就是每个月里边的评分个数评分啊,然后我们再去根据评分个数做一个统计是不是就可以了?哎,所以这是我们的一个基本思路,那大家看我们一开始是不是先得定义出对应的这个呃,Udf来啊,所以这里边我们要做的事情是,呃,首先我们定义udf,就是注册udf之前,我们是不是先得有一个呃日期的格式化工具啊,因为大家会涉及到我们那个YYYMMN这样的一个日期格式对不对,所以我们先定。
07:12
一个simple data format,这我们就直接拗一个了,对吧,Simple,诶这里边还没有啊,我们直接得写simple data ma,我们直接用Java里边的这个这这个类就可以,对不对,大家会想到这里边给的是什么,对YYYYMM。所以这里我们是做的什么事情呢?这里做的是呃,创建一个日期格式化工具,对吧,然后接下来要做的事情。是不是注册udf啊udf我们的udf主要的目的是干什么呢?把时间戳转换成呃想要的这个YYYYMM是不是就是年月格式好那大家会想到,呃,这这个就不是不用定义变量了,对吧?Spark点呃我们要注册一个udf,所以调udf的register方法对不对?
08:28
啊,然后里边我们传什么呢?首先是不是要给一个他的name啊,我们这个叫changer date后边。后边是不是我们要做的这个函数操作,呃,那这里我们定义一个匿名函数,大家会看到这个我们要传入的可能就是本身的这个这个时间戳对不对,时间戳是不是一个int类型啊,好,然后我们呃匿名函数这种这种表达式对不对啊,当然了,我这里边可能就一行直接返回了,所以直接直接返回就可以,我们要调什么,是不是要调用前边的给出来的格式化工具的,是不是要调它的format方法啊对,那么这个format的时候,我们format哪个数据呢?大家会想到我是不是得new一个date啊对吧?得得用这个Java里边的这个,呃,Date这个类,那大家会想到这个日期,好,我把它先。
09:29
我们用Java,对。好,那这里边我们应该给什么东西呢?我们是不是大家会想到,那你直接把X传进来就完事了嘛,但是大家这里注意new date的时候,这里边传入的这个,呃,我们的这个时时间戳对吧?这个值应该是一个以秒为单位的,还是一个以毫秒为单位的,这里边应该是以毫秒为单位,对不对,而我们本身这里边的这个评分数据,这个大家觉得这这是秒为单位还是毫秒为单位,秒秒大家如果。
10:08
呃,大家不太熟悉是吧,那我们比方说大家知道这个在我们平常比方说Linux环境里边怎么看当前的这个时间是吧,比方说我们知道有date这个命令对吧,那么怎么样把它能转过去呢?是不是可以用这样的一个格式把它做一个转换啊,那我后边这个百分之S,那这是不是代表秒啊,大家会想到这是不是一个秒数啊,如果我们不信的话,我再看一眼,看看它变化的,大家觉得这是秒还是变化了十几,十几十一秒对不对,大家可以看一下我们这个两次操作,这是不是变化了一秒啊啊对,这个还是很容易去去看到的一个效果啊,啊或者就是大家如果对这个比方说对这个1JS比较熟的话,JS里面是不是也有对应的工这个,呃,Date这个类,我们可以去看到它对应的当前的这个时间戳啊,比方说我们调date的。
11:08
Pass方法对吧?诶大大大家知道这个ES里边如果要是看当前时间的话,是不是new一个date就可以啊,大家看它下面都已经显示出来了,对不对,标准时间对吧?啊,那我这里边如果要是想把它转成时间戳的话,就是调date.pass方法,把这个传进去,大家看这就是当前的一个时间戳,那这个时间戳大家看这是什么,这就变成了毫秒做单位对不对,后面是不是有三个零啊啊所以大家其实可以知道,我们平常一般在这个系统里面或者工具里边拿到的这个,呃,就是如果不特定的指定的话,一般都是秒数对吧,如果要是拿到这个毫秒数的话,后面都是三个零对不对,那这个明显位数是不一样的,我们这里边拿到的是不是都应该是秒数啊好,那接下来大家会想到诶,Statistics在这儿啊。
12:08
那既然我们这里要的是毫秒,本身是秒,那怎么办呢?大家想到那加三个零不就完了吗?是不是乘以1000啊?这里大家注意一下,乘以1000还不是直接就完事的,因为大家想到X,当时我们给的是什么int类型,乘以1000之后大家想一想,当时我们说它的这个大小为什么给了一个int,是因为它是不是在20亿之内啊,它只有十几亿对不对?那乘以1000之后它还在20亿之内吗?Int的取值范围是不是正负20亿之间,20多亿啊,大概是20亿对吧?那所以这里我们是不是就不能还是用int类型啊,所以是不是要把它转成long,怎么样简单转成long呢?当然可以那个把它那个做强制类型转换对吧?那我们这里既然是有个乘法,我让它乘以一个long类型的整整数,是不是本身的结果就得到一个long了,哎,对,所以这个就是一个很简单的一个处理啊。
13:15
那当然了,最后大家想到我这里边最后拿到的东西是不是还是应该给他转成int,我们统一去做处理啊,对吧,因为最后拿到的,呃,我们这个change date之后,最后转换回去之后,还应该是按照我们标准的数据类型去做处理的,我们统一把这个时间都定义成int就可以了。呃,因为大家会想到这个转换成年月之后需要用浪吗。年月是不是最多就是六位啊对吧,YYYYMM嘛,所以这个肯定不需要用浪啊,我们就把它to再转回去。好,我们定义了这样的一个udf,那按照我们的这个步骤,接下来该做什么事情了,我们是不是用大家会看到啊,我是不是应该用调用这样的udf,先把我们当前的这个数据做一个预处理啊,先把这个时间戳是不是转成我们想要的数据格式YYYYMM对不对?然后大家看这里边select的时候,本身的评分数据是不是还有UID啊,是不是他选取的时候还把UD给去掉了,哎,所以其实这是做了一个数据的预处理,最后把这个是不是还保存了一个临时表啊,哎,这是我们接下来要做的事情啊,所以呃,对原始数据做预处理,呃,去掉UID对吧,选取。
14:54
对,选取出来之后去掉UID,那么我们这里边可以定义一个比方说叫做RA,呃,这里边叫break of year month,对吧?年月的一个表达,对应的我们的一个评分,那么是不是写一条Spark CQ就可以了,这里边我们要的是select。
15:21
要的是不是mid,还有评分score score啊,小写,然后还有一个是不是时间戳转换之后的那个时间啊,那这里边是不是就是调用我们的udf就可以了,里边的内容是不是time step time step,时间戳,对吧?把它传输进去,传到传给我们这个udf,得到的是不是就是YYYYMM这样类型的一个时间,对吧?然后我们要的就是这三项,那当然,呃,这里把它as,我们给一个名字叫做year month吧,年月的表达式from ratings选出来。
16:05
然后我们把把它保存成一张零食表,Creating of year month.create or replace temp view对吧?那这里我们给一个名字叫做啊,我们就叫RA a month吧,简单一点写啊,那那大家会想到在接下来是不是我们从这个表里边就对它做一些group by,是不是就把我们想要的那些个数就能提取出来了,提取的方法是不是跟上面这一个统计是类似的一个过程啊,所以我们这里边就直接可以定义出来,最后我们想要的一个data frame是叫,呃,大家看还是跟表名对应,我们叫rate more recently movies,呃,加一个DF。然后它就应该等于写一条Spark CQ,诶我们应该怎么样去选呢?最后是不是还是要拿到的是什么东西呢?是不是一个mid,一个是不是还是一个count,对不对,大家想到另外还应该有一个什么来我们这里写啊。
17:28
从of months中。查找。电影在各个月份的统计,评分统计对吧,评分,那然后我们是不是最后得到的应该是什么呢?是一个mid加一个count对不对?一个数量,最后是不是还得跟上月份啊,那你要是没有这个月份的话,就不知道它到底是到底是哪个月的评分了,所以我们最后得到的这样的三个值,那这里我们写的时候是不是还应该有一个count啊,Count什么呢?大家会想到是不是还是我做rule by之后把mid做一个统计就可以了,哎,所以还是做了这样的一个操作,那当然这里是as count对吧?最后返回的是一个count,还需要选什么出来呢?是不是还有year month啊,这是本身的字段,所以说我们直接拿出来就可以了,From哪里writing of month。
18:40
对吧,临时表里面啊,接下来大家会想到我,我需要怎么样去统计这个count呢?关键就是这个count不好统计,对不对,我是不是要做一个group by啊好,那这个group by。Group什么?大家会想到我肯定要根据mid去做做group对不对?另外我还需要根据对时间年月去做group by,大家想到这两个group by的时候,我哪个放前面,哪个放后面呢?
19:18
大家会想到我是不是先应该根据年月?去做一个这样的一个group by聚合起来,然后再去每一个年月里边再去统计它的数量啊对不对,再去按照mid去把它做一个聚合,而不是说根据每一个mid,然后呃,当然大家如果要是想要用呃那种方式也可以,因为我们最后得到就是一个m mid对应一个count嘛,对吧?啊,这个大家都可以啊,但是如果从直观的想法感觉应该是按照年月先排出来,这个更好理解一点,对不对啊好,这是我们这里写的就是by year month和mid,然后后边大家会想到我是不是还可以去加入。
20:04
排序啊,我最后的排序是不是应该根据根据大小要降序排对不对,根据什么大小排呢?那这里就是order by了,对不对?Order啊,大家会想到首先我是不是先要根据。先根据count排,还是先根据年,大家会想到是不是年月应该排一下呀?因为我们要提取最近的对不对?到时候我们要推最近的,那是不是大家想到我是先找出所有的评分里边次数最多的来,然后在所有评分次数最多的里边去找年,年月离得最近的呢?还是说我先统计出来离现在最近的,然后从离得最近的里边找最最多的,是不是还是应该先找最近的呀?对,这个词序还是有一些关联的,对不对?因为我们到时候如果查这个数据的时候,我们从一开始往后选,是不是应该先选择最近月份的那些评分数据啊?
21:11
啊,我你我不是要找最多评分的,最多评分呢,变成历史评分了,对不对,历史统计了,所以我们现在要找的是最近的,我是不是最先应该是order by year month啊,对吧?那当然year month按照什么升序还是降序呢?对,DEC后边还应该根据count去做一个auto autobi对不对,这个是不是也是降序啊,啊DEC好,所以这样一条就把它搞定了,那当然这个写完之后,我们是不是可以把它存到mango里面去啊。存入mango DB,我们就直接调store df in mango DB这个方法了,对不对?写法跟前面完全一样,Read more recently movie df对应的那张表叫,诶,我们的那张表呢,Mongo,哦,不是不是mongo打头的对吧?就叫rate more recently movies,对不对啊,我们直接把它存进去就可以了,好,这是这部分内容,好,接下来我们还有一个平均评分统计,对不对?那这个平均评分的话,其实对于我们来讲应该是最简单的一个统计了,对吧?那这个不需要有任何的问题,我们是不是根据mid做一个聚合,然后把所有的那个score做一个平均数计算就完事,对不对?那大家想到一条CQ直接搞定,所以我们这里把它定义出来就可以了,Average。
22:46
这里叫average movies df,呃,那我们就是用sparkq,是不是select,这里边我们要的是什么东西呢?是不是最后拿到的就应该是一个m midd,还有什么,对,还有一个对不对,我们就叫avg好了,Select mid,然后还有什么,是不是用avg去算一下谁的avg啊,对,是不是平分,平分那个那个列我们叫score对不对,As avg我们把它算出来,从哪里,从rating里ratings对吧?另外我们要注意做一个对group mid对不对?按照mid对每个电影它所有的评分做一个聚合,聚合出来之后对所有的。
23:47
求一个平均数,这是不是就搞定了,然后得到的内容是不是把它store df到我们的mongo DB里面去,那这个存入的表叫average movies,这样是不是就存完了啊,所以这个就非常简单啊,最后的这个各类别电影的top统计可能稍微会复杂一点。
我来说两句