00:00
首先是这个历史热门统计,我们的思路其实就是统计历史。评分数量,然后按照这个大小排序,最前面的是不是就是最热门的呀?啊,这个是很简单的一个思路,这里边我们就直接定义一个data frame,我们叫rate more,就跟那个表名定义成一样的格式啊,最后写也好写rate more products df,那就要写一个Spark CQ了,是不是因为大家知道这个SPA,这个CQ返回的是不是就是一个data frame啊,所以我们把它定义成这个DF,后续我们调用frame的writer方法啊,这个frame writer,对,直接再写回去写,写回到mango就可以了。好,那接下来我们看一下这个怎么写呢?这C怎么写呢?按照评分的个数,哎,那是不是就得根据每个商品的ID要做一个group by,然后再统计它的数量对吧?呃,最后我们可以做一个降序排列,所以整个写出来就是select,最后我们要的是什么呢?对,最后我们要的是product ID,还有一个什么对,统计个数我们要一个count对吧?最后我们要的是这样的一个形式,所以这里把product ID拿出来,还有一个是统计的对count,那我们是不是在group by之后,聚合之后可以用这个count来做一个统计product ID,大家会想到是不是直接统计聚合之后的这个product ID就可以啊?
01:49
对吧,同样的这一个商品,这个做的所有的这个评分,有几个product ID出现,那么他就被评分了几次,所以在这个过程当中,我们是不是对这个具体的那个评分数值根本都不关心的啊,只要统计它出现的个数就可以了啊,那么这里as count,我们最后把它就叫做count啊,那么呃,从哪里去选呢?From对ratings前面定义好的零食表,然后group。
02:22
呃。ID啊,那当然了,我们可以order by。Count DC对吧?这就是我们做这个评分个数统计的一条CQ,这个拿到对应的data frame之后,是不是可以直接把它写入到mango里面去了啊?这里我们统一定义一个写入方法吧,Store data frame in mango DB。呃,当然这里我需要去传入,对传入这个要写的数据,还要传入,你要往哪张表里面写对不对,你还得传入表名,这里边我们就是名称完全对应的啊,More products df就写到more products里面啊,这个就一一对应没问题,呃,那我们还是先把这一个来做一个实现吧,Store。
03:23
这里面的参数。其实就是前面是一个data frame,一个DF,对吧,我这里就不用CQ里面data frame,直接引入spark.data frame就可以,后边这个string我应该传入的是一个表名,所以这里边我们叫对collection name吧。然后写入到mango里边,我们还需要什么来着,还需要mango的连接配置,所以这里边我们还是借鉴之前的写法,是不是可以定义一个影视参数啊?呃,把这个mango config作为影视参参数传传入,那接下来没有返回值对不对?呃,这个值直接写,写完就可以了。
04:15
所以里面具体实现怎么写呢?就是DF,它是不是会有一个对right。然后我们定义这个writer,它的writer里边可以有一些配置选项的定义,Option。呃,这里边是不是uri还是一样的写法对吧?呃,然后mango config.uri。同样我们定义这个,哎哟,Option可以定义。这里的collection就要用到传进来的collection内,对,这是我们要写的这个表名,然后写入,对,可以给一个overri,是不是overri,呃,下边诶,注意还有format对吧。
05:07
com.mango DB spark.cq最后是对C,这样是不是就搞定了啊,所以其实是很简单的一个操作啊,我们把它完全提出来,里边用到的其实就是一个是数据,这个data frame,一个就是你要往哪张表里边存,另外还有mango的配置信息,我们把它提成这样的一个函数,方便调用就可以了。嗯,好,那么前面也就不报错了,这就是已经把这个第一部分统计实现好,那接下来我们看一下这个第二部分近期热门商品怎么样去实现呢?当时我们的思路其实是,哎,就是这个近期我们就按照时间戳来排,我们是需要把它转换成一个年月的格式,然后年月如果作为一个整数的话,它的大小排序是不是就是从近到远的一个近期排序啊?
06:07
呃,大家会想到我们现在是呃2019呃004对吧,那么所以如果是最近的一个月肯定就比啊之前的一个月201903要要要更加的这个,呃,这个数值要更加的大一点,我们做一个降序排列就可以了,所以这里边的近期热门,首先我们要做一个时间戳的转换啊,这里就要定义一个udf了,呃,所以在做这个定义udf之前,我们先创建一个日期的格式化工具。这个大家应该还是比较熟悉,应该用过的,对吧?呃,Simple data format,我们去new一个simple。
07:02
Date format这个Java里边有这样的日期格式化工具,我们直接用一个,这里边需要定义它的格式,就是YYYYMM。哎,有了这个之后,接下来是不是就可以去注册udf了,注册udf。将。Time step转化为。呃,年月格式YYYYMM就是我们这里说的对吧,所以这个udf。怎么样注册udf来着,是不是要用Spark Spark session里边是不是就有这个udf对吧?然后去register去注册一个它的名字,这里边就叫做change date。然后大家看后边是不是一个function,应该要去做一个实现了,哎,我们这里大家可以看到啊,就是在这里边我们定义了这个UDF之后,其实就是在之后就是要用这个UDF去做转换的,所以我们这本身就是一个一个函数。
08:19
呃,后边就是我们要把这个函数写出来了。呃,那这里边我们是不是可以去定义一个匿名函数啊,这里边比方说给的这个自变量,我们是就叫X啊,参数叫X后边拉姆的表达式,因为函数对吧,主要我们用到的是什么呢?应该就是。前面定义好的simple date format做一个转换,把X做一个转换是不是就可以了,呃,对,它要点format对吧。里边是不是能够想到把X传进去做一个转换,那么这里要format的其实不是X,因为X对于我们来讲只是呃,UEF那个时间戳,只是一个int数据,对吧?这里边我们是不是form ma得form一个date呀,对,所以要new一个date,大家看到前面这里边提示的这个数据格式应该是date啊,所以我们要去new一个date,这里边应该是把X传进去就可以,对不对?大家想到是这样的,但是这里大家要注意啊,我们先把这个date引入。
09:31
呃,我们用这个Java里面的date就可以。大家看一眼这个date里边的参数应该是什么数据类型的,对应该是一个long。而且大家会想到他这里边啊,其实就是要用这个KA的time是那个毫秒数对不对。要用毫秒数作为一个long数据类型的这个整整数传进去长整型,那这里边我们本身的X大家知道是什么吧,对,是一个int,而且它是秒数还是毫秒数呢?哎,这这这个大家有概念吧,像这里边这个时间戳是秒数还是毫秒数,呃,大家如果不确定的话,呃,我们找一个。
10:21
大家可能知道在Linux环境里边是不是有一个date命令,直接可以看当前时间,呃,那如果说我要看当前的这个以秒数毫秒数的方式来看的话,那可以。这样啊,加百分之S,那大家看一下这个数据的长度,跟我们这里的这个时间戳是不是一样啊,都是十位对吧,都是十位,那直观的看的话,大家觉得这是秒数还是毫秒数,对这这明显是一个秒数,所以这里边我们本身有的评分数据,这个时间戳都是秒数。用int类型保存成的一个秒数,那这里边要的是long类型的一个毫秒数,那怎么办呢?哎,对,大家想到了,乘以1000不就完事了吗?所以乘以1000。
11:11
然后呢,你乘以1000,它也不会自动转换成这个长整型啊嗯,对,我们做一个强制类型转换,那更简单的一种方式是,是不是对直接把1000变作为一个长整型成相乘之后,是不是自动会把它的数据类型转换成一个长整型long型啊啊,这个就是比较简单的一种写法啊,呃,那这里需要注意的是,我们把它做了这个format之后,最后是不是给回去的YYYYMM,诶,我们是不是还应该定义成一个in inter类型啊,所以还。大家注意啊,这里传进去之后是做这个format的,Format之后得到的结果是什么?对,得到的是一个string对吧?所以我们如果要去按照这个数值大小做排序的话,是不是再把它做一个转换啊。
12:07
然后把它转成int就好了,这就是我们定义好的这个udf,好,接下来按照标准的流程,是不是又要去生成一个临时表,对吧?呃,我们把这个这里啊,把原始RA数据。呃,转换。成想要的。想要的什么结构呢?呃,就是我们最后在这这张表里边,这张临时表里边想要的是product ID得有一个这个对吧,然后是不是还应该有。大家想想还应该有什么,是不是还应该有一个count啊,就是要去做分组之后做统计对不对?呃,最后还要有,呃,那这里边我们现在还不去做count统计,我们在最后才去做那个count的分组统计啊,这里边我们就从原始的数据里边提取出来对score,然后还有什么,还有这个time step,是不是要做一个对年月的转换啊,所以最后我们得到这个叫做year month这样一个东西就可以了。呃,那这里我们是不是去定义一个。
13:35
定义一个rating of yearmon,定义一个这样的data frame,它就用我们的Spark CQ去写就可以了,这是不是就是从本身已有的ratings表里边把这几项。直接提取出来就可以了啊,那这里想要的一个是product ID,还有一个是score,还有一个year month,我们是不是要做转换了,调用已经对生成的这个udf change date,它里边的参数是对time step。
14:20
As year month,这就是我们的from ratings,这就是我们想要的这一个张临时表啊,当然就是data frame,还要再做一个转换啊。A year,对,然后我们create temple view,这个就叫做RA of month好了。终于到了最后一步,可以按照这个年月,还有对应的这个product ID去做group by,然后我们去看一下是不是就得到了近期热门统计,哎,所以最后一步去写一个。
15:10
还是Spark CQ这里面我们得到的就是就按照我们的表名来啊,More recently。DF。Spark这一句select最后想得到的是什么呢?呃,这里边其实我们最后想得到的是不是也是一个product ID,然后对应一个count,注意另外还有一个年月对吧?呃,所以我们最终得到的是。Product ID count,还有一个month年月,这里边我们写的也就是product ID,另外就是count count什么呢?是不是跟前面的历史统计一样,我们也是只关心product ID啊,呃,就是把它做了group之后,统计它的个数就可以了啊,跟score其实没什么关系。
16:14
所以这里直接count product ID count。然后我们还需要什么?还需要year month。From RA a month。哎,从刚才我们的这个临时表里边把这几项选出来,然后做了这个,呃,我们要去做group对不对。不。这里边我们是先根据year month做分组,还是先根据这个product ID做分组呢?呃,对,我们先根据那个时间分成每一个月的数据,然后在每个月里边再去统计个数,对不对?所以这里边先month,后边是。
17:05
ID。啊,当然我们把它聚合起来之后就可以了啊,大家如果想要去还要做order by的话,当然也是可以啊。Year month。呃,这里面应该降序对吧?呃,同样我们把这个count也做一个降序,这就是我们完整的这个过程。最后把。DF保存到mango DB,我们已经有现成的函数了,直接调是不是就可以了?这里要传入的是对more recently df,以及它的表名完全一样的那个名字more recently,这完全搞定。大家再把这一部分稍微梳理梳理啊,就是可能稍微过程步骤有点多,但其实思路只要理清的话,就还是比较简单的。
18:08
好,呃,最后我们还剩下一个所谓的优质商品统计,这其实就是做这个商品的平均评分,这个就很简单了,我们定义一个叫做average,呃,Products df。呃,那么他同样去写一个CQ select,最后我们要的是什么呢?是不是就是一个product ID?然后加上一个对一个平均评分就可以了,对吧?所以这里我们写的就是select product ID,然后再上对平均评分,我们直接可以调用这个函数啊,它是不是对score进行一个平均数的统计啊,最后这里SAVG。
19:02
Froms,当然了,需要group对ID,当然最后我们可以根据平均评分再做一个降序排列啊,Order by avg de。做完之后同样调用存储到mango的方法,把data frame和对应的表名传进去,这样是不是就完成我们的这一部分内容了,这就是统计推荐这一部分内容,其实完整的做下来的话还是比较简单的好,那接下来我们还是运行一下,看一下结果吧。跑完之后看起来没有什么错误,还是到mongo里边查看一下当前的表格是不是写进去了,所以大家看这三张表已经都有了,多了三张统计表,也就代表我们的内容应该成功写入了。呃,那我们还是看一眼吧,Average product find,对,然后好看一眼,呃,内容已经完整的写入了,对吧?好,那还有另外两个表,其实另外两个表是差不多的,一个叫read more,一个叫more recently recently,可能数据多一点,看它吧,大家看它里边的数据就是有一个count,有一个year month,对吧?呃,就是这样的一个数据的结构。
我来说两句