00:00
现在我们还有最后一个案例,那么这个案例呢,就是通过会话日志呢,来统计这个用户的啊,总观看视频数,然后用户最长的会话时长啊,最短的会话时长,以及用户最后的一次呃,他的会话时间。把这个错的改一下。那么关于这个数据怎么来呢?实际上我们自己写了一个Python脚本。呃,他呢,会向卡夫卡里面去传递数据,然后关于这个卡夫卡集群的地址,还有这个,呃,我们说的这个主题,实际上你可以通过这个,到时候后面会演示,就是可以通过命令行的方式,像这个脚本里面传参啊,这个脚本呢,最后发送出去的数据是这样的,也就它是一个Jason。然后呢,这里面会有关于这个呃,绘画的一个ID,然后这个绘画呢,在这个绘画期间啊,用户看了多少视频数,然后这个这个绘画的持续时长,然后用户的一个ID,用户的这个年龄,然后用户所在的城市,然后这个绘画的开始时间和这个绘画的结束时间。
01:09
那么呢,我们要做的东西呢,就是用can诺来把这个卡夫卡里的这些数据啊,首先是有一个这样的这个数据生成的脚本,把这些数据发到卡夫卡,然后由我们的can呢啊,他来消费卡和卡里面的数据,并且把这些数据呢啊。这个呃,输入到这个我们的这个Doris里面,然后由Doris呢,来完成一个这样指标的统计。那么接下来呢,我们来正式开始这个操作。啊,那么首先呢,我们先去这个o Mo下面。啊,随便写个目录吧,比如说叫这个data。然后呢,我们四级到这个菲利里面。把这个我们自己写的这个。啊,伪数据的生成的一个代码给它发上来。
02:05
嗯。好,那么这个脚本上传上来之后呢,我们需要去安装它的两个依赖库。嗯,在这个tos上面呢,是配三,然后install安装这个我们需要的一个。嗯,我这边呢,电脑上已经安装过了。但是还能还是象征性的,这个敲一下,你可以看到这里是already啊,实际上如果是你们的话,第一次跑这个脚本应该会呃去装一些东西,当然也很小啊,不会占很长时间,然后。还有一个库是这个卡夫卡Python,它是一个我们Python向这个卡夫卡里面发送数据的一个客户端。啊,把它安装一下啊,这里面也是already早就存在了,然后呢,我们现在呢,去这个啊卡夫卡上创建一个新的话题,也就是说。随便建个名字吧,比如说这个卡卡。嗯,Topic,然后完了之后呢去。
03:06
指定我们的这个集群地址完了之后。罕见。我们直接这个topic名称就要test test2吧,然后呃,指定它的这个分区数。嗯,直接给他一个就行了,因为毕竟是一个演示,然后。呃,有四吗?先试一下吧,听一下。关键这个话题啊,可以看到这个地方。不对。啊,那么我们创建了一个叫TEST2的话题啊,我们怎么样去用这个脚本去发数据呢?其实我们这个地方是啊,因为在这个呃。你们先把Python可以看一下,因为在这个SS里面呢,它有呃2.73.6,然后它有各种各样的这个版本的这个Python啊,所以说呢,我们这个地方要用PYTHON3,因为我们的这个Python脚本,它就是用PYTHON3写的。
04:11
然后fake video啊,直接去传我们的这个卡夫卡的一个目标。嗯,那我们1029092,然后完了之后呢,啊,指定我们刚才这个创建的话题。啊,你现在看到呢,就是说它什么都没有显示,但是这个命令卡住了,意思就是它在运行。啊,那么我们这边的话呢,再去起一个这个,嗯。消费者去消费一下,看看什么情况啊。啊,考。然后2P。AA。
05:04
刚才是二这个分区这个话题。我们可以看到这里面已经有数据了,然后就是他在一条一条产生这个啊,我们这个。里面的一个绘画日志。这些数据呢,都是我们模拟生成的。那么现在呢,我们的这个Python生成伪数据的这个脚本已经跑通了。那么现在呢,就是去这个MYSQL上呢,不是MYSQL了,去do瑞S上啊,创建一下我们的这个数据库和表。嗯,首先。嗯,我们是指定这个机器还有端口。9030哈,然后用户呢,就是我们的硅谷。密码我直接写在命令行上了。啊,这样我就连接进来了,然后呢,直接去创建一个数据库。
06:04
Your test。吧。啊,这个数据库我们也创建出来了,我们可以看一下当前这个所有的一个数据库的情况,那么我们先选择到这个SDB底下。然后去这个我们的这个资料里面呢,有一个。嗯。电表的语句在这个代码里面,我们直接把它粘进来,复制粘贴就行了。啊,这个地方呢,可以稍微做个讲解就是。我们这里面呢,实际上是建了一个表,然后呢维度呢,有这个用户的ID,然后用户所在的城市,然后以及用户年龄这些。啊,我们统计的指标呢,一个是video sum,也就是总观看的一个视频数,那么聚合的方式呢,是求和。啊,求和的方式,求和,然后还有一个呢,就是这个用户对话的这个最长的这个单词会话时长,也就是说在他所有的这个一个用户嘛,它可以产生多个会话,比如说我今天早上刷了半小时抖音,然后呃,下午呢,就刷两小两小时抖音,那么这一天呢,我就产生了两个会话,那么最长的这个会话时长呢,啊,当然是下午弄两小时的,那么这个地方呢,我们给它一个默认值就是零,然后聚合的方式呢,就是max。
07:24
啊,同样反过来呢,就是说这个用户今天最小的一个啊,这个会话时长,那么我们给他一个最这个极大值完了之后呢,啊聚合方式呢,就是幂取最小值啊还有一个就是啊,我们的这个用户最后一次会话时间这个地方呢,实际上是用的是replace。嗯,其实这个地方用replace是想表示什么呢?就是说我们期望哈,就是我们的数据是有序的啊,这样的话呢,就是后面的一个,呃,插入的数据呢,会把前面的一个数据,这个插入的数据呢给替换掉啊,那么这样的话就是用户的最后一次分化时间啊,A着呢选op,然后a get k呢,就是这个聚合的这个K呢啊是我们的一个啊用户ID,然后还有CT,还有age来保证它的唯一性。
08:09
呃,那么还有一个就是。啊,一个它的一个分桶,我们是按照user ID,然后分成16个桶啊,直接指定完之后呢,直接按照回车来建表。电表。啊,这个表我们也建完了啊,这样的话呢,我们现在已经准备好了这个Doris这边的工作。这个麦的会话呢,我们先留着啊,和多瑞斯的会话我们先留着,完了之后呢,这边的话我们去啊做一下这个脚本的问题,就是我们配置的问题,去编写这个应用的配置,我们还是去那个他诺和底下的这个。目录目录。啊,我们来这个编写这个1304。
09:00
啊,这个地方呢,我们也是从文档里面去拿一下,完了之后来给大家做一个讲解。嗯,我想不要去这个文档里面拿了,在代码里面直接有拿来复制吧。呃,那么大家看到的就是现在这个东西呢,呃,比较长比较多啊,我们现在给大家一一讲解一下,这里面是干了什么事。把它的这个格式给规整一下。啊,那么好。嗯,首先呢,我们可以看到这个even呢,它还是没有变化哈,还是我们说的这个,呃,一个并行度的设置,然后S呢这一块。这一块。啊,我们可以看到这一次呢,就是呃,待会儿的话,我们可能要去改一下这个topics啊,这次的话呢,啊,Group ID啊,还有这个boardsh server都没有变过,然后result name啊也没有变,那么form type这个地方变了,上一次我们写的是CSV啊,那么这一次呢,我们写的是。
10:06
也就是说我们这个卡夫卡里消费到的数据会是一个Jason,那么对这个Jason这个模式的一个设定呢,实际上你会发现,首先跟上一次CSV那个地方呢,语法变了啊,上一次CSV我们是什么,我们是写的是一个Jason数组。但是这一次呢,我们要写的是什么呢?我们要写的是一个啊,再分对象啊,我们的每一个K,我们的每一个K,你可以看到我们的每一个K。也就是这个session ID,它这一个K就直接就是我们数据里面的啊一个K,然后呢,后面的这个value这个值,其实这个地方也可以写一个这次对象啊,做更具体的一个说明,但是如果是不写的话,我们可以直接对它做一个类型上的说明。啊,这个地方呢,我们直接对它就是说这个session ID,它的类型是string,然后呢,这个Jason里面还要有什么呢?还要有video count啊,也就是说我们说的这个啊,单次绘画的一个视频观看总数,然后它的类型呢,是得是int啊,意思就就这个意思,然后我们通过一个Jason对象呢,来给这个啊卡发卡里小道的数据设定了一个模式,然后在这个地方呢,还是去设定一下啊,如果说这个解析出错了,那就直接忽略。
11:16
啊,这次呢,我们的这个transform呢,需要注意一下。需要注意一下啊,首先是select,你看因为我们这个最后的这个表输出的这个呃字段名呢,这个字段里面它没有这个session ID,所以说呢,我们在这个circle的这个select的时候呢,啊,需要把这个30ID给它去掉,呃选的时候呢,只有用户的一个用户的ID,然后CT然user edge,然后把它这个名字也顺便改一下,这样话方便统一啊。啊是这样,那实际上呢,这个table里面的这个名称是什么呢?跟后面的关系其实不大,因为最后呢,我们要保证哈,保证这里面的这个一个数据源和这个最后的这个Doris,呃,克拉姆的这个地方啊列。要保证这个顺序上的一致,呃,最后呢,这个我们在这里面啊说明了一个,它的表明叫test to,呃,在这个地方呢,用name,其实这个地方s name它不写也行了,因为我们刚才知道这个开源码的时候,我们知道呃,这个think它就只会直接会这个啊使用前一个这个呃插件所输出的数据作为它的一个嗯输入,那么在这个地方呢?啊,我们需要注意,这个地方我们需要注意。
12:24
这里要写8030,我们刚才连接MYSQL的时候呢,啊,这里的这个端口号呢是九零。啊,9030还是9050啊,当然这个地方呢,嗯,为什么呢,因为这个啊dori think它的实现呢,实际上它是用了一个叫做啊Doris的呃,Stream load,呃这种方式往里面导的数据,那么这个地方呢,必须要写它这个stream load这个开放端口号就8030。啊,然后data base呢啊,直接就是说我们刚才选的那个啊数据库啊,然后完了之后呢,这是我们里面的表明,然后包括你的这个用户啊,然后用户的密码啊,其中有一个better size,也就是说啊,这里面这个卡夫卡里面消费到的数据,卡夫卡里面我们消费到数据处理不到数据到这里面啊实际上会进行一个暂批的操作啊,因为Doris呢,它是建议就生产中的时候呢啊,千万不要就是一条一条的往这里面插,至少呢也要攒成一个P,呃,默认一般图形化是一千一千多或五五百这个样子啊,但我们这个为了演示呢,就直接就是把这个P大小设置成50啊最后呢,你需要注意啊,就是streamlo的这种方式,它实际上是以一个HTP请求的方式啊,往这个do里面插数据啊,那么这样的话呢,啊,我们需要去给这个,这样的话,我们这个数据需要进行一个序列化,那么序列化的时候呢,啊,需要对它进行一个这个呃,列明的这个分隔符的这个指示啊,我们这个地方呢,就直接使用这个制表符啊进行一个这个。
13:50
啊,列列之间的一个分隔。啊,这个就是我们整个配置文件的一个情况。那么现在呢?我们的准备工作都已经完成了啊,现在我们直接先用塔来启动这个任务。
14:11
然后是flink啊,我们还是用con来指定这个。我们的一个配置文件路径。直接回车先看能不能正常跑通,就他能不能啊,不会报错。好,我们的这个任务呢,已经正常的提交上去了,然后我们看一下这个。啊,现在已经有一个运行的任务了。啊,这边呢,我们是在这个会话先留着,然后我们在另一个新的哈多普102的这个会话上呢,啊,去用我们的Python脚本生成数据。一个这个路径下,然后呢,我们用这个PYTHON3。A video。
15:11
好,那么这边呢,就已经再往里面插数据了啊,我们去这个啊Doris上看一下有没有相相关的数据,我们先莱克先看一下。Example。啊,那么这样的话呢,我们就是可以看到我们现在这个啊,Doris里面已经有一个相关的情况,我们每一次查询的时候呢,我们可以看到这个用户的这个啊,我们的一个聚合情况的变化。比如说我们找一下这个。它是按字典排序的。因为我们看994吧,这个这个994的这个idv 994的这个用户。
16:06
看到他这个994,我们刚才查到他说他已经是他刚才是一千二是吧,现在已经七千二了。啊,那么这就是一个完成了,用Doris完成了一个指标的统计。啊,那么这个案例我们也讲解完了。
我来说两句