00:00
那大家其实想到了啊,在我们真正的流失处理啊,要做这个flink计算的过程当中,数据源其实不会直接放在集合里边,也一般不会放在文件里边,因为如果这这样去做的话,这个数据就已经不是实时来的了,就已经收集好了,对吧?那这已经是一个有限流,或者说就已经类似于一个批处理了,那真正的流处理应该什么样呢?诶,对,那是我们之前就是像我们之前做那个我count的时候,是不是用NC起一个,然后发送这个流失数据啊,啊,那大家会想到实际应用场景里边,我们的数据不可能是NC产生的,这也是一个测试,对吧,流逝的测试,那真实场景就应该是。对,我们先从数据日志里边提取,呃,这个采集,然后比方说有啊,有其他的那些采集工具,然后是不是先扔到卡普卡里啊,然后接下来弗link应该跟卡夫卡连接,去读取卡夫卡里面的数据,这才是我们真正去消费数据的一个过程,对吧?啊,那那接下来我们就要看一看怎么样从卡夫卡里边去读取数据了,这一部分其实是。
01:09
大家必须要了解的一部分内容啊,因为在实际的生产环境里边,往往我们的数据源就是卡夫卡,所以我还是完整的写出来,Source test3,当前是卡夫卡作为数据源,诶那前面这个其实我都不想完整的写了,大家想这个过程是不是都一样啊,对吧?还是那方法里边啊,先创建执行环境啊,然后我可以把这个全区病毒设成一,后边呢,读取数据,然后打印输出,然后去这个。最后seq执行起来啊,这个过程其实都是完全一样的啊,那当然这里面读取数据的话,我就不是。对,这个就不是text file了,那接下来我应该怎么样呢?现在接下来是不是我必须要去读取卡夫卡里面数据啊,那大家可能想,那我能不能直接read卡夫卡呢?哎,没有对吧,那有没有卡夫卡相关的东西呢?没有也没有对吧?因为哎对大家想卡夫卡这是一个外部组件啊,外部的这个数据处理的工具,对吧?你如果要是flink本身要跟它做连接的话,那你是不是要应该有一个对应的这个连接工具啊,啊,对吧,相当于你要有跟这个卡夫卡要要连接起来,然后才能去做相关的一些操作嘛,那所以接下来我必须要引入一个对一个另外的依赖,那就是flink官方给我们提供的flink connector卡夫卡,也就是说flink跟卡卡的一个官方连接器,然后后边0.11这个是卡普卡的版本,后边这个下划线2.12这是。
02:47
对,这是skyla的版本,哎,大家想为什么有skyla,那因为弗林可是不是本身就因为里边我们说有阿卡对吧,本身它就依赖skyla嘛啊,所以这里边还需要有这个skyla的版本啊,连接器也是要求有这个skyla版本的,另外下边本身连接器的version,它的版本一点十点一,那这个版本是跟当前我们flink的版本完全一致,对吧?因为它这个本身就是官方连接器给我,官方给我们提供的连接器嘛,啊,所以这个。
03:17
跟这个当前的版本完全一致,这个也是很容易想到的啊,啊,那这里边我就不直接把它凹to import了,我直接刷新一下啊,把当前这个引入大家看多了一个连接器的依赖对吧?啊,那接下来我们就要去做一个操作啊,那大家可能会想到,那接下来是不是就有卡夫卡了呢?也没有,因为他知道我现在是调env,就是执行环境的这个方法对吧?这个方法是不是在弗林克底层都已经定义好的呀,那没有就是没有嘛,那你这个不能直接去调,那怎么办呢?烟V给我们提供了一个非常通用的创建数据源的方法,叫做ADD source,大家看就是直接添加一个数据源,对不对,A source,然后这个a source就比较特殊一点,大家看它里边的传参啊,要传的是一个什么参数呢?是一个source function。
04:14
也就是说这又是一个大家看类似于一个类对吧,要实现的一个接口对不对,Interface我自己又得实现这样一个接口,那所以现在我如果要是实现这个接口,其实大家知道这个接口里面要干什么呀,哎,这大家看这里边是不是就得定义,大家看有一个wrong方法,那是不是就得相当于我得从外部去读取数据,然后要把这个数据相当于发送出来,接下来我在这个flink内部就可以使用了呀,啊,那所以这个S方式主要就是干这些事儿啊啊,那接下来我这个卡夫卡里边,难道说这个我得自己自己去定义吗?啊,我得拗一个呃,Class的对象对吧,实现这个接口吗?哎,不用啊,有连接器嘛,对吧,连接器帮我们直接实定实现了,那这个连接器里边我们要引入的就是一个,诶大家看flink卡夫卡consumer对吧,当然这里边有这个010,呃01109010BASE,那这里边其实它是有继承关系的,我们当前当然。
05:14
版本就是011对吧?啊,就这个大家会看到啊,里边零幺幺点进去啊,它继承自010010又继承自0909继承自贝斯,贝斯继承什么,大家看这个。这是一个source function对吧?啊,这是一个source function啊,啊,这里边这个source function它比较特殊,它是一个rich parallel source function,然后rich大家知道是什么意思,富有富予对吧?所以它是一个rich source function,就是这是s function的一个有版本,一个加强版本对吧?啊,所以它相当于也是可以实现s function的source function的所有这个需求的啊那另外还有一个parallel parallel意思就是并行对吧,当前这个读取数据可以并行读取,因为大家知道卡夫卡那边我们可以设置,可以设置分区对吧?哎,直接是可以做这个并行读取的啊啊那所以接下来我们这里边其实就是要实现这么一个东西啊,那大家想这consumer为什么consumer呢?
06:21
这个对吗?对消费者,因为是不是卡夫卡那边应该是有数据,然后我们要消费它里边的某一个主题的数据啊,啊,那所以里边要传的参数,这个也就非常简单了,大家看最简单的传三个参数,是不是要有一个string,大家知道这个string是啥吗?是不是要有一个消费的主题啊,啊,另外后面是不是要有一个value的反序列化的这样的一个工具,对吧?呃,Disriizerization stemme,另外还有具体的一些property啊,所以接下来其实我要做的其实就是给一个当前的topic,比方说我当前是sensor嘛,对吧,传感器我就叫sensor,然后后边来一个这个sche码,那大家知道我现在value都是string,都是字符串,对吧?按照字符串读进来就可以了,我new一个simple string STEM这个就可以了,对吧,然后后边呢,还要有一个property,大家知道这个property其实最关键的是要要定义什么呀,你有一个property大家其实知道,这其实就是最关键就要有。
07:21
那个卡普卡相关的连接对不对,对吧?所以这里边啊,我直接就是properties.set proper最重要的是不是就应该有一个put stra对点service,然后这里边我给一个值,这就是一个K一个值对吧?Local host9092啊,当然大家可以还可以去做一些其他的配置,比方说我这里边列了很多消费者组合对吧?还有这个键和值的这个decisionizer啊,序列化工具,其实这里面我们指定之后,这个就不需要了,对吧?啊,另外还可以指定比方说这个,呃,这个偏移量啊,Reset的那个原则可以用这个latest对吧?最近一次其实这个也不需要指定啊,大家如果感兴趣的话,可以把这个都copy过来,请a no。
08:07
其实这里边这些都不重要啊,都不需要必须拿过来啊,所以这样就直接把它搞定了对吧?好,那接下来我们来做一个测试,大家看一下这个效果怎么样,要做测试那是不是必须要把卡夫卡要先提起来呀,对吧?哎,那那这里边我首先啊到兔子下边我先去起一下这个look keepper,对吧。呃,这这这个啊,我是先去CD到look keepper下边,然后调,呃,我要去直接执行当前并下边的z k server start对吧,把这个ZK先记起来,呃,然后我到卡夫卡的目录下边去启动卡夫卡,哎,那就是卡夫卡server start对吧,然后我这个可以静默执行啊,这个demon。
09:04
后台执行,然后对应的还有那个config serve properties对吧?配置文件指定出来,然后我们看一下当前已经提起来了对不对?然后接下来我是不是就可以去运,诶当前这个还报错啊,我们看一下哦,Property是不是没传进来啊,然后后边还有这个分号没写对吧?那接下来是不是就可以运行当前这个代码了对吧?这里边就可以消费这个,但是我现在还没有那个主题,但是大家知道这个,呃,没有的话也可以自动创建对吧?这个我们不需要去单独要要去指定啊,我可以把这个代码直接提起来,然后接下来我这边要消费数据,是不是还得有一个生产者啊,对吧?所以B卡夫卡console producer啊,这再来一个这个啊啊,然后这里边杠杠broker list local host 9092,然后杠杠topic,我们那个叫sensor,大家还记得对吧?哎,把这个创建出来。
10:04
然后接下来是不是就可以一条数据一条数据去发送了,诶这里边我们有这个数据嘛,对吧,这样啊,我把它做一个分屏显示,大家可以看的稍微的舒服一点啊,这里边我们一条一条数据一条数据copy过来。大家看这里面是不是就读取到了,这就是真正消费卡发里边数据,来一条读一条,输出一条对吧?这里边我们没有涉及到数据的计算和转换啊,那如果说中间我们有转换的话,是不是也应该可以得到最终的结果啊,只要能消费到数据,我们就已经成功了,对吧?连接起来了,这就是读取卡夫卡里边的数据。
我来说两句