00:00
接下来给大家讲实时系统联调这一部分,那大家首先到目前为止啊,我们已经实现了基于统计的推荐模块,然后还有离线的基于英语义模型的推荐模块,呃,另外呢,还有自定义模型的实时推荐模块,这几大模块我们都已经实现了,那现在其实就是说大家现在得把这个就整体的数据流要搞清楚,离线部分的话比较简单,就是我们从这个数据库里边把数据读出来,然后做做计算,啪啪啪一算对吧,最后得到结果再写回到数据库里面去,这就完事,跑完了就完了,都是离线做的计算,这个比较简单,比较麻烦的是实时这一部分,大家要把这个数据流向理的很清楚才行,那大家先梳理一下我们整个的这个数,实时系统实时推荐的数据流向是怎么样的呢?还是之前我们那张图,这里边用文字给大家再做一个梳理,就是首先是业务系统里边用户会在界。
01:00
面上有一些操作,比方说在这里我们的典型操作就是做了一个评分,给某一个商品做了一个评分,那么获取到这个评分之后,业务系统就会啊,把它写到这个日志里面去,在这个比方说我们这里面是后台做了买点,那么就会写到对应的日志里面去,我们用flu进行日志采集,采集到的东西。给到卡夫卡,卡夫卡这里呢,我们会用一个卡夫卡streaming进行一个数据的清洗和预处理,这是做什么事情?对,大家可以回忆一下,首先我们可能需要去做一些过滤,另外就是大家会想到我们最后要的不是固定格式的一个评分流嘛,用竖线分割的这么这么四项,那从日志里边提取的话,呃,这个你直接就能生成这么好的你想要的一个数据,这种可能还是比较小的,所以我们还是从日志里面要去做一个过滤和提取预处理,然后处理完了之后,呃,那大家会想到这个Spark stream这边就可以消费卡卡里边的数据,这里就可以做流失计算了。
02:07
那么完成计算之后呢,把对应的推荐结果写到mango里面去,然后再让业务系统读取mango里面的数据,就可以给用户实时的显示出来,这就是完整的这个业务流程。那在这个过程当中,首先我们用到的这些组件,该起的我们要提起来,呃,上节课我们其实已经把这些东西都已经提起来了啊,所以这里就不再给大家详细说了。接下来关键的一步是我们要去构建这样的一个卡卡stream程序。呃,他是做了一个什么事情,他要做预处理对吧?呃,所以他其实是。嵌在这个卡夫卡里边的这这个环节,那么日志系统flow那边采集到的东西,我们去给他配置一个卡夫卡think,那么可以就是产生的数据定义成一个topic,然后呢,我们定义的这个卡夫卡stream程序可以接收这个topic的数据作为输入,然后进行处理之后再输出另外的一个topic,这相当于一个管道的流程,对不对啊,就像管道一样,有输入有输出,所以呃,输出之后的这个topic就应该是我们想要处理的那个订阅的那个recommend。
03:20
这个流程大家搞清楚就好,所以接下来我们在还是在recommend下边再去新建一个模块子模块,这个子模块就叫做。卡夫卡,Strip。Stream。呃,同样我们把这个路径配置好,接下来要看一下它的这个po文件怎么写了,这里边需要什么样的依赖呢?呃,这部分因为不是最主要的内容,所以我们只要大概的看一眼就可以啊。呃,我们这里边是不是应该要用到卡卡stream去做处理啊,所以一定要引入卡夫卡streams,然后还需要什么呢?对卡夫卡的客户端,卡夫卡clients是不是也需要?所以主要其实就是这两个依赖,当然在文档里边还给大家提供了就是打包里边应用的一些插件的配置,呃,用这个maybe assembly plug in这个插件,然后下面定义了很多相关的配置,大家可以仔细的去看啊,我们这里就不详细讲了,我只要把这个复制过去就可以。
04:29
好,在这里定义好了,同样我们还有一些比方说这个日志文件copy过来,日志的配置放到resources下边,呃,那这里我们的原文件目录Java,因为卡卡stream就是一个Java Java的程序,所以就是Java下面这个没什么问题啊,我们去创建一个啊,这就应该是Java class了。
05:00
这里面的name我们就叫做加上包名啊。com点艾特硅谷。点卡夫卡。Stream。Application的一个plus。好,这就相当于是我们的这个呃,主程序了,对对吧,呃,那么在这里边我们怎么样去写这个主程序呢?直接上来之后写它的内函数吧,呃,那Java class啊,PSVM,我们定义一个主程序程序入口,这里边我们需要用到什么呢?先定一些用到的配置参数,比方说这里边有broke。卡夫卡对吧,Logo host9092,然后还有per。
06:02
LOCALHOST2181。呃,然后另外我们会定义输入和输出的topic,对吧,定义输入和输出的topic。同样我们都把它定义成street,这里边我们就叫from吧,From topic,这是从那边的卡来的,对吧?那么它定义的我们把它叫做log。然后输出two。应该叫做什么?这是我们在呃,SPA streaming程序里边想要去消费的那个topic啊,好,接下来我们就会去定义这个卡卡stream的一些配置配置参数了。
07:02
呃,这里的这些配置参数呢,大家可以参考一下我们的那个文档啊,这里去定义一个proper,那么这个就叫做setting。New一个呃,Java的这个写法对吧,所有的这个对象都要扭出来,呃,New一个pro,那么大家可以看参考一下这个文档里边他做了哪些配置呢?啊,其实就是要要塞几个这个参数对吧?有一个这个application ID conig,然后要猜一个,呃,要去添加一个bootra service,然后还有一个look keepper connect啊我就我这里就不详细写了,把这个直接copy过来吧。好,那大家看到这里面的这个STEM config还需要去引入一下,然后接下来。
08:00
呃,那就去创建这个。卡夫卡,Stream。呃,这个配置对象。这里边是streams对吧。那么他应该是。我们给一个名字叫做config,应该去new一个streams config里边是不是要传入settings参数啊?呃,就是这样把它创建出来就可以了,那这样一个卡夫卡STEM的配置对象到底是要干什么用呢?呃,其实主要就是我们在后边创建卡夫卡stream的时候要用,对吧?啊,它的那个定义的参数里面要把这个定义进去,那除了这个卡夫卡stream配置对象之外,还需要什么呢?还需要一个拓扑。
09:02
这样啊。定义拓扑。构建器还需要一个这样的一个东西,那大家会想到就是这是什么意思呢?什么叫这个topology builder啊,托扑构建器,这是个什么东西呢?对,就是from from哪里,然后到哪里,最后再输出到哪里,对吧?整个这个流程,这是不是画出来有点像一个拓扑结构啊,哎,所以这个东西叫做topology builder啊,那么我们定义一个这样的builder,当然了,Java class嘛,又是new一个top builder,来家看这个卡卡streams processor下面的对吧?那有了,先先把它声明出来,然后接下来我们主要是要把它做定义啊,Builder大家看他有什么方法。是不是可以ADD s ADD think啊,所以这里边我先去ADD一个S,这个S第一个给一个name吧,我就叫做好了。
10:11
后边那就是你具体这个source是什么呢?是前面定义好的from,然后接下来ADD的一个process,大家看这个有点像我们流式处理的这种方式,对不对,整个把这个呃,相当于也是流式处理,流式处理的管道拓扑结构定义出来,接下来我们定义一个process,就是从这个from这里来了,来了之后做一个处理,最后再输出,那么这个processor我们这里边给一个名字name,就叫做。Process。然后后边需要定义一个这个既然是processor处理嘛,那得定义一个函数了,我这里边用一个,呃。
11:02
Lambda表达式。定义一个匿名函数,这里边我们在另外一个类里面去实现吧,叫做log process。然后后边大家看还有parent names拓扑结构嘛,他的这个前面的父亲是谁,所以我们这里应该是前面的S对不对。定义了process之后啊,最后我们就来定义一个think,对啊,其实就是这样的一个结构啊,那么think这里边还是name的话,我们就叫think吧,那么他到底是谁呢?是to对吧,是我们前面定义好的recommend。这样的一个topic,然后后边还是对应的这个parent name应该是处理完了之后给到这里来。啊,这样就把它定义出来了啊,当然这里边我们具体的这个love process这个类还没有实现啊,所以我们在这里就等一下再来处理这个错误。
12:08
好,那有了这个卡普卡STEM配置对象和这个top builder之后,接下来我们就可以创建。卡夫卡了,这里就是卡夫卡streams。定义一个叫做streams的东西,New,一个卡卡stream全是new,对吧,那里边它传入什么参数呢?其实刚才大家应该也已经看到了啊,先要传入builder,然后后边还要传入config。把这两个参数传进来就可以了,那接下来这个它既然是流式处理嘛,那是不是有了这个流还得把它start起来啊,所以streams.start。
13:01
这就可以了,当然我们为了方便这个观测它的这个运行状态,我们可以在后边再输出一句,打印一句控制台,啊呃,这是Java里面那是system.log print,我们输出一句叫。卡夫卡STEM started。这就是我们的这个主程序applicationp,那接下来其实这里面的逻辑很清晰,就是把它这个该配置的配置好,该定义的这个结构拓扑结构管道定义好,然后最后我们主要是在这里面处理对不对?所以接下来我们的关键是在这个下边再去new一个Java class,叫做log process来处理它。这是我们的流处理程序,呃,那么这里面它是流处理程序,但它其实对于这个。
14:03
在这个top建构器里边定义processor的时候,他其实是有要求的,你必须得满足一些条件,大家看这里面还报错对吧?呃,你必须得满足一些条件才能够符合这里的要求,那么得满足什么条件呢?就是他得去实现一个接口,实现一个。大家看process。我们应该选择第二个这个卡卡stream的process,对吧,我们去把这个接口做一个实现,当然这里边大家看到是有泛型定义的,我们要求就都是这个K86都是。BAT数组就可以了。这里再报错,呃,因为我们要求他实现这个接口,那是不是对我们得把它里边的这个方法得复写啊,所以直接把这些方法实现出来。那现在大家看这里边主程序是不是就已经不报错了,呃,就看起来是正常的一个状态,呃,那大家看到这里边它主要有哪几个方法呢?有一个初始化方法,然后一个process方法,这是两个最重要的方法,呃,另外两个我们都可以不管啊,就只要是把这个初始化和process搞定就可以了,大家看到初始化的时候,它这里面有一个什么东西呢?对,有一个上下文,Processor上下文,所以我们在这个类里边啊,去定义一个上下文对象。
15:31
Processor contact,呃,我们就把它叫做contact,然后这个初始化的过程当中,其实非常简单,就是把this.contact。复成这个process就可以了,那接下来最关键的其实就是这个函数,那大家看就是这个,在这里边处理的时候,我们艾特了一个processor,那真正来了这样一条数据的时候,我们到底调哪个方法去处理呢?其实就调的是process的对process方法,所以这里才是我们最关键的地方啊,这里是这个。
16:15
核心处理流程。啊,那么我们看一下这里边大家看一下这个输入是什么呢?诶,输入是两个bit数组,这就是我们前面这个泛型定义好的,对吧?这里边第一个这个其实只是一个名称而已,就是一个key而已,所以没什么重要的,我们就把它叫大米好了,后边这个才是我们真正要处理的value,我把它叫做lie啊,那大家会想到这里边你进来之后,这个数据是一个BY数组,呃,这个可能就是到倒是它比较方便这个传输,但是我们处理的时候显然还是把它变成string会舒服一点,对吧,还是把它转换成一个Java对象,那么我们可以去new一个string,把这个line传进来,是不是就可以用出一个根据这个半数组扭一个字符串啊,好得到这个字符串之后。
17:22
我们要做的是什么呢?啊,就是要进行过滤,进行提取了,想要数据的提取,对吧,提取。数据,那我们以什么作为标志,作为过滤的这个,呃,原则呢,我们主要其实就是以一个固定的前缀。前缀过滤日志信息。提取数据啊,那这里边大家可以给大家看一下这个前缀到底叫什么呢?大家可以参考一下我们这个文档里面写的内容啊,大家看就叫这个,这是在业务系统里边已经定义好的,所有的我们要给这个评,就是我们后边这个推荐系统要用的这个评分数据,它前面都有一个前缀叫product reading prex加一个冒号,所以我们是不是要提取的时候,只要是这里边来的这个input有这样的一个前缀,我们就处理它就可以了啊,然后后面处理其实也很简单,大家看其实就是把它对根据这一个做了一个切分,然后直接取它后面的内容就好了,因为前面可能有一长串,比方说那个时间啊,什么什么样的一些,呃,附加的这些格式,那后边其实就是前,这里是前缀,后边就是我们最后的那个数据内容,而且直接就是已经按照。
18:55
竖线分割开的,分割好的,所以这里边我们就只是做这么一个简单处理,大家如果想要在这里做更多的复杂的操作的话,当然也可以自定义,我们这里边就给大家把这一段再做一个实现啊,If判断这个input里边是否有包含了这个前缀啊,前缀我复制一下啊。
19:20
假如包含了这个前缀的话,我们才处理对应的内容。呃,那么这里边还是为了方便我们监控程序的运行状态,我这里边控制台打印输出一条记录,呃,比方说这里边叫product data coming,呃,后边还可以再加上我们具体的这个input的内容,对吧?把这个完整的内容都显示在这里,然后接下来我们把input更新一下,最后只要什么呢?是不是对只要前缀后边的那一部分东西啊,啊,所以这里边我们就直接用input去做一个split。
20:08
呃,它里边的内容对,是不是大家会看到,还是按这个前缀来做就可以了呀,只要用这个前缀做一个切分,切分出来的内容取它的。后面那一部分对不对,是不是取一啊啊,然后最后当然了,我们可以这个去掉这个首尾的空格,CH一下得到的这个input,就是我们真正想要的那个数据,那之后怎么样再把它就是按照我们的那个呃,定义好的管道再把它发送出去呢?那大家看后边它是有一个固定的这个方法啊,就是上下文,它不是有一个上下文吗。有一个forward,然后forward里边就是我们给定的这个key value,我们指定的必须要是BAT数组,对吧?所以说K是什么呢?K就是我们这里可以直接给一个啊,也可以用前面的那个当米processor processor这个我们随便给一个就可以,最后我们还需要把它转换成bit数组,所以get bits后边是不是就是input,直接get就可以了啊,这就是我们完整的这个处理流程,把这一部分实现了之后,我们就可以从日志信息里边,根据它的前缀把它做一个过滤提取,提取出来后边的就是我们想要的竖线分割的那四项内容。
21:42
User ID product ID score,还有一个time step,所以呃,大家可以看到这个实现的过程啊,好,我们这一部分内容已经做完的话,接下来诶,我们就可以启动这些主程序了,Application就可以提起来了,对吧?好,我这里先把它提起来啊,这个输出信息太多的话,我们还是把他的那个日志级别调高一点,把日志级别调高之后再重新让一下。
22:14
大家可以看到这里边我们已经提起来了啊。
我来说两句