00:00
我们现在已经了解了,在flink当中,我们可以直接调用执行环境的不同方法,就可以从不同的数据来源去读取数据啊,当然了,之前我们介绍的,不管是从元素里边还是从集合,从文件里边读取数据,这个读取出来的呢,都是有界的数据,然后把它包装成一个data stream来一个处理一个去进行计算,最终它肯定都是能够处理完的啊,是有一个结束的时间点的。那我们知道,在实际的生产应用当中,流处理面对的数据那是无休无止,所以我们要处理的应该是无界的数据,这个时候又应该调用什么样的方法呢?啊,其实最简单的方式前面我们也已经了解了,在word count里边我们就使用了socket文本流调用socket text stream这样一个方法,就可以源源不断的读取流数据了啊,但是呢,它有自己的问题啊,因为我们知道文本流这种读取方式,它的并行度默认就只能是一。
01:03
哎,所以它的吞吐量是非常小的,另外呢,它也不够稳定,稳定性比较差,所以在生产环境里边啊,不会用它,我们一般都是用它来做一个简单的流式输入的测试就可以了。那实际生产当中用什么呢?哎,这个时候我们就要隆重的介绍另外一个可以说是跟flink相辅相成完美配合的另外一个大数据组件,那就是传说中的卡夫卡。啊,我们知道卡夫卡其实是一种分布式的消息传输队列,那消息队列它的这种机制是什么呢?我们知道它其实也就是把所有的数据要按照一定的顺序排列起来,然后挨个读取,挨个处理,啊,所以这种处理方式很明显跟我们的流处理架构就是完全一致的,所以呢,弗link跟卡夫卡可以说就是天生一对,往往就是把它们放在一起来使用。很多企业现在的流处理架构就是什么样的呢?我们可以看这张图,那就是首先前端有对应的这个数据来源,采集到对应的日志数据之后,先把它放到卡卡里边进行收集和传输,然后接下来呢,就接上flink,由link从卡夫卡里边读取数据进行处理、转换、分析、计算,然后最终得到的结果再写入到其他的各种各样不同的数据库和大数据组件里面。
02:27
这种架构已经成为很多企业的首选。那我们自然就想到了,那要这么说的话,是不是我们在这里直接调一个执行环境,比方说诶,那就from卡夫卡就可以直接搞定这种方式了呢?哎,非常遗憾,直接要连接卡夫卡的时候,显然我们需要做更多的配置,不能内嵌在弗link里边做对应的连接,哎,那所以没有提供预实现的这些方法,所以我们只能调用最通用的a source来进行一个实现,那这里边呢,就需要去实现一个s function接口,传入一个它的实现类。
03:07
这种方式显然就会比较麻烦啊,如果我们自己写的话,工作量就比较大了,诶,那好在弗link跟卡夫卡他们之间的连接太过于经典了,所以flink官方给我们提供了一个连接工具,就叫做flink connector卡夫卡。然后在这个连接工具里边呢,它就给我们实现了一个消费者flink卡夫卡consumer,诶这样一个类,它就是专门用来。在link里边用来读取卡夫卡数据的SS方式,它就实现了一个S方式。啊,所以接下来呢,我们就需要引入卡夫卡连接器的相关依赖。这里我们就需要在泡文件里边添加这样的一个dependency啊,我们看到就是link connect卡夫卡,然后后边下划线跟着的是对应的scalela版本啊,那下边的这个version呢,对应的是flink的版本啊,所以我们只要把这一部分copy到po文件里面就可以。这里需要注意的是,这个是官方给我们提供的一个通用的连接器,它会自动更新当前最新版本的卡夫卡客户端啊,目前支持的是0.10.0版本以上的卡夫卡啊啊,那如果说我们有特殊需求的话,也可以指定特定版本的连接器。
04:24
而且另外还需要多说一句的是,我们这里边啊,在连接器引入之后。我们需要使用的是一个flink卡夫卡consumer,这是01:13版本里边的具体实现,那在01:14版本里边呢,呃,其实已经对这个又做了一个升级,做了一个改版,那就不叫弗link卡夫卡consumer了,而是直接叫做。卡夫卡S。诶,所以之后的话,如果我们使用01:14之后的版本啊,那引入对应的类的时候就不需要再去纠结啊,因为我们知道啊,卡夫卡里边有消费者,还有生产者,还有producer啊,我们这里边是要连接到卡夫卡去消费数据,所以引入的是一个consumer啊,那对于这个呃,对卡夫卡不熟悉的同学来讲,可能就会总会纠结啊,到底应该是consumer还是producer呢?之后就不用担心了,只要是前面读取数据源,那直接用卡夫卡S就可以了。
05:24
这是后续版本的啊,我们多说一句,那现在呢,我们还是使用弗林卡夫卡consumer,所以接下来我们首先在po文件里边。多做这样的一个引入。引入之后,哎,我们这里可以刷新一下。这里已经看到有了对应的连接器啊,那接下来呢,我们就可以新创建一个。LA的object来进行一下当前的测试,我们是卡夫卡作为数据源的测试,S卡夫卡test。下面首先问方法先写出来,那首先我们还是先来引入stream execution environment,哎,把当前的执行环境先引入get。
06:07
把它叫做,因为接下来为了方便我们测试的话,也可以直接把当前的并行度啊,统一设成全局设成一,接下来我们就是按照顺序一个一个输入,一个一个输出就可以了,然后呢,呃,我们既然是要连接到卡夫卡,肯定是要做一些卡夫卡的连接配置的啊,所以我们这里边用一个。Property。Properties对象来保存卡夫卡。连接的相关配置。啊,那这里面我们就直接一个properties。把它就出来,然后接下来下边当然就是直接给里边去做一些配置了啊,那首先我们知道连接到卡夫卡最重要的当然就是bootstrap。
07:00
点service。我们这里还是啊,直接使用哈杜102啊,这台虚拟机作为我们的呃,哈杜的这个集群的节点啊,所以我们这里就是哈杜OP1029092的默认端口啊,然后接下来proper set property。接下来啊,那当然了,还可以比方说我们指定这个group ID啊。我们就叫做consumer吧,Consumer group。当然了,我们还可以有其他的一些配置啊,比如说呃,你像我们这里边可以指定什么呢?看文档里边我们以指定当前K的R啊,Value的izer,另外我们还可以指定比方说当前这个呃,自动的偏移量的这个重置的策略啊,比方说指定是latest,这些都是可以单独去指定的,当然了,这里边我们不去指定也是可以的啊。就只要有啊,前面我们对应的这个books STEM service其实已经是足够了,那接下来要做什么事呢?诶,那就是直接烟V现在做一个a source。
08:09
里边。要创建一个当前的flink卡夫卡consumer,哎,这里面我们可以点进去看一眼。这个consumer,哎,它是扩展自flink卡夫卡consumer base啊,那这又是个什么东西呢?我们看这是一个抽象类,这个抽象类它就哎扩展自rich parallel source function啊,那前面的这一堆我们先不管啊,后边的source function这个就看得很明显了,很显然这就是我们要的那个原算子要传入的参数。那前面的rich指的是什么呢?指的是这是一个负函数,这是一个富有的东西,所以它比平常普通的这个s function肯定要多一些功能。另外还有一个是parallel parallel就是并行的意思嘛,所以它还是一个并行的s function,它可以并行读取,那对应的点进去我们会发现它也是一个抽象类,它就继承了abstract rich function抽象的负函数,这个抽象类另外呢,还实现了parallel source function接口。
09:14
这个并行s function自然也是一种s function啊,所以我们这里传它完全没有问题啊,这就是我们在源码里边可以看到这个他们的继承关系啊,但是这里边我们不用对于源码做过多的纠结啊,我们直接知道怎么样用就可以了,诶那这里边呢,本身卡夫卡consumer里边还有一个泛型。这个泛型当然就是我们读取出来的数据,要把它解析成什么样子好啊,那当前呢,我们如果要是简单起见的话,干脆啊就不要做单独的解析了,直接给一个string就完了啊,就是从卡卡那边啊,读取出来的都当成字符串啊,所以这里面就是一个。弗Li卡consumer string里边那要传入一些参数了,它的构造构造方法构造器我们可以看到啊,下边最简单的方式就是这样去传参,首先给一个string类型的topic,一个主题,然后接下来呢,是一个当前值的dizer,当前值的一个反序列化器啊,那最后还有一个就是properties,我们对应的那些连接配置要传进来啊,那这里的话我们当然就现成的啊呃,首先我们先直接定死一个topic吧,我们就把这个叫做click。
10:31
当前的用户点击的事件啊,创建一个这样的主题,然后接下来的这个反序列化器呢,诶,我们当前就是string嘛,那也无所谓啊,我们直接拗一个simple string STEM啊,直接用它来做反序列化就可以了,最后还有前边我们已经定义好的properties,把它传进来。然后得到的。Data stream。当然我们这里边的stream,那就应该是一个。
11:00
String类型的数据流了。哎,所以我们可以这样做一个书写,后面我们可以看到现在还在报错,那是因为我们没有引入对应的影视转换啊,这里我们可以直接把这一部分啊,直接用下划线来做一个代替,这样的话就没有问题了。好,读进来之后,哎,当然最后我们可以直接把它做一个打印输出,看一下结果,最后不要忘记envecu把它执行起来,这就是我们整个读取卡夫卡数据的一个测试代码。整体这个代码还是比较简单的,因为当前的s function其实是连接器已经帮我们实现了,只要引入就可以了啊,那接下来我们可以做一个简单的测试,在测试之前呢,显然我们需要在哈杜102上把卡夫卡要先起起来啊,那所以接下来我们到哈杜普102这里来,首先。找到卡夫卡对应的安装目录,然后接下来呢,我们首先要起一下keepper啊,那我们直接调用zoo keepper server start,后面我们加上杠demon。
12:07
做一个后台的运行啊,那后面跟上当前的配置文件。先把它提起来,我们先看到,诶,当前look k提起来了,接下来呢,哎,我们要启动卡夫卡并卡夫卡server。Start。同样,我们也是告demon。后面跟上对应的server.properties配置文件。啊,我们首先把对应的per和卡夫卡都提起来了,接下来呢,我们就要去首先要创建一个对应topic的producer,啊,我们在这里producer里边去发送数据,然后接下来呢,呃,那么在flink程序这里去创建consumer,去读取数据,接下来做转换处理,最后打印输出,我们就应该能够看到结果了,哎,所以整个这个流程应该是这样顺下来的。所以呢,我们在这里边就先去创建一个还是并卡夫卡console。
13:06
Producer创建一个生产者。这里我们要指定broke list。LOCALHOST9092。啊,因为本来当前就是在卡102这台机器上吧,Local host就可以了,后面指定topic clicks。我们这里边没有直接先去创建主题啊,它自动会帮我们把主题创建出来,好接下来已经有了producer之后,我们就可以直接去运行当前的代码。那运行起来之后,我们会想到一开始显然不会有任何的输出,我们是在等待数据输入啊,所以这里面我们可以随便的输入一行数据,比如说。Mary。的一行点击数据。输入,然后接下来我们就会看到当前的控制台,就会对应的打印输出一条me,点击访问数据,啊,那Bob的访问数据再来一条。
14:03
输入,当然这里边我们就会得到一条报的访问数据啊,其实这里边我们并没有针对这个数据格式做任何的定义啊,我们这里边直接就是读取出来,然后直接打印输出了啊,所以这里边即使是我们随便说写一个hello word,这里边同样也可以原封不动的得到这个输出啊。这就是我们当前。读取卡夫卡数据源的完整的测试过程。
我来说两句