00:00
我们已经知道process function里边到底有哪些主要的组成成分了,到底有哪些重要的方法,最重要的就是一个process element,另外一个on timer,另外它里边还有一个内部的contact,它的上下文里边是有一个timer service的定时服务的,里边可以做很多事情,可以注册定时器,可以去获取当前的时间戳和auto mark。除了这些之外,另外还有一个就是测试出流啊,那所以process function啊,独有的能做的事情就很多。除此之外呢,还有负函数,所有那些能做的东西,他也都能做啊,因为他继承了嘛啊,所以接下来我们给大家大概的看一看这个process function到底到底能干些什么吧,我们简单的做一个测试,接下来我们应该要新建一个。零。首先我们来测一下这个基本的process function的用法,Process function test。
01:03
新的一章我们还是完整的把这个再敲一下exception,呃,然后接下来stream execution environment,对吧?At execution environment,把这个叫做env啊,全局的并行度还是设成一,方便我们打印输出测试啊。Env execute执行起来。是基本的这个环境的流程,然后接下来就是读取数据源env啊,我们这就还是直接那个a source吧,直接用click source。进来之后直接assign。And water marks还记得吧?Water mark strategy ored out waterness,这里可以直接给一个当前的类型,是event里边一个对应的延迟时间,既然产生exce的时候啊,并没有什么延迟,那我们这里边就是直接zero,然后接下来我们再定义一下with。
02:06
I stamp a sign,有一个very liable stamp a s,哎,自动把这个补全。Element点直接提出来就完事了啊,这个如果要是熟悉了之后的话,只要记住这几个关键字啊,艾idea可以帮我们完全都把这个一个一个都补全出来,所以很快就把这一部分就敲完了,然后接下来有了这个啊,我们先定义一下这个就叫STEM吧。啊,大家看就是本来这个是呃,That stream source对吧,然后经过了这一个分配时间提取out之后,那就变成了single output stream operator了,是一个算子,这是一个再来就是基于它去做转换,那我们现在。简单粗暴,直接process基于一个data stream啊,大家知道这个,呃,Single output stream operator就是data stream嘛,啊,所以直接基于它里边传入一个access方式。
03:06
那来到这里边,我们可以。呃,定义一下啊,就这里到底要输出什么东西啊,那比方说这里边我们就输出一个string吧,要不就是做一个测试嘛,输出一个string。里边必须要实现的方法,但是我们看啊,这抽象方法是element,那onver呢?啊,那onver当然不是必须实现了,只有用到了定时器的时候才需要去实现ontime方法啊,就这里边肯定不是必须的嘛,我们现在实现process element,这是每来一个数据都会调用的方法,当然是必须实现了。现在诶,我们看到拿到了当前的当前的这个event啊呃,然后当前的数据还有上下文,另外。还有一个是collector用于输出,所以这里面就,呃,我们自然就想到了,这很容易实现我们之前讲到的map的,呃,这样的功能嘛,你有输入我对应的一个输出,这不就是map吗?或者说我可以做一个filter啊,比方说我这里可以根据。
04:09
根据比方说当前这个value.user我看他到底是谁,然后判断到底要做什么事情,对吧,比方说equals的话,如果是。哎,那么我就out out.C输出当前。字对吧,啊,那我也可以加上一个。BI。后面加上value.url就是谁点了一个一个什么,那同样或者说我们l if啊。如果当前的value.user。Equals,如果是的话。那大家知道我不光可以输出条,也可以把它滤掉,另外也可以输出多条啊,所以这个其实就是相当于我们之前简单的那些转换操作都可以直接把它搞定嘛。
05:13
大招就是这样啊,别人能干的事儿他全能干,别人不能干的事儿他也能干,比方说我输出两条这个Y6.u啊,这是很简单的一个实现啊,然后呃,另外他还能干什么呢?啊,那这里可以给大家看一眼啊,比方说我们可以。我们可以直接用这个下文,关键就在这个ctx这里,对吧,大家看到这里可以直接获取当前的time。我们可以直接把它打印出来啊,获取到当前的这个time stamp到底是什么,然后另外也可以。这个啊,Time stamp还是有有一个有一个这个说明会稍微的好一点,然后另外我们看到这里边还可以看到output特殊出流啊,那这个我们就先不说了啊,它可以使用这个特殊流传入一个特殊流标签,那就可以向这个标签指代的那个流里边去发送数据,就相当于可以把我们的这个数据发到旁支去分支去啊,测出枢流里边去,就像我们之前那个把迟到数据扔到测殊里边去单独获取一样,这个就不详细说了,那另外还有一个就是timer service timer service里边可以看到我可以获取当前的处理时间,也可以获取当前的waterma啊,当然了可以注册日器,对吧,但是前面我们说了,只有KBY之后keep stream里边那个k process function才能用这个定时器啊,我们这儿的话就不能注册了,对吧?啊,那这里边我可以看它的automark呀,啊,我可以把这个。
06:52
那比方说我这个也打印一下。当前的auto。
07:00
就是我们能够获取到的一些东西啊,啊,那另外前面这是一些判断,我们也可以直接比方说在外边直接就做一个输出,比方说value to string啊。要把它完整的这个信息转成string啊,做一个输出啊,那中间呢,这里可能我输出各种各样自己想要的信息,这完全是可以的,哎,但是只能做这些好像也没什么呀,哎,大家不要忘记啊,当前我们这还是一个负函数。啊,如果是负函数的话,大家看我可以get wrong time呀,把可以get wrong time contact,接下来不函数里面能干的事现都能干了,对吧?Index,呃,Of this sub task,当前的索引号是完全可以的嘛,完全可以把这个也打印出来,那当然了,当时我们也说过,函数还有一个特点是运行上下文里边可以去。艾,各种状态对不对?哎,可以把这个各种各样的状态拿出来,然后去使用,我们可以自定义状态,这就涉及到状态编程了啊,这一部分我们后面讲到可以再说啊,大家只知道在process方式里边这些东西都能干。
08:08
当然另外还有就是说你在外边如果看的话,就是我们只实现了这个process element嘛,如果在外边看的话,有一个on timer方法,对吧?啊,当然就是你现在在这儿去定义on timer是没有意义的,因为没有没有定时器嘛,啊在这儿是没有义的,那另外还有一些生命周期方法也是可以用的,因为它是函数嘛,Open close这些都是没有问题的。啊,所以就是关于process function的基本的一些用法,它能干的事情大家可以看的很明显,诶,这是print,可以做一个测试反应啊,那假如说在里边我们定义了那个测殊潴流的话,哎,当然你在外边,那就相当于还得去get side out foot把它拿出来了,就跟我们之前那个做法是一样的啊好,我们可以运行一下,看看当前这个效果怎么样。啊,大家可以看到我们这里边输出了很多信息啊,Event,然后time water mark这里比较有趣,我们先停一下啊,我们来分析一下当前这个数据是怎么回事啊。
09:10
首先来了一条数据,爱ice丝,我们是随机生成的嘛,对吧,23分39秒,611毫秒啊,然后Alice访问这个PRODUCT100,然后有这个时间戳time Sam啊,当前是这个这个这个算正常吧,大家看这不是611嘛,末三位就是毫秒数嘛,啊,这个是正常的,那这个water mark为什么是一个负数呢?啊,这个大家需要注意,就是我们回忆一下auto的定义啊,我们一般情况在这个处理乱序数据的时候,Founded outness extra,就就像这个提取的时候啊,在提取时间戳和water mark的时候,它是周期性的去提取的,默认每隔200毫秒提取一次。那大家想一下当前的这个事件啊,爱丽丝这个数据来了的时候,我们当前到这儿的时候啊。
10:00
走到这个process element的时候,当前的water mark已经改变了吗?按照大家的想法是当前的这个数据来了之后就应该基于它,哎,我们不是零毫秒延迟嘛,但是大家知道应该其实还是要减一的啊,即使是零延迟也是要减一的,那至少你应该也是跟它差不多的一个watermark呀。但是没有。因为大家想到water mark是。哎,数据中间是隔200毫秒一次,隔200毫秒一次是这个样子,诶当然在这里的话,我们这个数据是一秒一次啊,就是这个数据没这么密,Water是不停生成的,但是也不是说你这个数据刚来了之后,我马上water就跟上了呀。所以肯定是我的数据先处理,先走到了后边,因为我们这里处理都很快嘛,他直接就走到了后边的process这一步,然后在process function里边调用了process process element,而这个时候WATER2200毫秒触发的那一次还没到呢啊,所以这个时候water就是原始的值,那原始的值又是什么呢?原始的值是一个非常小的负数,其实在water mark的定义里边啊,大家看这个啊,Bonded out of orderness waters。
11:15
这里边它初始定义的是长整形的最小值啊,那其实大家知道这就是长整形所能表示的最小的数了嘛,很大的一个负数啊,那作为初始值这个肯定是没问题的,因为你知道只要来随便来一个数,正常来讲的话,肯定应该比当前的这个值要大。你为什么不用零作为这个初始值呢?啊,因为大家知道我这里边还得考虑一个延迟呀,啊,对吧,你如果要是加上这个延迟之后,那可能就跟我们一开始的那个时间就会不一样了啊,啊就是我们还得减延迟这个时间啊,所以当前当前我们定义的时候用的是。长整形的最小值,这是初始的water mark,这是索引对吧,索引是零,这个没问题,我们只有只有一个嘛,一个分区啊,所以这个打印无所谓啊,然后接下来你看,如果要是第二条数据又来了,然后这个时候你再打印它的这个water mark的时候,就会看到他的water是610。
12:16
而且你看它是这个40616啊,离离它差一秒多呢,呃,所以这个watermark改变是谁引起的呢?是前面第一个数据引起的。啊,所以总有这样一个差距啊,就是总是赶不上趟,就是我们这个数据处理要更快一点,他马上出来之后就走到后面的process了,而这个时候呢,还没更新,还没有因为它而更新。其实这是肯定的嘛,因为watermark的生成也是一个算子,我们源头读进来,这使这个S。是那个生成的算子,然后后边是process。Process。那么大家知道现在我们打印watermark是在哪里打印的呢?是在这里啊。
13:02
在这里打印的对不对?我们现在连并行分区都没有啊,我们并行度是一,所以数据来了之后,诶,来了第一个数爱ice丝啊,39.611啊,这个数来了,然后到auto这里来处理的时候,诶,然后发现提取出时间戳,然后接下来更新了我们当前最大的时间戳,但是auto APP生成,然后当前数据就又朝下走了,继续到process这里来处理了,恩大来说这个数据来的时候可能water改变吗?Water只有在它后边才能改变对不对,你总得一个一个来嘛,总得先处理完这个数据才能生成water mark啊,所以接下来你这个process这里处理的时候,肯定接收到当前这个数据的时候,Water mark是没有改变的。按照顺序来,它还在后面呢,所以大家看就是总是上一个数据引发的这个watermark默认是减一毫秒的嘛,减一放在这儿了。啊,这顺带就把之前的这个water的机制也再给大家解析一遍啊,啊,这就是这个process function啊,大家看之前我们没学process function的时候,我们都不知道怎么看这个water是什么样子的,现在能看到了,原来watermark长这个样子。
14:13
这就是关于process的一个简单测试。
我来说两句