00:00
我们已经把这个代码做了一个完整的实现,大家也看到输出的结果了,那接下来呢,我们在这个代码基础上做一些调整和改进,呃,那其实也没有特别需要改进的地方,大家发现这个流程其实已经很顺了,主要是出于什么考虑呢?大家想到在这个实际生产环境里边,显然不可能数据全已经准备齐了,放在一个文件里边让我们去读,对吧?呃,真正的生产环境我们既然是要做流式的处理,那应该要有一个流式的数据输入,我们这里边应该是源源不断的来,而不是把数据读完了之后,整个处理完了就结束了,这有点像批处理了。所以我这里边要再给大家讲一下,把它的数据源改成卡夫卡,因为我们知道实际生产环境里边流失数据最常见的输入源就是卡夫卡对吧,消息队列啊,所以接下来我们把这个助调再做一个卡夫卡,从卡夫卡中读取数据的一个测试。
01:00
读取数据啊,那这个从卡夫卡里面读取数据,大家知道,一开始肯定我还是先要去定义一个proper,有一个proper啊,这个就直接用抓va下边的这个proper,然后里边呢,主要就是哎,传一些这一些配置项,BOO STEM service local host 9092对吧,这是我这里面配置的,另外可能我们配置一些,呃,这个消费者组group ID对吧?啊,你也可以配置一些,比方说这个可序列化工具啊,Key的这个decisionizer value的decisionizer啊,那当然我们前面说过,像这个呃,序列化反序列化工具的话,我们也可以在调用这个就是连接器啊,我们创建那个弗Li卡布卡consumer的时候,里边传一个参数就也可以,呃,给我们规定好当前这个做序列化反序列化的这个方法,对吧?啊,所以这个其实都是可选的,这里我就不详细给大家说了,我们可以直接看一下,呃,文档里边。
02:00
大家看这个替换卡夫卡作为数据源,这里边给的这个参数还挺多,对吧?BOOSTEM,我这里边兹9092GROUP ID给了一个这个消费者组的这个group ID啊,后面还有这个K和value的R,这里还配了一个auto of set reset对吧?呃,就是做这个自动重置的时候,我们直接用最近的,其实这个没必要啊,因为大家知道,呃,就是我们说这个,因为底层有这个故障恢复的保证嘛,那么弗林卡夫卡的连接器,它其实会在发生故障,呃,就是需要去恢复状态的时候呢,直接把自己保存的那个偏移量重新提交,对吧?所以这里边其实你不需要去配置这些东西,那假如说你没有配置,这里边主要是考虑到你,你如果没有配置这个拆point的,没有配置这个重启策略的话,那可能这里边你需要有一个就是就是我们这里边啊,如果要是重新读取的话,从哪里开始,对吧,可能要去定义这样一个,当然就是感兴趣的同学也可以看这个弗林卡consumer的一。
03:00
这配置项啊,它可以直接指定你当前这个偏移量的位置,对吧?啊,就是从哪里啊,Start这个从从哪里去指定这个offset这个都是可以去配的,我们这里面就不给大家详细说了,直接把这个copy过来啊,最后那一行我也不copy了啊,直接拿过来放在这就可以了。呃,这里我们是不是定义的不太一样,Properties在报错是吧?Properties set proper看一眼啊,Properties,然后后边我们接下来要做的其实也非常简单,就是定义一个input stream,然后把从这个env,然后艾source里边要传入的就是一个flink卡夫卡的连接器,大家还记得那个是什么吧?呃,就是flink,呃,卡夫卡,诶,现在大家注意是consumer还是producer呢?现在是要从flink去读,从从卡夫卡去读取数据所应该是一个消费者,而且大家看到现在里边我们不需要去引入那个呃,PRODUCER011了,对吧?呃呃,这这这是producer啊,我们要引入的是consumer,我们直接引入通用连接器,因为当时我们直接引入的通用连接器就不带版本嘛,所以这里边你引入这个消费者的时候,也是直接给这个consumer就可以了。
04:21
然后里边的这个类型,当时我们给大家呃做过这个,就是直接把这个定义成这个string就可以了,对吧?里边传的这个参数呢,最简单的就是前面给一个当前的topic,比方说我这里边叫hot items啊,那后边呢,给一个string,呃,就是最简单的这个simple s对吧?做一个这个当前指定啊,这个的S啊,然后后边再把对应的这个properties传入,对吧?这是非常简单的这个做法,然后这里边我们主要是把这个copy过来之后,上面有两个定义,把这个去掉,上面就不报错了,然后下边对应的参数一个一个传入当前,对于这个呃,卡夫卡连接器啊,一个消费者配置在这里,我们就可以从卡夫卡去读数了。好,然后接下来我们就可以运行去做一个测试了,首先我得看一下卡夫卡是否已经启动起来了,好,没问题,卡夫卡已经在了,对吧。
05:21
然后接下来我们是进入到进入到这个卡下边去要一个R,然后我们这边相当于是一个这里是一个consumer对吧?啊是这样的一个处理流程,所以我可以把这边起起来,代码起起来,然后接下来在这边呢,呃,去启动一个卡夫卡,然后定义一个R啊然后我们定义啊呃,当前这个broke list9092,然后这里边我们定义的是对吧,把这个创建出来,然后就是一条数据一数据入了这个输入数据的这个过程当中,其实就是大家直接把那个对应的那个数据拿出来就行了,但是我们这里面本身数据呢,它那个时间戳进展太慢,对吧。
06:21
这里边它都是一秒一秒跳的,我这里边就找了一些,呃,就是这个测试数据啊,直接给大家快速的把这个做一个做一个测试,这里边就会稍微的容易一些,好我们还是把这个窗口做一个分配,这边的数据我们放在最下边,然后这个窗口放在上面对吧,这里边一条一条输入,大家看我这里边的第一条数据,这个543462这个用户啊,啊对于这个1715这个item,这个商品做了一个浏览操作,PV操作啊,然后这个时间戳是后边这个啊,这个58000啊,我们先把它这条数据输入,然后大家看到这里边没有任何的输出,那很自然啊,当当前你输入第一条数据的时候,并没有窗口关闭嘛,所以当前肯定是什么都没有,然后接下来呢,哎,我们就给一个这个060,大家知道这过了60秒一分钟之后,对吧,那我们看一看当前是一个什么样子啊这。
07:21
复制一下还是没有任何的输出啊,那大家可能会想到接下来我们就继续往后推移了,120,这应该是两分钟之后还是没有输出,那这到底要到什么时候才能输出呢?我的这个数据基本上就是60秒一个,那就是一分钟一个啊,大家会想到这主要就是关系到我们当前的这个数据,哎,到底它是就是我们窗口是以多少,以什么时间开始,以什么时间结束,那我们现在说五分钟一个,滑动一个窗口,对吧,然后一小时呃大小的这个窗口,那它应该是创建出来之后,应该是什么样的状态呢?我们说就应该是整点,然后隔五分钟划一次对吧,那其实就应该是整五分钟一次嘛,每五分钟就要关闭一个窗口,对吧?诶,所以正常来讲,就是比方说八点到九点有一个窗口,那就是九点钟关一个窗口,9.05关一个窗口,09:10关一个窗口,九。
08:21
十物关系出口对吧?那所以这里边我们就关键是考虑当前的这个数据啊,它本身应该是什么时间了啊,那那这个我们如果不管的话,大家只看后边这个000啊,只看后边的这这这么三个零,这就相当于我们前面肯定是一个至少是一个整数对吧?而且我们看一下前面这个这个字段啊,1511658,如果我们大概的看一眼的话,会发现它相加应该是三的整倍数对吧?呃,因为大家知道所有的数字加起来能被三整除,就是三的整倍数,而且是个偶数,也是二的整倍数,那它不就是六的整倍数吗?啊,所以大家知道当前这个肯定它是它是什么呀?就是一个6000的整倍数对吧?这相当于是一个6000的整倍数啊,那肯定它就应该是一个整十分钟对吧?啊,就是600的整倍数,600秒的整倍数,就是一个整分整十分钟吧,啊,所以至少这个。
09:21
个数据,它肯定就是卡在比方说整五整十这个点上输入的一个数据,然后后边呢,诶过一分钟,过两分钟,过三分钟,过四分钟,大家会想到是不是应该是过五分钟,还是卡在一个准准的五分十分的这个这个点上,就可能会输出一个,就有一个窗口应该要关闭了呀,因为我们说每五分钟应该有一个窗口关闭嘛,对吧?哎,我们把这条数据copy下来,没copy上啊,然后在这里做一个输出,但是大家看这里边还是没有任何的结果,哎,那这是怎么回事呢?这就是我们说的,当时我们不是还等了一毫秒,然后才输出,等到所有数据都到齐吗?那你说等一毫秒,现在这个water相当于是到了多少了呢?我们现在升序本身就有一毫秒延迟,对吧?所以你如果现在是到了一个,比方说整五整十分钟对吧,比方说我现在就是9.05,这个数据是9.05,那watermark应该到什么。
10:21
那呢没到9.05,现在是到了,比方说哎,是这个09:04:59,然后哎这个就是999毫秒,对吧,就差一毫秒到了这个时间,所以接下来其实我们当前的这个数据呢,还应该没有统计,就连那个窗口还没有关闭呢,对吧?啊那应该什么时候关闭呢?是不是应该这个时间要再往前推进一下呀?啊对吧?所以这里边我可以去再把这个,我就直接把这条数据啊直接复制一份,大家会想到我直接把这个时间戳再加一,是不是就应该有效果了,对吧?哎,我这里边相当于就往后推迟一秒钟,那是不是所有的这个窗口也应该关闭了,另外就是我当前所有的这个,呃,就当前的窗口关闭了,然后所有的数据都已经统计输出了,这个也已经超过了我们定义的那个定时器加一毫秒的时间了,最后的这个排序是不是。
11:21
直接就输出了对吧?啊,这里边我们是中间加了一行空行啊,所以这里边大家看输出的就相当于这个分割的就会开一点对吧?啊,这里边就能看到当前这是哦,原来这是09:05关了一个窗口,那大家看到这里边窗口数据有哪些数据呢?我们看输入的数据啊,1715这个商品的热门度是二,那家看一七一五一七一五两条数据没毛病对吧?诶然后第二名的是22440742244074,这个数据两条没毛病对吧?然后接下来这个3611281,诶有同学可能就说了,那3611281,这不对啊,这应该是有三条数据啊,你怎么他只有一条数据排排第三呢?
12:06
诶,大家就知道了,当前09:05关的这个窗口,说明这个时间戳应该就是什么呀,是不是就是我们当前这个658300这个时间戳啊,对吧?啊,就是当前这个时间戳表示我们这个09:05这个时间点,所以你看超过它之后,我们这个窗口不就关了吗?哎,那另外我们说是窗口里边是前闭后开,所以当前本身卡在这个09:05这个点上的是这个数据,它不应该属于这个窗口对吧?不属于09:05的窗口,它应该属于下一个窗口啊呃,那那这里面还有就是3301,那已经过了一秒了,09:05:01了,当然也不属于之前这个窗口了,对吧?所以这里面就没有把它统计进去,那我们自然就想到了,后边如果再去输出下一个窗口的话,那应该是什么时候呢?那应该是再过五分钟,那就是加再加300秒600的时候,这里要输出下一个窗口对吧?啊,但是大家知道你给这个六零。
13:06
的时候呢,这里还不会有输出,就是我们说的在这儿它只是呃,这个到了之前前一毫秒的那个时间点,对吧,你这里边如果给一个601才会输出下一个窗口,诶大家看这里边输出了下一个窗口对吧?这里边你看3611281热门度是不是就是三了,前面都统计进来了,对吧?然后这个2274074你会看到,诶这个还是呃。哎,这里看到你,你尽管是多输入了一条数据,但是事实上这个数据没有包括进来对吧?因为它本身已经是09:10:01的那个数据了嘛,所以我们统计的还是之前的两条数据对吧?啊,那1715也是两条数据啊,所以大家可以看到这样的一个效果啊呃,那呃,当然了,如果大家想更加清晰的看到这个效果的话,我们可以在这个代码里边给大家做一个做一个更加明确的展示输出,就是我可以在这里边把这个前面我们不是有data stream吗?我直接把它打印输出,这是每输入一条数据就会输出对吧?呃,然后另外我们中间不是有一个聚合结果AJG吗?也把它做一个打印输出,这里边我们给一个AJG,就是每一个窗口聚合,出现这个聚合结果的时候,我直接把它写在这里,呃,然后另外这个最后的结果,这个叫做这个就不用result了,因为大家知道这个都有这个分割线的嘛,对吧,这里面有分割线看的很明,明显如。
14:33
大家觉得这个这样换行不舒服的话,我们可以把这个换行再放到,就是每一次我们这里边输出结果的时候,把这个再再换换一行,对吧?啊,这个就是相当于整个这个窗口结果都输出之后,最后分割线再换一行显示,然后我们再来运行一下,大家看看这现在的效果是什么样子。好,现在已经提起来了,大家具体在看的更加清楚一点啊,我们我们一条一条数据输入,大家看一眼这个过程,首先这个右面复制上啊,好,这个第一条数据输入进来,大家看有一个塔的入对吧,当前这个数据这个11658000放在这儿了,然后接下来我们知道它不会触发任何的这个窗口操作嘛,060啊,又来一条数据对吧,当然也不会有窗口关闭,然后接下来这个120啊,这个就是一条一条数据,大家知道一分钟一个嘛,这这个数据都不会触发操作,但是这些数据都是里面的一部分,所以说都会来一条数据,就输出一个结果,240,这是已经到四分钟了啊,然后接下来300,注意大家看啊,300这个数据输入,大家看这个300输入之后,它这里面是什么呢?诶大家看这里边,它是直接已经给我们关闭了这个。
15:56
呃,窗口,然后输出了这里的这个聚合结果对不对,但是没有输出我们后续的排序结果,这是为什么呢?哎,这里大家要注意一下,就是我们当前在做这个数据调,就是在在做这个操作的时候啊,相当于是当前的时间已经进展到了这个窗口关闭的时间,对吧?哎,那这里面就涉及到一个问题,我们不是说本来当前这个升序数据处理的过程当中是有一个这个watermark延迟一毫秒的吗?那为什么之前我们这个窗口就可以关闭了呢?大家注意一下啊,我们当时窗口关闭,我们说是要达到这个时间窗口就关闭了,对吧?比方说达到这个呃,我们呃整五分钟的这个这个时间,58300的这个时间点窗口就关闭,那这里面它其实要关闭的这个时间点是什么呢?其实就是窗口所有数据都到齐的时候,我就应该关闭就可以了,对吧。
16:56
那什么时候窗口数据都就都到齐了呢?是water mark1定要涨到这个,呃,58300才可以吗?其实不是,我只要找到它之前一毫秒那个点就是58299999就可以了,因为我当前的窗口里边大家想是不是不包括58300当前的这个,呃,这个这个时间点上的数据啊,哎,我们要的是前闭后开嘛,所以当前我其实要的就是里边的所有数据啊,最大的时间戳就是比它要少一毫秒。
17:34
所以大看如果这里边我来了这个比它少一毫秒的这个时间戳,是不是就可以触发当前的这个,呃,就是当前这个窗口的这个关闭和计算的这个操作啊,所以大家看到底层窗口的这个关闭啊,时间窗口的关闭,它其实也是按照这个减一毫秒的这个时间点去出发的,对吧?啊,所以这里边我们看的很明显,在这儿它输出了一个聚合结果,当前我们这个17152对吧?啊,3611281122440742,这个没问题,我们这里边都已经输出聚合结果了,但是这里边呢,还没有触发后面我们的排序操作,这是为什么呢?这就是我们说的后边又去叠加了一毫秒,对吧?如果说我们不再加那一毫秒的话,那其实在这儿就应该直接输出了,对吧,你给这条数据的时候,就应该直接输出结果了,因为这条数据并不属于我们当前窗口要去统计的数据嘛,对吧,它只是把我们这个时间啊推移到这个点。
18:35
呃,表示窗口可以关闭就OK了啊,所以接下来那怎么样才能让它真正的有这样的一个,呃,最终的这个输出呢?那就是我们把这个再去推移,再朝前去做一个推移,对吧?再来一个301的这个数据,你看这个数据输入进来之后,之前的这个窗口就真的输出这个排序结果了啊,这就是这个完整的一个测试流程啊啊,那后边同样就是你给这个600这条数据的时候,它本身这里边只会输出大家看什么呢?当前600,也就是又过五分钟之后,09:10这个窗口关闭的时候,我统计出来的这个结果,对吧,但是不会输出我当前的排序结果,那什么时候才能输出这个排序结果呢?同样还是给一个601的时候,这里边就会把之前的这个排序结果列出来了啊啊,那有些同学可能想,那呃,假如说我要突然给一个很大的时间戳它会出现什么情况呢。
19:35
啊,就是比方说这这里大家看啊,直接给一个661600,这比我们一开始的这个658000已经是超过了这个58000,这里是,呃,这个61600相当于已经加了3600秒对吧,3600秒就是一小时嘛,我直接如果把这个时间给一个一小时之后的数据的话,这会发生什么事情呢?那就相当于我们系统里边的watermark是不是,是不是就会直接一下子跳变一下呀,对吧,直接跳到一小时之后,诶那大看一下这里的效果是什么样的啊,这条数据输入进来,然后很多窗口都关闭了,大家看58900对吧,呃,这个,然后59200,然后59500啊,就是每隔五分钟这很多窗口都关闭了,统计数据都输出了,然后大家看后面是。
20:27
09:15的结果输出对吧,09:20的结果输出,25分的,30分的每一个最后的统计结果,一直到09:55对吧,一小时之后可以统计输出的这个结果全部都输出啊,这就是这个我们在做这个流式测试的时候啊,大家可以用一个卡夫卡作为数据源,做一个完整的测试。
我来说两句