00:00
这些东西其实都不是重点,因为一来它比较简单,二来在生产应用场景里边,其实这种用flink做这种操作的场景呢,不是特别的多,为什么呢?大家会想到前面这两种方式,相当于都是用flink来做什么了。它处理的是什么流啊。我们当时把流是不是分了两类,一类是有界流,一类是无界流,前面处理的相当于都是什么?对,都是有界流,那从这个flink的他的这个世界观的认知角度讲,有界流其实是不是就跟批处理一样啊,因为你这个数据都已经在那了吗?你其实还是,呃,只不过在这里边我们假装他还是一个一个读进来的,对吧,然后一个一个处理,但事实上你的数据已经全在那儿了,你一批去处理,其实也没问,没什么问题,呃,所以大家会看到我们其实在真正的生产环境里边,要去做真正的流处理的时候,更多应用的不是这种方式。
01:02
那而是什么方式呢?呃,大家会想到我们要从真正的那种流的数据源里边读数据,对不对?什么是真正的流式的数据源呢?啊,要不就是我们之前用过的那个socket文本流对吧?呃,直接从一个soet里边去读取,但是平常生产环境可能你不可能说产生数据是从一个socket里边去产生的,那更多的是是什么呢?对,大家想到了符合这种标准,符合这种要求的最常见的就是卡夫卡,哎,所以我们接下来这个其实是重点啊,就是给大家讲一讲从卡夫卡。中读取数据。呃,那那大家就会想到我们要从卡夫卡里边去读取数据,首先大家能想到第一步我们应该干什么呀,大家觉得应该干什么啊,肯定这里边我肯定还是就是env得去怎么样去去读进来一个S对吧,这个S要设定成卡夫卡。
02:05
那大家想里边的这个配置,我们是这就是说里边你怎么读这个,怎么跟卡夫卡做做连接,做交互,这个是我们自己去写吗。嗯,那就太复杂了对吧,那肯定不可能这样,其实大家会发现这个flink的这种处理流数据的这种模式,是不是天生就应该跟卡夫卡非常的配啊,它俩是天生是一对儿对吧?啊,所以其实大家在官网上也能发现弗link跟卡夫卡的对接是做的最好的,而且也是做的最方便的啊,那当然了,就是我们就会有官方的连接器对吧?所以这里边我们直接把那个官方的连接器先引入,是不是要在泡门店里边引依赖了啊,所以大家看一下我们这里边引入的是flink connector卡夫卡。然后这里边这个卡夫卡对应的那个client的版本是0.11,然后后面这个skyla的版本2.11,本身这个flink连接器的版本1.7.2啊连连接到我们这个flink的版本是这这个1.7.2,所以连接器的版本也一样,对吧?然后这里边。
03:15
我把这个复制过来,大家已经知道他干什么事儿的话,这个我们就copy就好了。我们把它引入。引进来之后,那我们就准备去用它了,用之前大家想我们用卡不卡的时候,往往一开始先得去干什么呀。是不是先得有一堆那个卡夫卡的配置项先得配好啊,哎,所以这里边我们还是啊,就是按照大家习惯的那种方式,先定义一个proper,在之前也是这样,对不对,用一个proper对对吧,先用出来直接用那个Java u下面的那个properties就可以,然后接下来是不是properties去set proper啊啊一个P一个value对不对?呃,常见的,首先你肯定要boot bootp.service。
04:12
呃,然后我这里边的话,大家根据自己的情况来配啊,我这里边就是local host9092了对吧?呃,然后下边啊,那当然这个我就可以不不用详细去配了啊,大家肯定就知道,比方说有这个group ID啊对吧,有这个P的它的那个呃,反序反序列化的那个工具啊对吧?呃,各种各样的那个value的反序列化工具啊,这些大家看到在文档里边都已经写出来了啊,这里我就不一个一个详细去写了,大家大家可以自己再手敲一遍啊,后面我们还指定了group ID,然后这个呃,这个这个key的decisionr value的decisionr,另外还指定了一个all to offset reset对吧,这是指的是我们那个偏移量的呃,自动重置哎,我们用什么区别自动重置呢?啊,最近的那个latest对吧,这都是常规的一些配置,我把它复制过来。
05:13
嗯。好,复制过来。已经配完了之后,接下来就是真正的创建我们这里对应的这个流了,定义一个stream到几了三是吧,STREAM3还是env,然后点。啊,大家会想到这里边我们得去怎么样去创建呢?之前我们都是from什么,或者read text file,对吧?诶这里边给大家介绍一个更加通用的方法。怎么样去创建一个sce呢?更加通用的方式是叫做对ADDS对吧,添加一个sce,这就是一个通用的啊创建source的方法,那大家看这个ADD source里边要传什么东西呢?
06:02
大家看,这里边要传一个source function。呃,所以大家看就是当然了,就是说大家会想到这是一个函数对不对,所以我们可以直接真的是定义一个函数。也可以是定义一个什么。这个source function,它名字叫s source function,对吧,大家能想到这是不是一个类啊,对吧?啊,这相当于是一个这个我们点进去就会看到这是一个接口,对不对,我们相当于得去自定义一个类去实现这个接口啊,所以这是这个flink里边编程的一个一个风格啊,所以大家会看到这里边我们要不就相当于我是不是得去new一个s function,我自己去写了。哎,要不我们有更简单的方式,就是官方已经给我们提供了连接器,它是不是应该给我们写了这样一个S方式啊,哎,所以这里边我们直接去一个。
07:02
Flink卡夫卡,诶大家看卡夫卡,我现在要去用一个什么呢?这个连接器大家看到里边可以有consumer,可以有producer,对吧?这里边我要的是consumer还是producer,对,我这里边作为原我这里边添加的这一个组件,是不是要去消费卡夫卡里面的数据啊,诶所以我这里边其实应该是一个消费者对吧?所以这里边是卡夫卡consumer,诶这里边我们用这个,大家看这个,呃,0.11的这个client版本,它里边0901 0011都有,我们用这个011对吧。然后这里边给一个泛型的话,我们这里边其实是序列化之后的那个状态,一般就直接string就好,这个最方便啊,给他一个string。呃,大家看这里边得传参数呢,大家看一眼传什么?呃,要传一个topic,然后要传值的那个反反序列化的那个工具,对不对,然后还有我们的配置项传进去对吧?哎,所以这个就是我们该有的都已经有了,我们直接往里传就好了,至于topic的话,我们随便定义一个吧,比方说现在我们这个传感器,我们就叫sensor吧,这个对吧,Topic就叫sensor,然后后边那个反序列优化的工具,因为现在我们定义的那个泛型都是string嘛,所以我是不是直接定义一个的那个呃码就可以了,对吧,有那个simple string。
08:36
STEM,对,把它引入,这是flink里边给我们实现的啊,然后后边把定义好的properties传进去,所以大家看如果除了前边的这个property的定义的话,是不是就这么一行就搞定了呀,啊,只不过就是这里面参数稍微有点复杂而已,别的好像没有太太难的东西。好,那我们现在来测一下吧,呃,当然了,这里边at s之后,下面我们还是去。
09:03
把它print一下啊。我把这个应该。放到后边来,这个改成STREAM3。打印输出看看它的结果,呃,这里边我们要去测试的话,我是不是首先得把那个卡夫卡里起起来啊,这个大家就是常规操作,自己去起就好了,我这里边。开一个新的窗口,看一眼这个卡斯卡下面去啊。呃,我先看一眼啊,大家看我现在的那个卡夫卡提起来了对吧?啊,所以我这里边就不用再去重新启动卡夫卡了,大家如果要是没起来的话啊,该怎么起怎么起,这个大家之前都是学过的啊,这个也很简单,那这里边我在这里把卡夫卡已经起起来的话,我这里边是不是就可以启动这个代码了?呃,那我们如果要做测试的话,还应该还缺什么东西,对,是不是还得去生产数据啊,那我是不是得在卡夫卡这里边要去,对,要去创建一个生产者对吧?所以并下边我们卡夫卡console producer对吧?诶,这里边要给那个broke list,我的local host9092。
10:30
Topic,我们的topic叫叫3S对吧,所以这里边给一个三。然后接下来我们就可以一条一条数据在这里发了,对不对,好,我们接下来来测试一下啊。呃,这个我们打开这个直接一条一条复制吧,我们把这个就是窗口左右这个平铺开,大家在这里看的更明显一点啊好,传输一条数据,大家看这里是不是就输出了一条数据对吧?然后这里边我们一条一条传,它这里边就是一条一条读对不对?呃,来一条处理一条,来一条处理一条,这跟我们当时用那个socket发送,呃,这个文本流是不是一样的呀?啊,所以这个就是都是这种流式的处理方式,大家直观的一看就知道它是怎么回事了啊这里边既然提到了这个卡夫卡多跟大家就说一句,大家想一想,这个跟我们在Spark里边,Spark也可以,呃对接卡夫卡做处理对不对,从卡夫卡里边读取数据,那么我们在这里边跟flink里边,跟Spark里边。
11:40
跟卡夫卡的这个连接有什么区别呢?当然了,大家可能都是调用相关的一些这个组件,我直接把它这个呃连接上,把数据读进来就就好了,对吧?呃,这个大家是能直接想到的,我们这里边其实要考虑的并不是他怎么样能把数读进来,这个对于我们来说太简单了。
12:01
我们要考虑的是比较高端的问题,比如说哎,就是我们说的,在整个分布式的这个系统里边,我们得保证它的一致性,对吧,得保证容错性,比方说我们整个这个处理过程当中,假如后续的有些步骤后面给挂了呢。化了之后怎么办?大家想一下,如果说前边我们对于这个卡夫卡这里边没有操作的话,我是不是只要原这里边把它数据一读进来,它那边的偏移量就已经改了,这个数据就消费掉了,对吧,那后边如果再出现问题挂掉的话。呃,正常来讲大家会想到我我挂掉的话,我这就涉及到我们那个checkpoint的检查点的机制了,对不对,我应该得有一个,这个相当于时不时存个盘对吧?我不要像我们编辑那个,呃,这个word文档的时候,大家肯定之前有过类似惨痛的经历啊,特别是这个电脑不给力的时候,呃,就是那个忘记保存对吧,敲了一大段的那个文档,然后忘记保存,电脑一下子给卡死,直接给关掉了,然后发现什么东西都丢了啊,这个是特别郁闷,特别特别是有同学可能写那个什么毕业论文的时候,出现这种情况,那是最抓狂了哈,呃,那么大家自然就会想到很好的一种方式,就是你时不时存一下嘛,对吧,CTRLS这个又不花什么时间,所以说。
13:27
在这种大数据的处理框架里边呢,同样为了保证我们这个,呃,容错性,容错一致性的保证,那么都在里边加入了checkpoint Spark里边也有checkpoint,对不对,这大家其实并不陌生,所以大家可以简单的认为它就是一个一个存盘的一个机制,对吧,就时不时你去存个盘,但是大家想一想,我现在是从这个卡夫卡里边去读数据,我在这个过程当中是不是相当于。读一条数据,它的这个偏移量就得更改一次啊,假如说我们用Spark在做处理的时候,卡夫卡读进来数据了,对吧?然后Spark的处理方式是一批一批的读,对不对啊,一批去处理,一批去处理,然后在这个过程当中,我们有检查点checkpoint,那checkpoint是不是也是一批一批的呀,对吧?啊,你要不就是这一批全全部成功搞定存盘了,要不就是如果要是中间挂了的话,是不是相当于全部重来,重新来过啊,哎,那中间这一部分就全部,呃就就给他全部回滚了,相当于那大家有没有考虑一个问题,如果我们从卡夫卡读取数据的话。
14:36
下一波这个数据已经来了,我已经从卡夫卡是不是已经读进来了呀,那卡夫卡那边的偏移量改了没有,已经改了对不对,然后我后边再做处理,处理的时候,哎,我在做计算做转换,对吧,最后再做这个结果的分析输出,这个过程当中挂。挂了之后,我现在是不是得回滚啊,回到上一次那个我checkpoint检查点的那个位置,然后重新开始,再去重新读取数据,处理数据。但是大家想。
15:08
本身我当时卡夫卡的那个偏移量是不是已经改变过了呀,他认为那个数据你已经消费过了,而我们现在是回是不是回滚到之前的那个状态了。我们是不是还想让卡夫卡再把那个数据再让我们消费一遍啊,诶,那这个过程怎么办?啊,不认识啊,当然就是如果说我们是那个,呃,就是不去改它的偏移量的话,是不是相当于这个我们就没有办法重复消费了,哎,你就只能是那段数据就丢掉了,对不对?这个从状态一一致性上来讲,大家想是不是就有了数据的丢失啊对,这当然最后你的结果就不对了,呃,所以状态一致性这个就不能叫我们,我们后面不是说那个exactly once嘛,对吧,你这个显然就不能叫exactly one。那Spark是怎么保证这个问题的呢?怎么去处理这个问题的呢?啊,其实也比较简单,就是我们在这个Spark把这个处理,就是整个处理完成之后,呃,就是你加一个机制,就是说不要让他消费的时候,就直接把这个这个这个偏移量就更改,我等到最后整个操作完成的时候,再去把那个偏移量做更改啊这是一种方式,另外一种方式就是我可以怎么样呢,可以在。
16:32
恢复之前的那个状态的时候,哎,就是前面我存那个保存点的时候,是不是应该把之前我进来的那些数据应该是都要去存下的呀,然后存下来之后,我应该恢复的时候,再把之前卡斯卡的那个偏移量是不是要手动的更改,重新提交一次。然后相当于哎,我就告诉卡夫卡,我重新再消费一遍数据重新来过嘛,对吧?啊,这相当于是我们在Spark里边可以去做的这个这个方式啊,就是呃,其实不管是怎么去处理吧,你都得去手动的去提交偏移量,手动写程序去处理处理卡夫卡的偏移量。
17:15
啊,这是我们Spark里边对状态一致性的一个保证。那大家可以看到在FNK里边呢,就不一样啊,这这里大家可以看到flink里边为什么它跟Spark可以不一样呢?就是因为首先flink不是像Spark那样一批一批读的,Flink本身就是就是一条一条读取的,然后另外还有一个非常好的特点,就是flink本身它是有状态的流处理,对不对,在这个过程当中它可以保存状态,所以flink可以把我们卡夫卡当前的那个偏移量。作为他的一个状态保存下来。
18:00
诶,所以言下之意就是什么呢,等到到时候你去做恢复的时候。是不是要我也有检查点机制对吧,要从后面处理的时候过程当中给挂了,我要从之前存盘的那个地方重新读取,重新恢复,那是不是相当于就又可以把它的那个当时的那个偏移量恢复出来啊,然后呢,本身flink卡夫卡的这个连接器给我们也实现了,就相当于类似于我们手动又重新提交了一下那个卡夫卡的偏移量,对不对,把当时它保存的那个偏移量重新再提交一遍,所以这个过程相当于我们是不是什么东西都不用做。在这里就是弗link自动跟卡夫卡连接的时候,它自动给我们保证了在这个过程当中的状态一致性,这就是非常舒服的一点。如果要是Spark的话,这个状态一致性的保证,对Spark是能保证状态一致性,但是我们跟卡夫卡连接的时候得去手动的做flink的话,相当于我们什么都不用管了,呃,这就是flink比较比较好的一点啊。
我来说两句