00:00
前面我们已经了解了,在事件时间语义下啊,我们看到设置water mark的延迟,Water mark就是控制后边这个窗口什么时候做计算,什么时候关闭的这样一个时间,一个原则啊,那这里面大家会发现一个问题,之前我们的代码测试的时候呢,定行度全是一哦,那现在我把这个直接可以做一个更改,或者说直接我把这一个注掉,大家想我这边如果住掉的话,开发环境里默认并行度是按照CPU的核心数量啊,所以我当前如果电脑是四核的话,那当前的这个就是四,并行度为四的一个程序,那大家想一下,现在这个代码如果要是运行一下的话,跟之前会有什么区别呢?哎,我们运行一下啊,起一下。然后接下来我们还是把这个一条一条数据做一个测试,现在代码已经启动,我们还是NC这边来发送数据。199这是第一条对吧?啊,后面这个六七十这个我就不管了啊,当然大家也可以输入就是。
01:03
大家想一下,就是我们这个water mark啊,就是关于这个water mark,如果来一个346的数据,那341那个分区,大家想它他基于哈希克的重分区,有可能分到不同分区对吧。那341的那个分区,假如跟它不一样的话。三一那边那个时间会推移吗?Auto map会会增长吗?诶,大家要注意一下,这就要看当时我们的这个数据到底是从哪儿来的了,对吧,假如说大家想一下,如果我们之前就是两个。就是读取数据源的时候,他们分别就是从两个不同的数据源的这个SS分区啊,SS任务有两个不同的分区分别来的一个341,一个346,那大家想一下,他们后边追加那个watermark是不是也只在当前分区有效啊,那这个是不是后边就不会影响到上面。
02:02
下边这个分区就不会影响到上面对吧,但是如果说我们现在是输入是一个分区,一和六都是这样来的,然后后边再分开。再分开,我们后面不是有那个map操作,然后才分配时间出和water mark吗?那大家想一下这个六的数据会不会影响后边这个1K之后的那个分区的water mark。大家想一下,六来了之后,后边是不是有这个watermark呀,这个watermark会怎么样出去,它会广播出去,那所以是不是在这种场景下,就所有输入的数据都会影响到后面的各种分区,每一个分区的那个结果都会影响到啊哎,所以这个大家自然是呃能能响到当前的这个状态的啊呃,然后接下来我们就来看一下,呃,当前我们做的这个操作啊,当前346,然后347。先把这个数据啊,一条一条先输入进来,然后三四十二零五的一条数据,好,然后接下来这个别的分区我们就不管了啊,就看341就可以了,大家知道第一个要结束的窗口应该是什么时候关呢?应该是212,那条数据来了,我们延迟两秒钟生成water mark,那是210,第一个窗口195~210该关了,对吧?哎,所以这是我们之前想到的这个状态啊,212。
03:26
大家看一下现在关了吗?现在现在有啊,当然当时我们那个代码好像是做了一个。做了一个那个一分钟允允许迟到数据,对吧,所以这里边我们还不是直接会关,但是大家想到是不是至少应该输出一个结果啊,至少应该触发一次计算,这里大家看到没有任何的计算结果输出。为什么没有呢?啊,有同学可能就想到了哦,那可能是这个,呃,当前我这个时间还不够推迟对吧,比方说我再来213,比方说再来214对吧,然后再往后215。
04:08
这个37,诶,大家看是215的时候一下子输出了,大家看我现在我的这个,呃,本来当前是不是应该有四个K啊,34134呃一六七十,所以大家看到我们说它的那个不同分区滚动时间窗口的话,其实时间都是对齐的,对吧?啊,所以它的那个窗口开的都是195~210,所以到了这个窗口要到了这个结束时间出发计算的时候,是不是每一个K对应都有一个结果输出啊啊所以我们看到这个呃六七十的话,它本身就只有一个数据,所以就直接输出了啊,然后341的话,取最小值是209的这个32.8。好,那问一下大家,为什么这里边我是输入215之后,这里才输出了所有的结果呢。有同学说215,那当前的watermark应该是应该是213对吧,那难道当前我这个窗口是213时间关闭的吗?
05:08
呃,不是关闭,我们现在是结束对吧,结束要出发,这个计算结果并不是这样啊,按我们之前的那个分析,应该是195~210是第一个第一个窗口啊,明明就是沃到212就应该结束了,就应该输出结果啊。大家注意,这就是我们当前因为并行度调成了四导致的这个结果,大家还记得当时我们说这个water mark传递的时候,下游任务的water mark,它是只要上游假如说有这个多个并行任务,它是都要给他发送water mark对吧?诶,那下游任务只要收到一个water就会更新自己的事件时间吗?不会。他需要接收到上游每一个分区的water mark都更新了之后,他取那个最小的对吧?呃,最小的那个water mark是自己当前的事件事件,所以这里边我们完整的来来分析一下这个过程啊。
06:07
首先大家想一下,呃,我们这个,那那那有同学还想到了,刚才我们也说到了,那假如说你前面的这个数据啊,呃,它是这个,呃,就是一个数据,如果能影响到后边所有的那个automark,因为它要广播吗?那这样的话,后边不也应该是来一个数据所那个auto都上涨了吗。这个我们要单独的分析了啊,一个一个分析了,首先第一步任务我们是一个socket文本流,这个大家知道。一开始我们说过它的特点是是不是必须要并行是一啊,所以数据是不是就是按照一个一个来的呀,哎,这个当时我们说如果在这里它后面紧跟着watermark的话,这里我们就真的是就是后面的watermark就广播出去了,对吧,所有的那个watermark都会所有分区都会发生影响,发生变化,但是。大家想一下,Socket的那个数据直接读进来之后,我们是后面就直接追加watermark了吗?
07:06
没有对吧,我们是在什么时候才做的waterlo,是不是后边有一个map呀,先map成sensor reading这个portal类,然后再做,呃,分配那个water mark时间说和water mark,然后接下来是不是就KBY去开窗了,所以后边这个窗口操作。跟中间的这个map操作是不是都是并行路是四啊,然后一开始这个数据源socket文本流读进来之后,它的数据要传递给下游的map任务的时候,后边这个是K外之后开窗对吧?啊,大家知道这个K外开窗其实KY并不是一步计算操作嘛,其实就是最后开窗的这一个操作对吧?哎,那所以大家想一下,前面这个数据来了之后,到到这个map任务应该怎么分配呢?这是不是就是我们说的并行度调整,这应该是啊,这个应该是轮巡去做分配reb,所以第一条数据来这儿,第二条数据来这儿,第三条数据来这儿,第四条数据来这儿,对吧,第五条数据接下来继续这。
08:13
那后面如果要KBY的时候,是不是相当于这个就完全打乱了,对吧,这个就有可能涉及到这个基于哈扣的重分区的一个过程了啊,这个我就不详细写了,大家知道每一个对应的这个分区啊,它都会分发到不同的分区里边去。哎,那所以接下来接下来有这样的一个问题啊。我们的是在哪里生成的?是不是应该是map后边才去追加这个watermark呀,所以大家会发现了,就是当前啊,比方说我这个1234。哎,比方说1245吧,那是不是一分配到这边,那应该是一后边才加上这个watermark对吧?那这个加的是不是以这个一当前时间戳为准来判断的呀?而第二个分区是二后边加water mark,那是不是会以这个二为准,跟上面是不是就不一样了,所以大家会发现现在我们这个不同分区的watermark其实就像这个数据源读了那个不同的分区的那个数据源一样,对不对?哎,所以他的那个watermark真的是不一样的,不是源头,这里边直接就控制了后面的增长的。
09:23
所以后边我们在做判断的时候,你看来了一个四,那后边这个增长当前的那个时间推进是哪个分区推进了呢。是不是只有第三个分区这里的这个watermark增长了呀,别的其他三个分区是不是根本没有受到影响啊,所以我们的问题也就在这里。大家看前面我们这个给时间的时候啊,199来了之后,199是不是到了这边到了第一个分区,那后边的三个分区water是多少呢?第一个分区water rock是1197对吧?那后三个water rock后三个分区是多少?是初始值是不是那个长整形最小值再加一个那个延迟时间啊,大家还记得那个对吧,是一个很大的负数对不对?所以现在下游所有任务的watermark是什么。
10:12
以最小的为准,是不是都是那个很大的负数啊,所以大家注意啊,你来一个时间,这个来一个数据的时候,后面我们的那个water rock根本没没涨上去,那大家就知道了,什么时候才能涨上去呢?是不是?哎,大家就是想到201来了之后,第二个分区water rock变199了,所以现在的时间应该是,如果我这里边有这个分区watermark的话,大家知道啊,是不是应该是现在就是197199,诶,很大的负数,很大的负数,对吧?应该是这样的一个分区water,那当然它的时间还是那个很大的负数。然后同样下面第三个分区202来了,它的watermark是200,然后第四个分区205来了,它的watermark是203,这个时候当前下游是不是当前的时间就变成了197啊,最小的那个奥特曼就变成了197了啊,所以基于这样的一个原则,大家再看后边。
11:09
207,呃,一直到212,大家想这个212来的时候啊,它其实更新到哪里了。是不是当前是第一个分区最大的那个时间戳是207对吧?所以watermark是不是205啊,然后第二个分区是209,那它的water mark是207对吧?第三个分区212,它的water mark是210,第四个分区没变,是不是还是之前那个205啊,它的watermark是不是才到203,所以问一下大家,现在我们这个212这个数据来了之后,当前下游的这个任务啊,它的每一个分区对应的waterlock是多少?是不是只有203啊,最小的嘛,对吧,木桶原理啊,四个上游分区里边奥最小的那个,所以基于这样的一个原则,大家就想到了,那有可能是不是我这里边不给这个212也可不,后面不给213214215也可以啊。
12:10
在想是不是后面我每个分区,因为它是轮巡的嘛,后面我只要每个分区是不是都给一个212,是不是相当于这个分区的watermark就涨到210了,哎,所以大家可以基于这个想法啊,我再重新测试一下。啊,这样我们重新把这个启动一下,大家看看这个效果啊。接下来还是呃。来一个这个199,这是最初的数据,然后接下来201202。202呃,三字十的205。哎,后边这个都是341的数据了,这个我就随便输了啊207对吧,然后接下来209。
13:01
啊,然后后边我们再大家知道应该是212对吧,就正常来讲的话是212啊37,然后大家看现在没有任何的反应,那假如说我现在啊,继续212。大家看这相当于是不是有两个分区的watermark都变成这个,呃,当当前都已经是这个210了,我继续复制这个2123个分区对吧?哎,四个分区,这第四个分区我先给大家211试一下啊,二幺幺三十五点二,这个没反应对不对,那我再给一个二幺,哎,当当然大家知道我如果再给212的话,这个就有问题了,这是不是又到第一个分区了,哎,所以你现在给的话没用对不对,又没用啊,第二个分区,第三个分区,诶对,大家想第四个分区是不是输出了啊,所以大家看这就是我们之前讲的啊,并行度不唯一的时候,并行度如果是自己设置了一个比较大的并行度,那我们测试输出的时候你要注意。
14:09
Water mark在上下游任务之间传递的规则,对吧?必须得是每一个分区的water mark都要上升,我们取那个最小的值才是当前的奥才会出发后边的这一个窗口聚合的计算。啊,这就是关于这个并行度啊和这个呃,Water mark传递的一个测试,所以大家也发现了,就是当前我的这个water mark啊生成。提取时间戳和order map的这一步操作在哪里比较合适呢?是不是就是离SS越近越合适啊,啊,当然在这里边,我直接在这个S里边去把它生成,好像好像不太可能,因为我还没提字段吧,对吧,你那那或者就是说你在这个socket的文本流读出来之后,你就相当于要直接在我们这个that time stamp里边,大家想那你拿到那个element是不是就是一个string啊,然后你在这儿去做分词直接提也是可以的,对吧?啊,但我们是觉得那个有点麻烦,你先把它包装好,然后再提,不是很简单嘛啊,所以更好的方式可能是你这里边读进来之后是不是直接就提啊,离south越近越好,甚至现在还有,呃,就是另外的做法,就是大家知道实际项目是不是一般都是。
15:24
从卡卡里边读取数据啊,那卡夫卡我们说跟跟这个flink本来就都是这个流失处理消息队列嘛,天生是一对对吧?啊,那所以其实卡夫卡是支持直接在卡夫卡里边就生成watermark,那大家想那就相当于我这里边读进来的数据,是不是本身就带着时间啊,本身是带着watermark对吧,那后边就相当于所有的那个,呃,这个时间就都可以广播出去啊,都可以保证我们后边的这个watermark进展。这就是关于事件时间的并行的测试。
我来说两句