00:00
到目前为止,我们已经了解了几种不同的flink读取数据源的方式,那这几种方式呢,多少都有一点缺陷,因为我们当前想要做的是一个流处理,而前三种方式其实它处理都是一个有界流啊,不管是从文件读取,还是从集合,还是从元素。而对于socket文本流而言,它确实是真正意义上的流逝数据,但是本身不够稳定,而且吞吐量很小,我们说它一般是应用在测试场景里面。那对于真正的企业级的实际应用,我们想要真正的处理流数据,读取流失数据的话,又应该定义什么样的数据源呢?那这个时候就必须要了解大数据流式处理里边一个非常重要的组件,那就是卡夫卡。我们知道卡夫卡本身其实就是一种分布式的消息传输队列,它本身呢,有高吞吐而且易于扩展这样的特性。
01:03
卡夫卡在消息的存储和分发这样的一个传输机制上,本质其实就是跟流处理是完全一致的,它也可以保持按照一定的顺序去处理传输当前的数据。所以我们会发现在真正的实时流处理里边,卡夫卡和flink往都是成对出现的。那目前呢,很多企业在进行大数据的实时处理框架搭建的时候,往往也都会应用卡夫卡去进行当前日志数据的收集和传输,而使用flink去进行数据的分析处理,最终得到的结果呢,那就可以在写入各种各样不同类型的数据库或者一些数据存储系统去进行提供服务或者进行存储了。接下来我们重点要介绍的就是flink跟卡夫卡进行连接,从卡夫卡里读取数据进行消费处理的这样一个过程。
02:07
那前面我们在讲到其他。读取数据方式的时候,其实都是直接调了系统里边en下边的某一个方法,而现在如果我们想要直接从卡里读取数据的话,这个连接明显就会比较复杂了啊,那出于对于flink内部核心部分。需要保持一个简洁性的这样一个考虑,那么flink内部并没有直接给我们内嵌对应的实现方法,那这个时候怎么办呢?呃,其实也没有关系,因为env下边还有一个通用的创建数据源的方法,这就是所谓的a source a source显然就是添加一个原算子,Source算子里边要传入的是一个s function,只要实现这个s function,在这个s function里实现对于外部系统的连接,消费数据,读取数据这个过程,那么接下来我们就可以把数据从外部系统拿过来,然后进行转换处理了。
03:07
啊,那现在我们的任务就是必须要实现s function跟卡夫卡做连接,那好在flink官方直接给我们提供了一个连接器工具,叫做flink connector卡夫卡。在里边呢,对应的就有像卡夫卡连接之后,创建一个对应的消费者或者生产者这样的一个组件,所以接下来我们就可以直接引入对应的连接器,然后进行消费数据的处理了。所以接下来如果说我们想要以卡夫卡作为数据源的话,首先应该引入一个连接器的依赖,那这里我们可以看到引入的连接器就叫做flink,卡夫卡后边跟着的同样还是的版本,下边对应的是flink的版本,当前连接器的版本就要跟当前flink的版本完全一致。我们首先在泡文件里边将对应的。
04:04
依赖先引入。引入依赖之后,接下来我们就可以直接在代码中去调用进行测试了。从。卡夫卡中。读取数据。那接下来我们要做的就是直接a source a source,这里要传入的呢,是一个source function,而弗林格卡夫卡连接器已经帮我们把它实现了,那在弗林卡夫卡连接器里边有消费者,也有生产者,那我们这里边要传入的很显然应该是一个消费者,因为我们当前是要从卡夫卡去读取数据,要消费数据嘛,所以我们创建一个,你有一个。Link卡夫卡啊,那这里。当前的link卡夫卡consumer。我们可以看到它本质上。其实就是一个啊,当然它是继承自fli卡夫卡consumer base这样一个基础类,然后它本质上呢,是一个rich parallel方啊,那当然它本质上就是一个function。
05:11
前面这里还加了rich parallel,从字面上理解的话,它是一个有的,另外还是一个并行的。关于这个概念,我们可以放到后边再去做详细的介绍。所以既然已经有了现成的function,接下来我们可以直接引用就行了,那里边呢,它需要有一个泛型,当前这个泛型很明显啊,最简单的方式呢,就是当前value,当前我们要存储的数据值的对应的数据类型。最简单的定义当然就是给一个string了。我们把它当成字符串去进行存储,那我们可以看到这里边要传入的需要,呃,有各种各样的传参方式,最简单的就是传入三个参数,第一个参数是一个很显然是一个主题topic,然后接下来呢,是对于当前值value的一个izer,一个反化器,啊主要是考虑到我们要怎么样去解析当前这个数据类型的数据,那我们当前如果是string的话,这个就比较简单,我们直接把它作为一个string进行处理就可以了。
06:21
最后还需要有一个practice,那很显然pro就是针对到卡夫卡的连接,我们需要去配置一些相关的属性项啊,那这里我们可以先把前两项都定义好。首先应该有一个topic,当前我们就直接把它叫做clicks,点击数据,这是一个主题。然后另外接下来是基本的string类型,我们就直接去一个simple STEM就可以了。啊,那最后还应该要有一个properties,这个properties稍微麻烦一点,我们可以在上面单独的做一个定义,你有一个properties。
07:00
Java下边的proper创建出来,然后在里边可以直接set property去添加一些相关的属性字段,这里最关键的当然就是boot STEM。点service。如果我们还是基于哈杜102去创建对应的一个卡夫卡的服务的话,那么我们当前就直接写入哈杜102。9092。这是我们的配置项啊。啊,除了最重要的boottrap server之外,我们还可以配置其他的一些参数,比如说我们可以看到这里可以配置group ID,当前消费者组的group ID,另外呢,还可以配置当前的K和value啊,那其实我们已经在这里指定的话,这里上面不做配置也是可以的啊,那另外还可以去配置当前的off reset这样的重置偏移量的方式,我们可以定义成最近一次。
08:04
所以这些都是比较常见的配置属性的方法,我们可以添加也可以不添加,加进来之后,接下来在参数里边把。做一个传入,全部传入之后呢,那我们得到的就可以把它定义成叫做。卡夫卡啊。String。那下面我们可以把对应的卡卡strip做一个打印输出。为了看得更加清晰,上面我们可以把前面的所有打印全部注掉,现在我们就只测试卡夫卡这一部分。测试之前,在启动运行当前的flink程序之前,首先我们应该要启动卡夫卡相关的服务。所以接下来我们还是到卡图比102这里来。我们首先。为了方便操作,先切换到root用户,然后找到卡夫卡对应的目录。
09:02
首先把zoo keeper要先启动起来。Start。Even。加上当前keepper的配置文件,好,我们可以检查一下,当前已经起来了,那么接下来还需要把卡夫卡也要提起来,Server start。同样,Demon。下边server点加上配置文件,当前卡夫卡都提起来了之后啊,那我们首先还需要创建一个。创建一个producer,创建一个生产者啊,那当然了,我们也可以先去创建一个,创建一个主题啊,我们知道如果说不创建主题的话,直接创建生产者,然后去消费数据,其实主题是可以自动创建的啊,那所以这里边我们直接创建一个生产者,然后由flink代码运行之后去消费数据。这样连接起来可以看到结果了。所以当前我们要做的是创建一个生产者,那就是卡夫卡。
10:05
Console producer。接下来我们要给出的配置是。Local host 9092,因为我们就在当前的102上面,另外还需要有一个主题的指令。CS。接下来我们就可以在这里去一条一条的生产数据,观察flink代码里边能否正常的把它消费到,然后接下来在控制台做一个打印输出,那我们可以把这边的flink代码运行起来,当前的消费者也就创建出来了。好,这里已经启启动起来,接下来我们还是可以复制一条数据。放在这里。一条的点击数据,我们可以看到正常在当前的控制台做了一个打印输出,当前是卡夫卡stream的打印输出。
11:06
这里面我们没有任何的前缀的表示,那另外我们还可以继续再给一条数据,Alice的点击访问数据。粘贴在这里,第二秒钟又来一条啊,那接下来我们可以看到,下面就又来了一条Alice的访问数据,Bob的访问数据同样可以粘贴在这里。可以看到,每来一条数据就会由flink代码进行读取,我们这里的flink卡夫卡consumer进行读取,然后直接在控制台打印输出,这就是从卡夫卡去读取数据的一个测试的过程。这种方式应该说是在实际项目应用当中使用最为广泛的读取数据源的方式。
我来说两句