00:00
是在实际生产环境里边,我们的数据流肯定不可能是直接保存成一个文件去读取,对吧?啊,既然是要实时嘛,所以一般情况数据往往是从卡夫卡里面去读取的,那在现在的这个状况下,我们可以把这个源改成卡夫卡吗?啊,当然是可以的,只要实现我们之前之前大家也已经实现过这个卡夫卡源啊,这里边只要我们把这个稍微改一下就可以了,那卡夫卡的话怎么样去读数据呢?我我先在上面写啊,是不是首先得定义一堆,呃,那个配置对吧,配置线pro。你有一个property。然后接下来肯定这个properties,首先要set那个boot service对吧。这个肯定是要写的local host9092啊,然后是不是后边还应该得有一些呃啊,比方说group ID啊,对吧,然后这个呃,呃序列这里边应该是那个反序列化对吧?呃,Deizer这样的一些东西啊,呃,这个都是要去指定的,另外就是说如果我们没有配那个checkpoint的话,还应该有那个自动提交重置偏移量的那个对吧,Latest一般都配配配置成latest,这里我就不详细再去把这个做一个一个去写了啊,大家可以参考一下我们这个文档里边写的这一部分内容啊,跟之前讲的那个卡source应该是一样,所以我把这个直接copy过来吧,Group ID对吧?这里边key的desireizer和value的最后还有一个auto ofetset。
01:51
好,我们把这部分都引入。
02:00
然后接下来我们就是在这一步可能会不一样,对吧。我们接下来定义data stream是要用env的,对,是不是要ADD哦,ADD source了,这里边要念一个Li卡夫卡consumer对吧?诶这里边给大家说一下,这里边为什么不像我们之前讲那个前面理论部分的时候,有那个什么09010011了,这里大家注意一下啊,我们在po文件里边,我们在po文件里边定引入的这一个卡夫卡。这个版本它就没有再加对应的后面那个小的版本,对不对,大家看到啊,这里,所以这里边我们引入的是一个什么呢?是一个通用连接器,是一个卡夫卡的通用连接器,从flink1.7以上开始支持的,它会自动跟踪最新的卡夫卡客户端版本,所以它其实可能会更加的好用一些。
03:03
当然就是一开始可能还是稍微有一些不太稳定啊,但是其实以后的话,大家在生产应用当中肯定都是用这个对吧?啊,所以我们这里边就把这个卡夫卡consumer把这个引入啊,另外这里边的这个类型,当然一般我们就直接把它序列化成这个stream,对吧。里边要传入topic,我们这里边叫hot items可以对吧,后边要传一个这个STEM simple string sIgMa。然后最后还有一个参数practice,把这些传入,是不是就可以从我们卡夫卡源去读取数据了啊,其实就是这样的一个过程啊,啊,那当然了,接下来我们就可以去这个去测一下这个卡夫卡源啊,这个好处是在于我们是不是可以就是一条一条去看它的那个效果,到底什么时候输出这个,呃,对应的那个窗口数据了,对吧,大家可以做这样的一些测试了,好,我们还是来。
04:07
启动一下给大家看一看这个结果,这里面我们就不用这个NC了啊,我先看一下卡不卡,有没有骑起来好没有啊。呃,我需要先去把这个卡夫卡对应的内容,我先把keep先提起来啊。呃,这里边lukeper。B下边的z k server start对吧。OK。然后去骑卡夫卡。呃,这个是B下边的卡夫卡server start啊,当然我们可以这个DEMO啊。
05:01
然后定义我们的对应的那个配置是server点啊。好,先把它吸起来,呃,当然我们看一眼啊。啊,这个卡夫卡应该是还在还在起的过程中。一诶,应该是提起来了对吧?OK,好,然后接下来,呃,我们其实可以直接去创建一个生产者对吧,这里边得有生产者,然后这边一个消费者啊,这边就可以这个消费数据了,我们在这里一条一条去说,呃,这里边卡夫卡cons so。Producer杠杠,List local9092对吧?Topic,呃,这个叫hot items items对吧?好创建一下,然后可以把这个代码要提起来。
06:04
这里边已经启动,我们来做一下测试,用这边的数据做一个测试啊。好,接下来一条一条输入,看一看效果。哦。哦,这里边还好,这是因为我们没有之前应该是没有创建那个hot item,那个topic对吧。好,继续输入,诶,大家看这边一点反应都没有诶。很正常对吧,为为什么没反应啊,对窗口这还等着呢,都在收收集数据都在那个来一条处理一条,对吧,最后还没输出结果呢,那得什么时候才输出结果啊。大家想一下,是不是一定得等到第一个窗口关闭才可以出发啊啊,那那大大家想一下,这个是不是差不多应该是得到。
07:07
就是整五分钟,哎,大家想第一条数据我输的是这个8000对不对,那大家什么情况下是这个五分钟呢?是不是应该是这个300啊,对吧,300这是不是是不是,呃,五分钟之后了,好,我们看一下这个300怎么样。诶,大家看好像没什么反应,哎是吧,我们再输一个,这个是330。诶大家看,诶是不是现在330它输出了一个呀,所以他关闭的这个窗口应该是什么时候的窗口呢。是不是就是大家看它是相当于是我们前面这个窗口是不是已经应该要关闭了呀,但是还没有输出,为什么对我们的那个对设置的定时定时器,它是不是要等水位线要漫过window and再加一才可以啊,所以我们当时是给了一个330,其实不给330,给一个对是不是301也可以啊啊这里为了。
08:14
验证我们的这个想法啊,大家会想到下一个窗口应该什么时候输出。六六对,大家会想到600,这里是不是又是下一个五分钟的窗口了,这里边我们可以再给一个601看一下。诶,大家看是不是又出来了呀,又出来一个窗口啊,哎,所以这就是我们确实是我们定义好的一个状态,那当然有同学说我假如直接给一个很大的窗口呢,呃,很大的一个时间戳呢,比方说这里边1600,这相当于过了多久了,5800061600。过了3600秒,3600秒就是一个小时,对,所以大家看一下,如果我直接给一个这样的时间戳的话,这里的效果是不是就是连续就输出了很多个窗口啊,对吧,很多个窗口就都关闭了,然后我们输出的时候是隔一秒输出一次,所以它是不是一个一个弹的这个状态啊,啊,所以这就是比较符合我们预期的这样的一个结果,如果我们把这个可视化的这个字符串,这个状态不要放在这里控制台输出,而是把它比方说输出到一个数据库里边,对吧,然后又有一个应用去读取,把它显示出来,那是不是相相当于就是一个实时监控啊。
09:35
对吧,有一个屏幕直接实时监控就可以了,所以这其实还是比较常见的一种应用的统计指标的一个实时输出,好这是这一部分内容。呃,然后大家可能会想到,呃,就是另外如果我们要是做这个测试的时候,测试的时候有时候是不是,那那就不想这么一条一条输入对吧,我直接现在是这个卡夫卡的源,我直接就想看到所有的这些这里面的数据,哎,到底是一个什么样的一个处理情况,最后输出是什么样的,那这个怎么来来实现这么一个东西呢?那是不是相当于自己写一个类似于脚本一样的东西啊,我们也可以直接把这个就放在里边去实现这样一个程序啊,比方说这里边我们再去写一个object。
10:21
那相当于是不是要自己创建一个什么呀,是不是producer啊对吧,自己创建一个producer,然后去直接去发数据,读文件里面读书,读书数据,然后用producer去发出来,那这里边我们定一个卡夫卡producer,这个主要是测试的时候可能会有一些这样的一些用途啊,就其他倒没什么,那这里边实现的话,可能就是比方说我这里边直接定义一个right to卡夫卡这样的一个函数,对吧,然后传一个,呃,诶你们当时是小写是吧。耗it传一个topic进去啊,然后这样就可以自定义了,对吧?啊,具体我们就是实现一个这个方法就可以。
11:08
Right to卡夫卡把它做一个实现,这里边的这个string其实就是一个topic对吧?啊,那当然了,这里面不需要有什么返回值,接下来实现的过程当中,呃,那是不是大家就会想到就得有一个卡夫卡的producer啊,但是注意这里边我们用的就不是flink卡夫卡连接器里边给我们提供的那个弗NK卡夫卡producer了,而是卡夫卡客户端里边的卡夫卡producer,对不对啊,所以这里边我们可能还要去给一些定义一些这个pro。你有一个property。呃,然后这里边大家想到可能需要什么东西呢?呃呃,这里我直接去put就可以啊,大家会想到是不是有那个bootstrap。
12:06
这个不能少对吧,然后local host9092写进去,然后接下来是不是还应该有一有这个对,有这个序列化的这个呃,工具啊,对吧,所以这里边还是要有对应的东西的,我可以直接copy这个过来啊。这两个过来,但是大家注意这个是反序列化的工具,所以这里边我还得改一下,对吧,改成这个sizer。呃呃,这个其实后面这里要改对吧,这个改成一个大写的S。改成大写S,哎,这样就可以了,对吧,基本的一些配置就都已经有了,然后接下来我就可以去创建一个,呃,定义一个卡夫卡producer producer,你有一个。
13:04
卡夫塔坡对吧,啊,我们直接把这个东西创建出来。嗯。当然这里边我们的这个类型需要去自定义对吧,要不然它它直接跟我们这个名重名了,对不对,对吧,这里边还得重新引一下啊好把它引入大家看一下是卡夫卡clients里边的producer,卡卡producer。呃,这里边要传的是那个,呃,就是key value的对应的那个类型对不对,String string。诶,然后这里边要传什么呢。No。这里边是不是直接给con就可以了,我们直接把那个property传进去就可以了,所以这个其实定义还是挺简单的啊,然后接下来是不是就是读取数据啊,对吧,从文件中读取数据发送。
14:10
呃,所以我们先定义一个呃,Buffer的S啊。先去读取io.s.from file,对吧,先把这个文件先读出来,那这里边还是要去读这个文件了。然后接下来,呃,读取出来之后,是不是就是便利这里边的每一行,然后把对应的数据用我们的这个producer发出去就完事了,所以这里边是for循环每一行从这个buffer的source,我们可以去get lines,对吧,把每一行拿到,这个文件的每一行拿到,然后接下来每一条这个record,这个记录是不是直接就是一个,我们先拗一个啊,Producer record。
15:07
把它包装好,还是一个string string对吧,它是不是这里面大家看一下它需要传什么东西。哦,这里边你看可以传topic,可以传这个petition,对吧,Time stamp key value,还有这个headers,呃,这里边有没有简单一点的方式呢?最简单的方式大家看是不是直接对直直接topic和一个value就完事了,对吧,这个最简单,好我们就用最简单的这种实现,是不是直接把那个topic topic是不是从这个本身定义这个方法的时候传进来的参数啊,Topic和什么传进去就可以了,Value就是line对不对,当前的这个这一行字符串嘛,直接传进去完事。所以接下来producer对。
16:02
直接把它发出去就完事了啊,当然了,就是最后这个全发完之后可以把它关闭对不对,直接把它关掉就可以了啊,这就是我们自己做测试的时候,可以写这样的一个小工具,然后平常要去读这个文件数据的时候,批量做测试的时候,用它来发数据就好了,好,那现在我们可以把测一下对吧,把这个启一下。好,我们把这两个代码都写写,诶这里边直接挂了啊,大家看到这里边这个bootrap给写错了啊,所以这里边报错了,我们重新让一下。好,也跑起来。我们看一下结果。呃,这两部分代码都已经正常启动,我们就耐心的等待这个,诶大家已经看到结果了,对吧,大家看这是不是跟我们前面直接从文件读取是一样的,这样的一个节奏啊,对吧,对吧,一秒钟输出一个一个数据,然后就是这样可视化的一个结果,每一个窗口是这样的排序好的,从大到小的商品啊,所以这就是我们这一部分内容啊,大家下来之后可以把这部分代码好好做一做,然后好好测一测。
17:22
真实的应用场景里面,其实基本上也就是这样的,对吧,测试的时候你可以自己手写脚本,或者手写这个,呃,工具类,然后这个平常使用的时候,那就是直接接卡夫卡,对吧?然后最后的think的话,我们这里面没详细做,你可能有可能是写到那就看具体要求了,你可能写到这个value里边,有可能写到ES里边,别人查出来之后,别人在做其他的一些可视化,我们这个可能就只是一个简单的可视化而已。
我来说两句