00:00
接下来我们再给大家来讲另外的一种实现思路,前面我们讲的是cep的实现方法,大家看到这个过程其实还是比较简单的啊,就是按照cep定义的流程,先定义一个patternython,然后把Python应用到数据数据流上,然后接下来呢,如果要是检测超时事件的话,定义那个呃,测试之流的那个标签啊,如果要是不需要的话,那我们就直接select就完事了,对吧?Select的时候,呃,如果要是检测这个超时事件,你需要传入一个patternon timeout方式,然后正常的这个检测事件呢,传一个pattern s select方式,就是这样的一个流程,那大家可能会想到,假如说我们不想用这个CP的这种方式,因为这里边有一个这个超时的处理嘛,我觉得好像还挺麻烦的,我直接上大招,用这个process function式底层的这个实现,能不能直接把它搞定呢?呃,其实也是可以直接搞定的,这个需求其实没有问题的,对吧,因为大家看这个当前我们这个需求整体来讲状态和事件还是比较。
01:00
比较小的,因为只涉及到订单,我们要处理的什么状态,就是一个创建下单和一个哎后面一个支付,一个配的事件,对吧?啊,这个过程其实并不是特别的复杂,所以我们在这个,呃,可以在这个状态编程里边直接对它不同的场景做一个分支讨论,可以把它做一个实现,接下来我给大家讲一讲这个状态编程的实现啊,啊,那我们接下来用这个新建一个object还是啊,那这部分我们叫做all timeout啊,之前我们那个是with cp啊,现在我们这个叫做without cp吧,大家知道without cp肯定就是要直接用process方式去做实践了,那这里边用到的东西其实跟之前都差不多,诶前面的内容我就不详细写了,还是创建执行环境啊,这里边我们不从文件读取啊,从这个如果从直接这个set文本流直接读取的话,接下来读进来map成样例类分配时间戳,提取时间戳。
02:00
和water mark,然后按照这个order ID去做分组,对吧?到后边这个定义patternon之前,我都可以照抄,完全没有问题,好,先把它放在这里,然后上面我们还是啊定义的这个对应的影视转换,先引入,先把它放在这儿,然后接下来如果说我们想要去直接用process function去做对应的处理的话,那是不是就不用定义那么多东西了,哎,直接在这是不是?哎,自定义啊,自定义process function进行。呃,复杂事件的检测,哎,那这里面我定义一个order result stream对吧?还是我们想得到这个当前的order result啊,那基于之前的order event stream,直接去做一个process,哎,大家注意,这是我们之前就批判过了对吧?啊或者说大家看这个不舒服的话,我们把这个拿到后边来,就之前还是没分组的啊,现在在分组,然后再做一个process,为什么非得做process呢?因为大家想到你之前在这里边用到什么东西啊,你既要考虑之前的状态对吧?哎,到底这个有没有来过,对应的那个创建订单有没有,有没有支付啊,这些都要考虑,这是有状态,状态编程,另外还得有15分钟那个定时器啊,啊,所以当然是要用大招process方式,别的都实现不了,只有用它,那这里边我们自定义实现一个有一个啊,这个函数类,呃,比方说这个我们叫order pay match result,定义这样一个处理函数,然。
03:37
然后接下来最后我们的实现,那还是啊,就是order event stream打印输出,这里边我们要的是成功支付的这样的一个主流,对吧?哎,就是正常支付的放在这儿,那如果要是说超时报警的话,哎,我们扔到测殊出流里边去,对吧?那这个测输出流我们就没办法在之前直接定义出来了,那是在这个process function里边才能去定义的,对吧?哎,所以接下来我们放在后边啊,Order the result stream get,诶,这个不是even stream啊,前面写错了,All the result stream,然后我们这里边get side out foot side output,呃,然后后边我们要给一个当前的奥put t。
04:21
当前类型大家知道结果类型都统一是order result啊,然后里边我们给一个标签,比方说这个叫timeout,后边打印的时候也按照这个来做一个打印输出came out,上面是成功支付的,下面是超时的,后边我们再来定义一个,不要忘记把它执行起来啊。当前是all the timeout without cp,其实就是with这个process function啊。接下来关键就在于自定义实现这个了,对吧?啊,自定义实现key的,大家想到这个我们分组之后的,对吧?还是一个key的process function只针对当前order ID有效,Class。
05:07
Order pay match result,然后这里边我们需要去实现这个啊,Process function,它的类型KIO,哎,那当前的K是什么呢?按照order ID去做的分组嘛,哎,那当然,而且我们前面是下划线的写写法,对吧?哎,那不是driveva元组,这里边我们是长整性,然后输入数据样例类all the event输出数据啊,这里是主流,主流和特殊一样吧,都是all the result啊把这个写好,上面检查一下,不再报错了,类型匹配对吧?啊接下来本身要复写的一个方法啊,重写的一个方法就是process element啊,这个先把它列在这儿,就是每来一个元素之后都会调用这个方法,在这个方法里边我们还可以获取状态,可以用到这个测试,就是当前我们这个,呃,当前的上下文里边可以去获取当前的执行环境,也可以去注册定时器,还可以去输出测输出流对吧。
06:07
这里面能做的操作就都在这里去做了,那我们先定义状态吧,现在需要哪些状态呢?定义状态大家仔细一想的话,在这个处理的流程里边,我其实需要处理的这个状态是不是就是一个看当前。好像也不需要处理特别复杂的状态,对吧?我其实就是只要判断一下当前的这个,呃,就是比方说啊,假如说有这个乱序的情况的考虑嘛,那是不是有可能我一个这个支付事件配事件来了之后,有可能那个create事件还没到啊,是有这种可能的,对吧?啊就正常最正常的情况其实是什么?就是来了先来了一个create,后面又来了一个配,这不就完事了吗?啊那所以其实我怎么样就把前面来的那个create保存起来不就完了吗?或者说我就定义一个一个这个布尔类型的状态,我就告诉告诉当前我的程序之前create来过没有,对吧?哎,只要他来过了,我就等后面的配吗?配来了之后,那不就匹配起来,直接输出成功,成功匹配输出就完事了吗?啊,这是最简单的一种实现啊,但如果说我们考虑的问题多一点的话,就会稍微麻烦一点,就是我们前面讲的,如果说还出现乱序呢。
07:25
那出现乱序,那就是什么,就有可能这个用户手快对吧?啊,他这个create之后马上就这个配了,直接就支付了,那就有可能出现什么,我们这里边处理的流程里边是配先来了,Create后来对吧?哎,那那与之对应的就应该还有什么样的情况呢?啊,就是大家大家发现就是首先有两种情况,就是配先来create,后来配要等create对吧,然后另外一种就是create先来配,后来create要等配啊其实其实就是谁先谁后,一个要等另外一个嘛,所以在这个等待的过程当中,我们是不是都要判断一下另外一个到底来没来啊,那所以这里面非常简单,是不是就是定义两个布尔类型的状态表示。
08:13
就是表示当前啊,就是比方说我这个配已经来了,我判断一下create来没来对吧,然后create已经来了,我判断一下配来没来对吧?如果要是另外一个已经来的话,就是我们说的啊,就是一个已经来了,现在是第二个来了的话,是不是不管什么情况都匹配起来,直接输出就就完事了,对吧?啊直接输出这个成功支付就完事了,那如果在一个已经来了,另外一个没来的情况下,这种情况要怎么样呢?哎,首先我要保存状态对吧?啊,就是我我告诉这个弗林格啊,就是一个已经来了,比方说create先来,最常见的create先来,对吧,我把那个布尔类型支成注已经来过了,那接下来我要等配啊,等配是不是还要去注册一个定时器啊,对吧?这里边我要等15分钟嘛,所以说注册一个15分钟之后的定时器啊,这个是需要去考虑的,那这个定时器有没有可能要删掉呢?
09:13
也有可能要删掉对吧,什么情况下删掉,就如果说后边那个配来了之后,是不是你注册的那个定时器就可以删掉了,没有必要再去做操作了,对吧?啊,所以这里边你如果要大家就会发现啊,这么简单的一个场景,我们要是用这个process function去做处理的时候,逻辑你要考虑的还是要考虑的比较清晰的,对吧?哪个分支里边到底做什么操作,然后我到底需要哪些状态,那现在我已经发现了,首先需要的状态呢,两个状态就是create到底来过来过没,另外还有一个就是配来过没,对吧?呃,这是因为有乱序嘛,所以说我都得判断一下啊,看到底是来过没有了,另外还有一个就是后边我有可能还要删定时器,那你要删定时器的话,那就必须得是不是得保存定时器时间戳啊,哎,所以这里边我们定义的状态就是这个布尔类型的标志位对吧?标志位表示create配是否已经。
10:14
来过,那另外还有一个就是定时器时间戳对吧,保存的就是这些东西,所以接下来我直接用lay的方式去定义啊,那首先就是is created state对吧?如果要是已经创建过的话,已经有created来过的话,这是一个value state,它里边的类型应该是布尔类型就够了啊bully,然后接下来get wrongtime connects,接下来我们直接get state,里边去传一个value state script,我把这个放大一点,然后里边啊给一个名字啊,当前这个叫做is created,里边来一个class of,这里边把布尔类型传进去啊,就是这么简单对吧?啊,然后另外我们还得有一个,既然有is created的,接下来是不是还得有一个is配的呀,跟它定义差不多对吧?啊,基本上都是一样的,因为类型也是布尔嘛,只不过名称大家注意。
11:14
要变一下对吧?这是is paid,然后前边这里边本身这个变量的名也得改is paid,哎,这样就定义好了,然后另外我们还需要有一个时间戳,对吧?时间戳大家知道是长整型,所以我从后往前改类型应该是长整形对吧?哎,当前应该long场景型,然后这个名称改一下,比方说我这个叫timer ts描述器的类型也要改,前面这里边我们定义的这个类型状态的类型也要改,对吧,长整型,然后前面把这个名字再改一下,当前是timer ts state对吧?前前后后就是能需要改的地方,大家注意不要漏掉啊,每一个地方都要改啊,最需要注意的就是这里边的这个名称一定要改啊,因为别的地方你不改它会报错,这里边如果不改的话,它不报错,但是你肯定有问题啊,那相当于你同样的这个状态啊,就类似于我们的这个变量,你相当于给了同样的名称一样嘛,那肯定flink就错乱了啊,找不到具体的位置。
12:14
啊,那接下来我们就看一看process element,每一个数据来了之后,接下来到底要做什么样的操作呢?哎,啊,那另外大家还想到我们后面做那个输出报警的时候,还涉及到一个测出输入流对吧?呃,测出入流的话,还得定义一个测输流标签,我为了统一上边直接把这个定义出来吧,对吧?统一使用啊呃,定义测殊出流标签value,把它定义出来,All the timeout output tag,你有一个output tag,然后里边的类型大家知道特殊流是out the result对吧?里边给一个timeout,大家注意这里边的这个类型跟前面我们这里边要去获取的时候,拿这个流里边数据的时候啊,测殊流的数据的时候,定义的这个类型和名称必须一模一样,对吧?这个tag必须一模一样啊,我们这里边定义的都是timeout啊。
13:14
然后接下来就是process element,每来一条数据到底做什么操作,哎,那我们说先不管别的,我先把状态都拿到吧,先拿到当前状态先先存出来啊,就是先把那个状态里面的数先拿出来啊,那首先我就另外定义这么几个对应的变量吧,Is paid state value先拿到,然后定义一个is created is created state拿到,另外再定义一个呃,Time ts,把对应的那个时间戳保存到那个状态里边的时间戳拿到,先拿到啊,然后接下来呢,就要判断事件的类型了,来的到底是啥对吧?哎,所以接下来判断当前事件类型看啊,其实主要就是看到底是create还是配对吧,别的事件我们直接放行,如果你是更改订单的话,呃,不管对吧?啊,这个我们并不关心,看是。
14:14
Create还是K,所以这里边首先我们要做一个一服判断啊,value.create呃,这个它even type啊,这里面判断的,首先判断这个是否是create对吧?诶这里面大家想一下当前的这个这算是一对吧,情形一啊,这是来的是create,那来的是create怎么办呢?因为有可能乱序是不是我还得判断是否有过配啊对吧,要继续判断是是否配过,所以接下来我当前的这个一这个场景里边,接下来要继续去做判断,If if paid对吧,判断他是否已经啊,就是已经支付过了,那这里边接下来的这个啊,大家会想到1.1对吧,这边就是。
15:14
如果已经支付过的话,那应该是怎么样呢?已经支付过了,如果已经支付过,那是不是就匹配成功啊对吧?正常支付,正常支付输出匹配成功的结果,那这个是侧出侧输出流还是主流呢?当前是主流对吧?我们主流输出的是那个成功的结果嘛,啊所以这里边我们直接out.collect包装成order result young类,呃,里边写当前的order ID对吧,直接就是value嘛,拿出来就完事了,然后写一个配的successfully,这就是我们之前的这个输出的当前的这个信息,对吧?哎,那另外大家要注意一下,你这里边已经成功输出这个匹配结果了,但是有可能有可能之前是什么呢?这不是已经已经这个配过吗?配的时候是不是有可能我还定义了。
16:14
定时器在这等着呢呀,对吧,所以我现在要做一个什么清空定时器,清空状态对吧?哎,把该把这个该清的东西都清空啊,所以接下来我其实是需要把之前的这个所有的呃,Is created,呃,当前的这个做一个clearer对吧,状态做一个清空is paid做一个is paid state啊不是当前的这个变量是吧,之前的那个状态做一个清空,Timer也直接做一个清,哎,注意不能先去,不能先,就是我们这里边不能先去清空当前的这个,呃,Timer state对吧?啊,当然你也可以清空当就是直接清空timer state,为什么?因为我把这个已经保存出来了嘛,对吧,所以这个先后顺序就没关系了,你如果说前面我们没有把它这个状态先拿到,先提取出来的话,是不是必须先要删除定时器,然后接下来才能清空那个定时器状态啊,对吧?呃,这里边大家需要注意一下啊,这里边我们无所谓了啊,直接把它做一个清空,然后接下来删除。
17:14
时器对吧?呃,Timer service调这个啊,Delete even time timer,我们就是当时定义好的这个timer ts对吧?哎,这是做这样的一个操作,这里边大家注意一下啊,啊已经处理完毕,清空状态和定时器,这就是我们这个最简单的这个流程对吧?就是之前这个在create来的时候,乱序的那个配已经来过了,已经匹配上了啊这是最简单的一个场景,然后接下来呢,有衣服就有else了,Else else的话大家想到这是1.2的场景啊,那应该是什么呢?还没有配过对吧?哎,这其实是这是最基本的场景对吧?就正常,最正常的场景应该是就create先来嘛,啊如果还没配过,还没配过那干什么呢?是不是得去注册定时定时器对吧?注册定时器等待。
18:14
15分钟啊,那这里边这个注册定时器的时候,我们先定义一个当前要定义的这个定时器的时间戳,用当前的time stamp,这是一个秒,所以乘以1000对吧,然后再加上大家知道900秒对吧?15分钟嘛,啊,乘以1000对吧?写成这样的形式你至少不会错啊,因为要不然那个一共90万,你这个到底几个零容易搞错,你写成900乘以1900秒没有问题对吧?啊,这是我们当前定义的这个15分钟之后的定时器,然后接下来啊,这只是定义了这个时间戳,还没真的定义定时器呢,对吧?定义定时器注册的时候,我们要调这个timer service.register time timer,把这个TS传进去,另外还得更新状态对吧?啊,不要忘记把这个保存到状态里面去,Timer state,然后这里边update,用这个ts update进去,另外还有一个状态要更新,你现在不是。
19:14
这个配没来过,我现在是来了一个create吗?那是不是需要,那需要把create is create这个state要做一个保存啊,对吧,这里边触对吧,更新成触就完了,一开始默认都是false了啊,其实这里大家发现了,就是假如说啊,就是我这里边是这个前面已经来过create,然后现在又来了配的话,那其实我应该是只去清空这个啊,就是如果说现在来了create,之前来过配的话,其实我是不是只去清空is paid就可以了。Is create是不是只有在先来了create的时候,我才会把它update update成true啊对吧?默认情况下来了之后它应该是false对吧?你这里边如果没有更新处的话,其实没有必要去清理的,当然你直接写一个CLEAR2页也没毛病,对吧?我们就简单粗暴,直接全清空就完事了,好,这就是先现在来的是create这种情况分成之前来过没来过配有两种讨论啊,然后接下来大家就知道是不是还有两种分支情况啊好,我们把这个继续给大家说完啊啊,那接下来else else if啊,因为还有其他的一些类型,我们必须得判断,如果说当前来的这个type是配的话,这要接下来去做一个单独的处理了啊,那那这个是我们说的这个注注释一条啊,这是第二种情形,对吧?如果当前来的是配来的是。
20:48
是配,哎,那在想是不是还是一样判断是否create过啊,因为我们说有可能要乱序嘛,乱序它有可能来的时候这个create还没来过啊,啊,所以要判断是否create过好,所以这里边我们接下来的这个判断啊,就是if又是is created的啊,这个逻辑其实是类似的,对吧?哎,那我们接下来看一下,哎,这里边还有一个问题,就是说假如我们还想处理之前说到的那个乱序情况,呃,就是像像我们之前说到的那个比较特殊的那种场景的话,还有一种什么场景呢?呃,就之前我们说的,假如说已经超过延迟时间了,但是呢,又来了一个当前这个支付的这个请求,对吧,又来了一个配的事件啊,那这个怎么办呢?我是不是相当于还得去定义一个啊,就是相当于得得去判断一下当前时间戳是不是超过本身的那个超时时间了,对吧,还得做。
21:48
的一个判断啊,这里边其实大家想到这里已经是这个2.1的场景了啊,就是如果如果呃,这个已经create过啊,那其实是匹配成功对吧?匹配成功,但是我们还要还要判断一下,还要判断一下时间,就是配支付的时间是否超过了啊,就是之前的那个超时时间,定时器时间对吧?定时器时间啊所以接下来啊,大家看到就是还得继续判断是吧,又一重一啊这个就好麻烦了是吧?哎,那这里我们用value.time step,哎,注意还得乘以1000对吧,要去假如小于哎那个timer ts的话,定义好的那个超时时间的话。
22:48
那是不是这才是正常的输出结果啊,对吧?这里边是2.1.1啊,就是没有超时正常输出,哎,这里边正常输出怎么样?输出out.collect对吧?主流里边result,哎,那大家知道这里边结果应该跟前面这个输出还是一样的嘛,对吧?只要是匹配成功还是value,点这个order ID,然后把后边我们对应的这个successful里直接输出就完事了,这是正常的情况,那要是已经超过已经达到或者超过呢?2.1.2对吧,已经超时,哎,那这里边输出测输出流对吧?输出超时的那个数据啊,超时的结果,所以这里边我们要用ctx output对吧?哎,这里边我们是直接可以用前面定义好的这个timeout output tag,然后当前的数据呢,包装成一个order result里。
23:48
该还是用value.order ID这个时间我们就写一个不一样的对吧?啊,就是这个信息写一个不一样的,因为当前是paid已经支付了,But already timeout对吧?啊就是已经是失效了啊,已经超时了,但是它还是支付成功了,看起来是这样的一个结果,所以我们提醒一下啊,这是一个特殊的情况,可能是业务系统那边出现问题了,对吧?业务系统那边本来应该不能让他支付结果提交支付了啊这是这种场景,然后大家注意一下,不管怎么样,不管你输出到主流还是测输出流,是不是当前都已经匹配完成结束了,大家想想是不是这样对吧?哎,不管怎样是不是都已经,只要输出完,只要输出结果,那么当前处理当前订单,对吧?Order处理已经结束,清空状态和定时器对吧。
24:48
哎,就是又是这么这么样一堆操作,那这堆操作我直接copy过来完了啊,这个逻辑其实就是不是这个更新状态啊,这对吧,直接把这四行直接copy过来做这个操作啊,然后这一部分这个分支我们就搞定了,这就是create已经来了的这个状态啊,后边还要有一个else,就是create如果没来的话,这是我们前面说的2.2对吧,相当于是这种场景啊,如果create没来诶那怎么办呢?Create没来怎么办?
25:23
哎,有同学说那就直接把这个配,直接就是呃,相相当于我们把那个呃一字配的那个状态先更新了,对吧,更新一个状态,然后直接就等着就完了吗?那要等的话,你等多久呢?这就是一个问题对吧?哎,所以这里边我们可以还是定义一个定时器,注册一个定时器,大家想想这个注册定时器,注册什么时候的定时器。那是不是我就等到当前配的这个时间就可以了,如果等到配的时间create还没来,那说明又是业务处理系统那边有问题了,对吧?啊,就相当于是没有create的记录,结果就莫名其妙的突然来了一条配的支付数据啊,那这个我就输出一个特殊情况的报警就完事了,对吧?啊,所以这个也是可以做处理的,但是这种场景可能不常见啊,这这里我们就把所有的分支都考虑到了,所以注册定时器啊,等到等到配的时间就可以,所以大家看一下当前我们这个定义是怎么定义啊,直接ctx time service去做一个register even time time对吧?用谁的时间呢?直接就用当前的这个时间,哎,有同学可能就说了,那你直接注册当前的time Sam难道不是马上就出发了吗?
26:44
不是的,大家注意啊,英文time timer触发的条件是什么?Water mark涨到这个点对吧?哎,那现在是乱序啊,啊对吧,现在乱序数据的话,你这个当前时间是这个时间点的话,就马上就出发了吗?显然不是,对吧,是要等到过一段时间,等到这个数据来了的时候才会出发,为什么有这个操作呢?因为你现在pay已经来了,Create还没来,是不是就应该是一个乱序场景啊,哎,那所以你是不是就等到你的那个water的延迟的时间,等到等足了就够了,对吧,如果还没来,那就真的是他这个create就丢了嘛,这个数据就丢了,那我们就报报一个错误对吧,做一个提示就可以了啊,所以就做一个这样的操作啊,那后面还有就是更新状态,对吧?做一个状态的更新啊,这个更新状态我们可以直接。
27:37
类似于之前的这个操作,直接copy下来,当然这里的更新操作你你就不要TS了,需要去copy我们这里边定义的这个东西对吧,我们没有定义TS啊,直接把这个copy过来,然后另外这里面定义的也不是is create,是is paid state,要update成处对吧?啊就是这样的一个处理流程,好,这就是整个的流程都处理完了,然后还有一个重要的东西没有做,就是on timer,对吧,真的如果要是这个真的这个触发定时器触发的时候,那怎么办?其实大家会想到这个真的定时器触发的时候,才是真正意义上的那个等没等到,然后超时的那种场景,对吧?但是这里边我们定义了两种场景,就是定时器触发有两种场景,一种是什么呢?一种是一种是啊,就是create来了,等配没等到,这就是我们的15分钟超时对吧?呃,正常输出超时信息,另外一种是不是前按前面这个,还有一种是什么呢。
28:37
是配来了之后等great没等到对吧?啊,这也是一种场景啊,所以接下来我们要分两种场景啊啊就是我们先判断这个,呃,如果配来了,配来了没等到没等到create啊,这个比较特殊啊,但是我们也判断一下,那什么情场景就是配来了没等到create呢?那是不是只要是配过这个,如果要是它点value还等于true的话,大家想是不是就相当于是配来了没等到create呀,因为这里边我制了触之后,假如之后create来了的话,是不是就会输出结果清空状态啊对吧,只要都来了就会清空状态,所以如果这里它为触,我就可以认为肯定那个create没来对吧?所以这里边我输出一条测输出流的一个报警信息output啊,还是用那个太帽子吧,尽管它其实这个不是真正意义上timeout啊,这里边我输出一条这个result,当前这个应该算什么呢?诶当前的这个ID从哪里去取,现在拿不到ID了是吧。
29:37
诶,从上下文里面去取去对吧?这里边我们不是有那个get current key吗?当前的key就是order ID,然后后面给一个,哎,我们当前就是paid but,但是找不到not found create对吧,Create log就是相当于这里边应该是有错误的数据,那边有错误的把这个也做一个提示,另外else else的话,那就是如果它是空,但是还触发了,就是当前一配配没来过,但是还触发了定时器,那是不是就是create来了,结果没配没有,没有配成功啊对吧,Create来create来来了没有,嘿,所以接下来我们就是这是真正意义上的我们想做的那个超时对吧?哎,最终我们是放在这了啊,这里边all the result定义出来ctx,同样还是get current key,然后接下来。
30:37
这里边诶,给一个这是真正的all time out,这就是完整的一个处理流程,最后大家不要忘记是不是还是要清空状态啊,清空所有状态啊,那这里边我们就直接抄了,这里边除了什么定时器就不要再删除了,对吧?定时器都已经触发了,你就不用再去删除了,当然这里边直接把这个这里边的三个状态清空就搞定了,这就是我们完整的一个处理流程,对吧?好好,当然这里边我们可以就是先把这个,呃,先把前面的这个助教给大家运行一下,看看这个结果啊,我们先把这个流式数据助教先读取数据源给大家看一看当前的这个结果是什么样的,运行一下,现在已经输出结果了,大家看一下,现在这个报警就不一样了,对吧?啊,略有不同,就是之前还有一些特殊的场景都检测到了,比方说有什么呢,大家看到。
31:37
34756,这个是说的什么配的,But,它already太out对吧,就是说后边是来了那个配了,但是其实已经太out了,那还有什么呢?34767,大家还记得前面我们把那个删了对吧?呃,支付的那条消息给删了,所以这里边大家看到它报的就是一个完整的tout,然后还有什么情况呢?34768,这里边这个比较特殊一点,呃,这里边报的是一个paid but not found create,大家知道在数据里边就应该是什么只有配没有create对吧?啊,所以这些场景我们都分门别类测试出来了,所以大家看你如果要是想做这个精细控制的话,用process function也是可以实现的啊,这个就是大家可以下来之后再试一下啊。
我来说两句