00:00
我们主要是给大家讲一讲,就是实时系统的联调,我们接下来需要把所有的工具真实的状态来把所有的内容都加进来,然后看一下我们整个的数据到底是怎么样流转的,最后我们应该是在真正的这个业务系统里边,我们能看到一个行为,做了操作之后,在这里我们是评分,对吧?啊就就只实现了这个啊,就是我们去做了一个评分,这样行为之后,应该能够有了评分流,进入到通过一系列的转换,进入到我们的SPA stream里边来,对吧?然后最初最后做出一个实时的推荐,应该能够看到这个实时推荐的结果,对吧?啊,所以我们接下来会把所有的东西都给大家串起来啊,我们会应该有一个业务系统,对不对啊,这个代码就是后面已经写好了,这个不需要大家写,前后台都已经写好了,呃,到时候我把这部分代码给大家,我们从业务系统里边应该会收集用户的评分数据,然后他是不是应该写到日志里面去。
01:00
对吧,我们从日志去收集,那么接下来日志采集当然是flu,对不对,把要写起来,接下来我们会去写一个叫做卡夫卡streaming的一个数据预处理的这样的一个一个程序啊,所以大家会想到这相当于就是做这个有点像ETL了,对不对,就相当于是做一个简单的清洗和预处理,我们这里就是最简单的操作。提出来的那个日志信息,我们按照他的那个,呃,写日志时候一个前缀去做一个处理,对吧,按照前缀把我们想要的那个格式提取出来,给到后边的Spark streaming做流式计算,最后我们应该得到一个实时的推荐列表,更新实时的推荐列表对不对,写到mango里边去,然后我们再返回到这个业务系统里边,我们重新刷新一下页面看的话,是不是应该看到那个实时推荐列表应该变化了呀。这就是我们的一个基本思路,好,那当然一开始我们是不是应该把相关的这些基本组件应该提起来啊,啊,我们已经涉及到的东西啊,业务系统这里还没有,我们先不管啊,那大家想到这个flu flu,我们跟业务系统相关,我们放到后面再说,那后面的这部分我们如果要先写这个kaa streaming的话,它是不是应该跟后边这个Spark streaming,还有我们其他的一些东西都可以关联,就先串起来后面这一部分啊,所以我们这里需要启动什么呢?我们的这个流失处理的这个程序,推荐程序是不是应该模块应该启动啊,然后monggo red应该启动对不对,Monggo我这里边都已经启起来了,然后我们把这个先跑着让它启启一下啊,Streaming这个程序启一下,呃,这里边这个recommend,我们就把它关掉吧。
02:49
Streaming程序run一下,另外大家会想到我这里是不是还应该有这个look keepper,卡夫卡,这些都应该提起来,对吧?那当然这里边大家看一眼,我昨天已经起过,应该是起着的啊,现在还起着对不对啊,都起着,所以我们这一部分就都已经起起来了。接下来我们要构建一个叫做kafka streaming的程序去做一下数据的处理,那大家会想到这个,其实呃,在这里我们是不是要引入kaf卡stream呀?
03:21
呃,大家知道这个包是吗?卡STEM也可以做这个,呃,流式的一些数据计算和处理,那这个引入之后我们写的代码是什么呢?那就是Java代码了啊,所以大家先看一眼我们需要入哪些依赖,我们我们先把它创建出来吧,呃,这边还在还在跑,我们等一下啊,就是大家看一下需要引入哪些依赖呢?Po文件里边,这相当于又是一个子项目,对不对?我们到时候还是创建在这个recommend下边,还是跟它相关的,所以都放在一起,那这里面大家看到,首先我们要引入卡卡streams。呃,这是做这个卡夫卡上的流失处理的一个包,一个库,对吧?这个dependency要引入,另外是不是我们还要跟卡夫卡相关做连接,是不是要有卡夫卡clients啊啊,这个是常规的卡夫卡,引入这个是必要的,Dependency就是这些,大家看下面build里边还引入了一些东西,这是什么呢?
04:17
大家看是不是这是ma文打包的这个工具插件,我们定义了用哪个插件对吧?Maem assembly这个是不是还是还是继续引入了,另外这里是不是configuration还定义了一些东西啊,定义了我们的入口类对不对?Main class,大家看我们定义的叫什么啊,啊,就叫这个包下边的application,那大家已经知道了,我们要这么定义的话,后边我们是不是创建一个叫application的一个入口类成呃这样一个类啊,然后大家看下边这些就是我们执行的这些package对不对?打包过程当中调用这个plug in,这就是一些基本的配置啊,这些就不是特别重要,大家知道怎么用就可以了。好,大家看这边streaming已经started了,对不对?我们去新建一个module。
05:05
这里我们把它叫做卡夫卡STEM next,呃,然后创建出来,这些过程都一样。我们是不是把对应的这个卡夫卡它的dependency KA streams和卡卡plans全部添加进去,对不对,后边的这一个build相关的插件也把它添加进去,所以这里我就直接复制粘贴好,那么这个已经写好的话,我们的基本配置就已经有了,然后接下来我们看一眼它在哪,在这呢啊。啊,接下来我们会想到啊,这个resources里边,如果我们想要添加的话,还是把对应的这个添加过来,对吧。Log for添加过来,呃,然后这里边我们是不是本身要创建的就是一个Java程序啊,对吧?因为卡卡stream它的处理都是Java里边的这个API,所以我们这里还是把它这个创建出来就可以了,那么我们定义要new的就是一个Java class对不对,按照我们刚才的这个定义,是不是应该叫com.at硅谷卡卡stream.application啊对,所以是这样的一个定义啊。
06:27
好定义出来,所以我们就相当于创建了这样的一个入口类啊,那大家会想到在这个里边我们去做什么操作呢?呃,大家可以看一下,就是大概的这个处理流程啊,都在这里边写出来了,大家看到我们先定义什么啊,那是基本的一些这些配置参数对不对,我们先定义好,呃,这个卡夫卡break对不对,9092,然后zoo keepers2181,这些先先定义好,然后后边我们还要定义好,大家想这个卡夫卡stream是不是相当于是有一个。
07:03
呃,这个流式的管道有输入有输出,中间做一个处理对不对,然后我们这里就把这个流式的都定义成不同的topic了,那么来的时候是一个什么topic呢?是一个log,我们把它定义成from叫log,那大家会想到是不是flu那边采集之后输出到卡夫卡这边来的,是不是那边定义一个卡夫卡think应该叫log啊,啊,对吧?这这大家已经是能够想象得到的,然后这里的经过处理之后,输出是不是就应该是recommender,这是我们真正呃在这个Spark streaming,呃,Spark stream里边,我们要去做处理的时候订阅的那个recommend,对吧?所以做这样的一些操作,然后接下来就是定义卡卡stream配置了,对不对?这一部分大家就是大家定义一个settings,然后把想要的这些application ID啊,这些呃,Boot boottrap serve啊,Keepper这些传进去,参数传进去,我们是不是就可。
08:04
创建这样的一个,呃,卡夫卡stream,卡卡stream配置啊,然后接下来创建它的这个拓扑建构器,这是个什么东西呢?是不是就就是定义从谁来到到哪去对不对啊,就是定义这个拓扑结构,最后把它砌起来啊,就是这样的一个过程,所以其实这个还是比较简单的,对吧,就是很简单的一个小程序啊好,那所以这里边我们写它的main函数吧,那大家知道呃,简写是PSVM对吧,Java里边啊,然后我们先定义一个string,这是brokers,呃,这个抓va代码,大家后面要记得有分号对不对,大家现在应该就应该都能够学的语言多了,就是应该能够无缝切换啊,就是它逻辑都是一样的,只不过就是一些基本语法不一样,对不对,对吧?所以这些东西就是你使得熟了之后,就这样,然后。
09:04
U keepers,呃,Zoo keepers这个默认起的,我都是在本机对应的,就有一个Linux虚拟机我都起在,相当于都是local host对不对?大家如果不一样的话,把这个改掉就可以了,2181,然后接下来我们定义那个输入和输出的topic对不对,输入和输出的topic,那么呃,这里定义一个from等于log,那同样还有一个to等于recommender对吧?好,那接下来是不是要定义那个卡卡stream的配置啊啊,这个我就不详细写了啊,大家只要看到这个里边,只要会用就行了,就定义这些东西对不对,我就直接抄过来好了,这这些东西并不是特别重要。
10:04
那大家看这里我我的引入东西properties,那是不是从这个Java you里边直接引入就可以,对不对,然后诶这里边stream config它应该从哪里引入,是不是kafka streams里面去引入啊,对吧,既然要对它做配置嘛,好,这些东西已经配置好之后,我们就可以去创建一个卡夫卡的,呃,这个配置配置对象了卡夫卡。呃,就是卡夫卡stream的配置对象了,创建卡夫卡STEM配置对象。那当然这个configig,这个Java里边的写法是不是就它都是new啊,对吧?啊,就是定义一个config,它就要new一个streaming configig,那么这里边我们传入什么呢?是不是就是前面已经定义好的settings啊,就这样把它创建出来就可以了,然后接下来去创建一个啊拓扑建构器,对吧?大家想到是不是做这样的一个做法啊,那这里边我们去做的是什么呢?就是定义这样的一个,大家看topology builder,呃,Topology是不是就是拓扑的意思,那大家知道拓扑结构其实就是说有点像这个它的组织架构在几何上的那种表达,对不对啊,我们这里只要知道它就是说它那个从谁到谁这样一个流程就可以了,大家可以认为是一个流程啊,那我们定义一个builder,它就new一个top builder对吧。
11:48
啊,这个写法都很简单,抓va对象的建立,然后接下来我们就要去定义这个build了,对不对啊,定义流处理的拓扑,拓扑结构啊,那么这个拓扑结构应该怎么定义呢?大家看是这样做,它可以爱的东西对不对?那首先大家看一看这个名字是不是就可以看出来,就是有S有S啊,S是不是来的地方对不对?然后think是去的地方对不对?所以我们先去艾一个S,那么这里边的S我们给一个名字啊,大家看第一个参数是不是一个name啊,我们就叫它S吧,那么这个S后边的参数是不是到底是哪个topic啊,大家想对吧?所以后边这个我们是不是把from作为source传进去,然后接下来,哎,大家看。
12:48
后边是ADD think,另外它是不是还可以ADD processor啊,这是不是中间我们做的这个处理啊,真正的流处理的这个核心对不对?呃,我们是at用at processor来做定义的,首先又是一个name,我们就把它叫做process,然后后边第二个参数大家看到是一个。
13:13
Supplier,那这里要定义的是一个函数啊,那我们怎么定义呢?我们这里边就不实现了,我们用另外一个类来实现吧,我调一个呃,写一个拉姆达表达式对吧?一个匿名函数,然后这里边我去new一个log processor定义,定义你有一个这样的一个对象,那这个对象我另外用一个类来写,对不对啊,创建一个这个类的对象,我主要就是创建出它作为我们这里的processor就可以了,那大家看到最后是不是还有一个parent names啊。这是不是表示他从哪里来,数据从哪里来,然后做这个professoror对不对?他的数据从哪里来,他的parent是谁呢?是不是应该从south来啊,诶,S,好,这是我们的第二步,那最后我们是不是加一个think啊,ADD一个think对不对?那么它的name我们就叫做think,那同样大家会想到后边是不是要定义出来到底是哪个topic啊,我们这里的topic就是对不对,那后最后是不是还有一个parent names,这这是要用哪个啊,是不是用前面定义好的process处理好之后的数据是不是传回来,传到另外一个topic去去啊,最后再让我们在这个SPA stream那里去做订阅,对吧?那里有一个consumer订阅的这个recommender就可以做处理了,这是这样的一个转换流程,好,这只是把这个拓扑结构定义好了。那接下来我们真。
14:50
证的这个呃,实现是不是还要定义一个卡夫卡streams啊,对吧,这才是真正的这个处理模块啊,定义一个streams,那同样还是new一个KA卡,Streams里边要传入参数,传入什么呢?它的大家可以看一眼啊,这个卡对吧。
15:13
稍微有点慢,大家看传输的参数是不是一个我们定义好的这个top builder,然后后面是不是它的那个properties啊,配置参数对不对,所以我们前面都已经定义好了,是不是就是builder,再加上是不是看这个啊,就是这两项对不对,那么卡卡stream已经定义好了,接下来要做的是不是就是把它start起来啊,啊,这是一个流式的处理啊,那当然当然了,大家会想到这个流处理我们提起来之后,最好还是给他一个这个信息对不对,那提如果要给一个信息的话,我们对应的这里就可以给他一个wa,要不然他一直在在刷那个音粉啊。诶,放哪儿了。下面我们可以print line,那那大家注意这里是这个Java代码,我们system.out.printline对吧?这里边我们还是来一句叫卡夫卡stream started,好,大家可以把这个输出输出进来啊,输出输出到控制台上,我们就可以知道它真正已经骑起来了啊,这就是我们整个的一个视线,那当然大家会想到接下来我们还有一个任务,是不是要把这个呃,流处理程序的这个log process做一个实现啊。
16:37
那那这一部分大家会想到这个东西怎么去弄呢?那我们是不是同样在这个包下边创建一个Java class叫做log啊。好啊,那有了,有了这个process之后,呃,大家就会看到我们这里就可以可以知道它啊有这些东西了,对吧,但是这里还报错,为什么呢。
17:04
因为是不是这个log processor,它一定得是一种,大家会想到它一定得是一种特殊的函数,得有对应的方法,我们才能用在这里做processor对不对,诶,所以大家会想到他是不是要继继承,或者说要去大家看一下我们这里怎么写的啊。它是不是要去实现一个叫做processor的接口啊,哎,所以这里我们是对卡夫卡stream里边的这个processor,诶大家看这个processor啊,有不同地方的processor,我们选哪个呢?是不是要选这个第二个卡卡streams里边的processor啊,对,所以我们要实现这样一个接口,那么大家看到它是不是后边还应该有这个KV啊,呃,前面看到它的定义了,对吧?我们这里的KV就全部用BAT数组就可以了,原始的BAT数组,因为大家知道在做这个流式处理的时候,最好是序列化之后的状态,对不对?我们直接用BY数组就可以传递这个信息,这里还报错,还报错,大家看这个application,这个只要是实现了这个接口,它就不报错了,对不对?那我们这里还报错,为什么呢?因为你没实现对吧,啊,所以大家可以知道我们接下来。
18:23
把它对应的这个方法要去做一个实现,好,那么大家看到这里边我们有什么内容呢?啊,这里边其实就是说它是不是有一个in方法啊,然后最关键的大家看到是不是有一个process方法啊,所以这两个是可能我们要处理的呢,至于后面这个,呃,Punit和close这个我们就不去考虑了啊,就不用做做处理了,只要把它初始化,然后能够处理就完成了。那这里边我们的定义是首先要定义一个private,一个属性process process contact,我们要定义一个这个东西,定义一个contact,但因为大家看到是不是这个in的时候要传入一个processor contact呀,上下文对不对,处理的这个上下文,所以这里in的时候是不就应该this.context等于processor context context啊,是不是这样把它附进来。
19:23
哎,这就是初始化的一个过程,然后我们更加重要的这个处理过程其实就在这里了,两个字节数组,那大家看我们定义好的这个processor是不是核心应该在这儿啊,对吧?这里定义了字节数组的这个处理方式,我们这里的process,它的参数是不是就是两个字节数组啊?那这里我们定义一个不同的名称吧,比方说第一个其实我们没什么用啊,它大家可以认为它就是一个KV,那它的那个K是不是相当于只是一个名字而已,我们这个不太重要,所以我们把它叫做当米吧,就是一个变量,对吧?然后后边这个是我们真正的进来的一行数据,所以我们我们把它叫做一个赖一行,对吧?好,那接下来大家看,呃,我们这个具体怎么来做计算呢?怎么来做处理呢?是不是先把这个line要拿出来啊,我们的input是不是本身应该是一个string啊。
20:23
先从这个line里边把这个string拿出来,诶来啊,接下来大家会想到我是不是要根据一定的规则去做这里来的line,是不是就是直接从那里收集来的日志信息啊,对吧,我这里先。呃,把收集到的日志信息用字符串表示出来,然后接下来大家会想到我应该对日志信息进行过滤处理,对不对?呃,这里我们日志信息里边所有的基于这一个我们推荐跟推荐有关的这个评分,我们都加入一个固定的前缀啊,叫什么的前缀呢?大家可以在这里看一眼啊,我们定义了一个什么前缀,定义了这个前缀这个啊,以它打头的,有这个内容的,大家就会想到它后边是不是要跟我们那一串评分数据啊,对吧,就是UID,竖线mid竖线,竖线time step,就是后面跟这些,那我们是不是要找到它,然后提取它后面的内容啊,哎,所以这是我们的这个想法啊,就是。
21:44
根据前缀这个东西从日志信息中提取,呃,提提取评分数据,所以我们要判断大家看在这个,呃,文档里面也出来了啊,就是是不是要看这个input里边是否有我们这个数据啊,如果有这个前缀的话,我们就做处理,没有的话说明不是我们这里要做的那个评分数据对不对?大家看这个就叫做movie rating prefix前前缀对吧?啊,但大家当然就是日志里边怎么定义都可以,这个定义是不是在我们日志系统里边就已经定义好了呀,他写日志的时候就要加进去对不对啊,所以我们这里是跟日志系统匹配的啊,大家这里要改的话,到时候日志,呃,就是业务系统的代码里面也要改,那接下来这个怎么去做处理呢?首先这里来了这个信息,我们先把它打印出来吧,要不。
22:44
但我们不知道是不是来了信息,对吧?Out print line,我们可以写一个movie RA data coming来了一条数据。然后接下来我们还可以加上这里的input,对不对,把它这个数据真正的打印出来,看一下到底是什么东西,然后接下来呢,就会想到我是不是应该对这个input去做一个。
23:11
对,大家会想到,呃,我在这里是不是需要根据这个前缀去做处理啊,那我们定义的是什么样的一个定义呢?我们定义的时候是在前边可能有一串,大家想到日志信息嘛,都有一串时间,对吧,然后怎么怎么样一一串格式化的信息,后边就是这个前缀,然后它后边根着的就是评分信息,那么如果是这样的一个数据格式的话,大家想我是不是可以根据这个字段去对它做一个切分啊。哎,大家想这是不是一个很简单的一个处理方式,切分之后是不是前面的都不要,我就要后边的那个是不是就是我们的格式化之后的评分数据啊,这当然是最简单的一种处理啊,我们如果自己做ETL的时候,大家把这一块再去丰富一下就可以了,好,那么这里边同样还是做split,我们里边是不是把这个前缀传进去,那最后拿到的是不是应该拿它的切分之后的第一个元素啊,对吧?应该是拿到这个东西啊,那当然了,最后我们再把它做一个tri计算处,呃,处理掉前后的空白字符,对吧?最后把这个生成的input是不是就可以大家会看到这个,这是不是就相当于处理完了对吧?得到这个input之后就处理完了,那处理完之后做一个怎么样的操作呢?它的操作是要用到这个上下文,Content有一个forward forward大家会想到是不是就是。
24:44
转发继续对不对,就相当于是这样的一个一个状态对吧?那么我这里边大家看这是一个K,后面是一个V对不对,那这个就随便给了,比方说我这叫log processor后边是个V,呃,那那这里注意我们这里要传的都是bit数组对不对?所以光是一个string还不行,这个结构是错的,我们是不是要get by啊。
25:09
然后这个就可以了,后边是不是传的是它的V是input啊,同样input在这里,我们spli之后之后这还是一个string,对不对,是不是也把它get by穿进去啊,所以大家看到这就是我们这个完整的实现过程。嗯,大家梳理一下,觉得这部分有问题吗?呃,觉得没什么问题是吧,还还是说这部分还是就是稍微有点不知道在干什么,其实没关系啊,这部分就是大家最好是能掌握,不知道的话,大家是不是用其他各种各样的工具做这个,呃,预处理做这个ETL都没问题的呀啊这只是我们结合进来,就相当于也给大家复习一下,前面我们是不是用卡夫卡也可以,卡夫卡STEM可以做这个事情,对吧?啊所以大家就是之前大家做的都是用做嘛,啊都是是吧,这一块我好想哦,那那没关系,就给大家相当于做一个拓展吧,这个比较简单的一个实现,大家应该代码也很好很好写好,那么就是我们把这一部分简单的做了一个实现,然后这一部分完成之后,大家会想到我们要最后要做系统联调,是不是应该把这个要提起来啊,这个程序是不是也得提起来,你不提起来的话,那那边来的数据它是不是我们默认数。
26:33
出的是套叫做log的那个topic,它是不是就传不到这个recommend来啊,那我们SPA stream是不是就收集不到数据了?哎,所以我们要把这个起来,大家想我是起这个log process呢,还是起application呢?对,当然是application,对吧,我们把这个让一下。
我来说两句