00:00
好,我们现在已经知道了怎么样从集合里边读取数据啊,另外也知道了怎么样从文件里边读取数据,那其实大家会发现这种方式更多的就是相当于读取了一个有界流,读完了之后这个直接就结束了,退出了,那我们就会想到实际运行的过程当中,不是说这个流式处理更多的场景,它这个数据应该是有头没尾,无休无止的吗?啊对吧,我这里边这个应应用啊,我就跑起来之后,这个任务应该是一直在这里运行,不停的等待新的数据进来,应该是这样一个场景才对,这才是真正的流式处理嘛。啊,那能不能做这样的一个,呃,数据源的读取呢?当然是可以的,之前我们其实已经做过一个简单的测试,就是ET文本流,但是呢,K的文本流这个大家发现有呃不太尽如人意的地方,就是它的这个并行度你只能设成一对吧,啊就是这这里边就是默认它只能是一,那这样的话,我们在做做这个转换,做这这个操作的时候,可能就会有各种各样的限制了,那什么样的。
01:09
这个数据源是跟这个fli这种并行的分布式的大数据框架,流失处理大数据框架更加契合,更加能配合操作的数据源呢,啊,当然就是前面我们接一个卡夫卡对吧?其实前面大家也想到了,就是如果说出现这种数据量非常大的时候,你前面其实是需要有一个这个消息队列作为一个蓄水池的啊,要不然的话会导致我们后边这个呃,累积的数据越来越大,那个被压就产生了,对吧,最终我们都是放在前面应该有所缓冲的,卡夫卡就是一个最好的选择。而且我们会发现卡夫卡本身也是一个流式的处理数据,消费数据对吧?呃,就是这样的一个消息队列,它这样的一个数据存储的工具,所以跟这个flink本身的流失处理的方式来讲是完美契合,就可以说他俩是天生一对啊,那接下来大家就看一看怎么样从卡夫卡里边去读取数据啊,接下来我们第三种情况啊,从卡夫卡中读取数据,那这里面我还是定义一个STREAM3。
02:23
呃,那那当然了,这里面STREAM3我还是肯定是基于env对吧,大家发现第一步我们做source任务的时候,那一定是基于环境去调用一个方法把这个圆任务定义出来,那那前面我们定义的时候,你要不就直接from collection对吧,你要不就直接read什么什么东西,那你现在要从这个卡夫卡里面读取,大家可能想到了,那有没有from卡夫卡呢?没有,有没有直接这个read卡卡呢?也没有。啊,其实我们想到,因为你现在既然是一个外部的连接工具,对吧?呃,你是相关的一个这个数据存储工具嘛,这跟弗link本身是跟弗Li克自身啊,这个整个的一个框架应该本身是没有必然联系的,那弗Li克自己当然不会带着卡夫卡相关的这个支持啊,这这些依赖对吧,本身是没有的,所以这里边如果我们用到想要跟卡夫卡做连接的话,那需要导入新的依赖啊,那这个依赖我们就叫做flink卡夫卡的连接器,本身这个连接器呢,Flink官方已经给我们提供了啊,大家看到在泡文件里边,我应该要把这个flink connector卡夫卡把这个要引入这里边,我们后面大家看到就是卡夫卡的版本是0.11,然后呢,下划线2.11这个指的就是scla对应的版本了,那最后这是当前这个组件连接器啊,这个connect的版本是跟flink的版本要。
03:54
要一致,这里边我们用的是一点十点零,所以接下来我们首先把这个,呃,大家看这是flink官方提供的,对吧?呃,这个group ID是阿帕奇link,所以我们把这一部分直接在po文件里边先引入。
04:08
接下来我们就希望。直接用官方的连接器提供的一些,诶这个读取SS的这种连接方式,我们把它从卡夫卡里边啊连接上之后读数就完事了,对吧?啊,那这里边我后边这个就是STREAM3啊,那这里边我们如果要是想要去创建这个东西的话,那应该怎么去用呢?这里边要给大家隆重推出哈,就是关于我们这里边数据源定义的一个一般化的方法。就前面我们讲的这个from collection也好啊,From elements,这个read file这些东西其实都是一个特殊化的方法,对吧?大家一看这个名字,包括我们前面那个socket text file,其实它都是一看名字你就知道它当前这个源就已经限定死了,你这就是从集合来的,对吧?你这个就是从一个文本文件来的,或者说就是从一个soet文本流里边读进来的,而我们现在呢,一般化的配置这个输入原读取原文原数据这个任务,这个操作应该用什么算子呢?这里边就叫做ADD source。
05:16
添加一个圆对吧,字面上理解就是添加一个圆,那这个就是一般化了,比较一般化了,你并不知道这是一个什么圆,从哪里添加,呃,连接什么样的外部系统,这个都都不明确,你可以去自定义,所以这其实里面要传什么呢?点进来看一下source啊,我们到源码里面看一下,它里边传的,大家看其实就是要传一个方式。然后这个方式是什么呢?就是基于一个S的上下文,这个这是一个拉姆达表达式,对吧?后面这是拉姆达表达式,就是一个S上下文作为输入,把这个上下文传进来,然后得,呃,这里边是一个unit,没有返回值的一个输出,哎,那大家可能会想到,你如果要是没有返回值的输出,我怎么样能够把这个数据能读出来呢?
06:06
哎,这个这个没关系,那肯定就是在里边我们得需要有这样的生成数据,用上下文对吧,把这个数据发出来,然后收集到的这种方法,那最终得到的啊,这个at source这个方法得到的就是一个data stream了啊,这就是我们说的基于执行环境流处理的执行环境,经过添加源之后得到data stream,后续我们做的就全都是data stream的转换了啊,其实前面都是一样,对吧,前面这个无无论是这个text file也好,哎,这个得到一个data stream,或者说前面我们from collection也好,都是得到一个流数据流data stream,现在也是一样,这是一个一般化的操作啊,那这里边大家会发现就是我可以传一个直接传一个函数进来,匿名函数,但这个匿名函数很难自己很难实现,对吧,我怎么知道这里边我到底该怎么定义呢?哎,这里边还有另外一种a source的实现方式,大家可以看到这种实现方式叫做。
07:07
错。上面这个啊,就是也是传一个方式,但这个方式就不是直接写一个匿名函数进来了啊,大家看这里边有这个呃,重载方法对吧?这里边就是你传不同的参数都可以去实现这个a source这个方法的调用,那这里边的这个参数就变成了一个,大家看它的名字叫做source function,但事实上这是一个,这是一个Java接口对吧?啊,这是一个interface,所以这里边其实我们是需要传一个实现了这个source function接口的一个类。也就是说这里边相当于我们要传的是一个类似于函数功能的一个函数类,对吧?啊,在在这个flink里边有这样的一个概念叫做函数类,后面还会还会给大家展开再去做详细讲解啊,这里我们先接触一下,那那所以说你像这样的一个source function,它接口里面到底又是什么东西呢?大家点到这个源码里边来看一眼啊,就会发现它里边其实最核心的就是首先有一个source context。
08:12
大家还记得前面那个拉姆达表达式的那个function是要是要呃怎么样去处理吗?是不是就是要传一个source context作为参数进来,然后去做处理,对吧?哎,那同样这个接口啊,我们的这个类里边它就会有一个,呃,大家可以认为你相当于得有一个嵌套子类了啊,诶,也得有一个这个south context,这是必须要有的,然后另外呢,你必须要去实现的方法有两个,一个叫做cancel,一个叫做。啊,这个很明确了,既然我们是数据源嘛,大家就想到了你wrong,当然就是源源不断的读取数据啊,源源不断的产生,那我们这边就相当于是有头没尾水流一样,一直在产生数据了,那这里边如果说我们手动要把它取消,那怎么办呢?诶有一个cancel方法,取消的时候调用这个cancel方法,接下来我们就不发数停止不就完了吗?诶这就是关于这个source操作它的底层的一个原理,所以接下来我们这里边就相当于是你要不自己去写一个呃,拉姆达表达式匿名函数,要不去实现一个这样的S方式,实现这个接口,写一个类。
09:20
啊,这个听起来好像很复杂,但是实际上没那么麻烦啊,因为我们现在是跟卡夫卡做连接,我们引入它的连接器之后呢,哎,天生一对嘛,那当然连接器里边帮我们把这个S方就实现了啊,所以这里边我们不会去写拉姆达表达式,当然是去拗一个已经帮我们实现好的那个类的实例就完事了。好,那这里边我们要实现的这个类是什么呢?叫做大家看啊,叫做flink,卡夫卡这里边它会有这个consumer,也会有producer。大家可以想象一下,我们这里边要的是consumer还是producer呢?就是我在这个里边要有一个类啊,这个类看起来它是一个卡夫卡的,呃,反正我们这里边是连接器帮我们实现的嘛,看起来要不是一个消费者,要不是一个生产者,那我们这里边在。
10:13
当前啊,我们在这个原任务这个里边要去读取数据,我这里边是消费者还是生产者呢。诶,可能有同学说,诶应该是生产者啊,呃,就是当前我们这里边,你你这不应该产生数据,然后当前这个弗林去做处理吗。我们可以想到,如果说这个时候我们用生产者的话,他其实应该是在这里边就就要直接去啊,就是用这个,呃,相当于在卡夫卡里边要要去生产出产生出数据来,对吧,我们现在这个SS任务,它是要自己产生数据吗?其实不是,我们的要求是他去连接外部的卡夫卡系统,把卡夫卡那边产生出来的数据读进来,然后进入接下来的处理流程里边,让弗林肯去做操作,对吧?所以他的任务其实是要读卡夫卡的数据。
11:04
那读卡夫卡的数据当然就是一个消费者了,对吧?所以这里边其实我们要引入的是卡夫卡consumer啊flink卡夫卡斯consumer这里面大家看到我们引入的那个版本本来是0.11对吧?所以它里边给我们提供的呢,有零九,有010,有011,还有一个base斯,哎,那我们到底引入哪个呢?你既然是011版本呢,那当那当然是引入CONSUMER011了,你点进去会发现啊,它其实就继承自010对吧?啊,然后010呢,就继承自零九,零九就继承自base斯啊,其实就是这么一层一层来的啊,那base斯又是又是个什么东西呢?大家看base斯就实现了一个啊,当然这又是继承了啊,它是继承了一个rich parallel。Source function啊,那rich大家想这是rich嘛,是富,富有的对吧,相当于它的功能会多一点,然后parallel,这是一个并行的对吧?富有的并行的这都算是一个修饰吧,最后的本质它是一个source function对吧?那这本身它就是一个source function,那所以这个本身这是一个抽象类,大家点进来的话会发现这个抽象类它继承自这个抽象的rich function,另外还实现了TEL source function这个接口,那这个接口大家知道并行的这个s function嘛,当然它继承了source function这个接口,对吧?对,这个继承和实现的关系就是这样一层一层来的,所以大家会发现我们这里边的底层都已经掉进来的是Java代码了,对吧?大家看这个interface,这是只有Java里面才有的嘛,Skyla里面没有interface的定义啊,所以我们也可以看到啊,在这个,呃,Skyla的这个。
12:51
我们用skyla写flink代码的过程当中,有些时候会发现它的底层源码其实是抓va代码啊,所以大家一定要小心,一定要注意有些时候引包的时候到底应该怎么做操作啊,另外还要注意就是这里边我们在用的时候,如果这是一个这个这个skyla的,呃,如果这这是一个Java类的话,我们当然是要去new它的一个实例,对吧?啊,那skyla里边有时候有半生对象的话,我们是直接不加new的啊,所以这里边遇到这种语法的区分,大家也稍微的留意一下,回回忆一下之前的这个语法的知识点啊,然后这里边有泛型啊,这里边这个泛型应该是什么呢?诶,那大家知道这个我直接去消费数据读进来最简单的方式是不是就应该是string啊对吧?直接啊,直接把这个读成一个字符串读进来就完事了嘛,然后接下来我们可以看一下里边可以去啊,就是给它呃,实现的这些调用的这些构造方法啊来看。
13:51
看这个构造方方法,传参有不同的这种,呃,有不同的重载实现啊,传参有各种不同的方式,我们这里边最简单的方式是什么呢?诶我们一眼就可以看到,最简单的方式,你可以传一个string,然后呢,诶传一个就所谓的这一个反序列化的一个scheme嘛,对吧?另外还有一个property,就直接传这么三个参数就可以了,那前面这个string是个啥呢?
14:17
这个点进去你看哦,这是个topic嘛,对吧,你要你要消费数据,当然你得定义当前到底是什么样的主题啊,然后后边这个stemma,这就是做反序列化的时候值的这个反序列化器,对吧,Value的Dis dizer,然后后面还有一个当前你进行的这个consumer的一些配置项啊,那所以基本上就是这些要求啊,那我们就直接把它写出来吧,当前的这个topic我就定义成叫sens好了,Senser啊,然后后边这个序列化的反序列化器,这个其实我既然是按照string把它读进来嘛,那这个其实就我就直接把它当成string好了,对吧,这个就非常简单啊,那这里边我们就simple string,直接用一个这个simple string string STEM。
15:06
直接把它转换就完事儿了,然后另外呢,还需要有它的配置项,那这个配置项我们能想到最主要的其实应该要什么呀。啊,你得定义那个不STEM server嘛,啊,这个是肯定需要需要定义的,对吧,那这里边我们就给一个proper pro,呃,Properties这个你有一个,因为这里边大家知道这是Java代码里边要传的这个proper,大家看它是不是也是要一个Java的properties啊啊,对吧?啊,所以这里边我们当然是要new一个va的properties。就是Java u下边的这个properties,然后里边塞啊,我们做这个set property,塞里边的这些属性就好了啊,最主要的就是boot strap service。
16:00
它的值,哎,当前我就直接在本机啊,所有的我都起在本机,所以我直接给一个local host9092,大家如果要是起在自己不同的这个Linux虚机上的话,你把对应的那个主机名写进来就可以了,端口号写好对吧?啊然后另外大家可能想到还有一些呃,就是可以做的一些配置,比方说呃,那我可以给这个消费者组对吧,Group ID啊啊,那这些东西就是你可以去做设置,也可以不做设置啊,我定义叫做consumer group,这个都是可以的。那另外就是大家看到文档里边还可以做一些,比方说我们这里边,呃,直接定义当前这个呃,就是这个呃,Key和value的decisionizer对吧?啊那这个地方其实你不需要去重复定义,因为为什么呢?我们在调这个构造方法的时候,你相当于这里把那个value的那个letter已经定义好了嘛,啊,我们这现在最关心的其实就是value的对吧,K其实不太涉及到。
17:00
啊,那这里边你就不用管也是可以的,那呃,后边还有就是你还可以定义当前这个,呃,Auto offset reset对吧,我们偏移量的那个,呃,重置的那个方式,我们这里边给一个latest,这个都是可以的啊,这个其实并不是特别的重要,为什么呢?后边我们给大家讲到就是状态一致性的时候啊,大家会发现就是在底层啊,卡夫卡的这个连接器,弗林克卡夫卡的连接器,它其实是会自动给我们维护这个偏移量,然后去啊做做提交的啊,所以这个其实我们也一般并不太关心,这里面最核心的,你把这两个实现就完事了,后边把这个property。参数传进来啊,大家看就这么简单对吧,那现在我们就来测试一下啦,看看这个效果怎么样,要测试的话,首先我应该把这个卡夫卡要提起来对吧?啊,那首先这里边我们先看一眼啊,我这里边没有,没有的话挨个起吧,呃,我先把这个look先提起来啊z k server。
18:04
Start启动啊,然后接下来把这个卢黑写起来之后,我去起一下卡夫卡,呃,那这个是要用卡夫卡server对吧,Server start啊,然后我们一般情况直接用这个,呃,加一个这个demon,直接静默启动后台就可以了,然后把这个config下边的server的properties配置文件传进来啊,然后接下来我们看一下这个卡卡提起来了是吧?然后接下来我们当然就是啊呃,有同学说先创建这个topic啊,有其实我们不不创建topic,你直接创建producer也是可以的,对吧?啊,这个都都会自动去创建啊,所以我们就简单一点了,快速做一个测试,直接用卡夫卡producer,哎,这个啊。我们用这个conso卡夫卡console producer啊,然后后边需要指定当前的broke list啊,Local host9092对吧,然后指定当前的topic,这个topic大家要注意,肯定是要跟你代码里边要消费的这个to一致,哎,我们知道里边叫三,那把这个加上现在创建这样的一个。
19:24
生产者对吧?呃,这里边有了生产者,然后那我们这里边有了消费者,这不就可以消费数据了吗?然后把它打印输出,然后我们看一下运行一下,看看结果啊,看看效果到底怎么样。然后这边运行的时候,我们在生产者这边呢,等一下我们就传输这个sensor这里边的这个数据就可以了。好的,大家看这边已经提起来了啊,它并没有任何的输出,那是因为我们已经说过了啊,你一上来之后,现在是流式处理等数据输入呢,对吧,现在不是直接读文件或者说读别的东西等着呢,所以我们一条一条输一下看一看。
20:02
好一条数据341,诶大家看到这里边是不是就输出了一条341,原封不动,因为我们没有做任何的中间转换嘛,直接就输出了对吧?啊,然后接下来我们这个三四十六再来一条。哎哟,大家会看到这里边又是原封不动的输输出了,对吧?啊,这这当然你可以一条一条做一个测试啊,这里边它都会做一个输出,那这里面大家会看到我们本身。呃,没有定义并行度,所以前面呢,我还是按照并行度四来执行的,但是我们会发现啊,为什么呢?因为这里边我卡夫卡这边是非常简单的做了一个啊,这样一个单节点的启动啊,也没有做任何分区啊这样的一个操作,所以这里边我们产生的数据相当于是什么呢?就只有一个。一个这个SS任务并行的一个任务在接收当前的数据,对吧?啊,那这个大家自然也能想到,就是连接器,卡夫卡连接器这里边假如说检测到我们这这里是本身有不同的分区的话,它是可以去直接拉取不同分区的数据,然后做并行处理的啊这个是没有问题,大家如果有环境的话,可以下来之后测一下啊,啊就是这里边我们看到就是当前是只是全放到一个分区里边去做了,但是是可以有并行度的,不像之前我们那个ET文本流并行度只能是一啊这个就解决了我们之前的一个非常重要的问题,这是关于卡夫卡作为数据源,大家可以做一下测试。
我来说两句