00:00
接下来我们来讲window API的调用方式啊,那么大家看这个window API其实整体来讲调用方式非常简单,就是首先我们直接要定义窗口的时候,就是直接点window方法,那这个点window是在哪里掉的呢?这个大家要稍微注意一下,必须是在KBY之后,也就是基于一个k stream才能去点window k stream里边才有window方法,所以我们创建window相当于必须是分组之后才去创建这样一个window啊。但是后面我们会讲到,呃,你不分组,直接基于data stream也可以去创建window,但它调用的方法就不是点window了,而是有一个方法叫做window or啊后面给大家再统一说一下,一般我们最常用的就是先KB分组,分完组之后呢第window去开窗啊,那接下来大家看一下这个常常见的这种开窗之后操作的形式。
01:00
啊啊,这个这个事例代码就是非常简单,你看这里边我们就是首先data stream,基于已经读进来的一个data stream,我们做一个转换啊,比方说随便做啊,做一个这个呃,二元组的一个转换,对吧?然后呢,KBY做分组,然后就time window开窗,开了一个time window对吧?时间窗口之前我们说过有time window和count window嘛,所以呃,这里边我们就直接可以点time window和点count window,直接就可以定义出来到底是时间窗口还是技术窗口啊,那这里面大家看time window,你可以直接传一个参数十五,大家看这个稍微有点特别啊,是time.SECONDS15表示15秒对吧?那这个表示什么意思呢?这里面只有一个参数,当然就是当然就是窗口长度了啊,所以你只传一个参数,那大家想这个时间窗口,这应该是一个滚动还是滑动呢?哎,我们之前想过滑动窗口,那得有两个参数,除了长度还得有一个滑动,不长嘛,你现在只传一个参数,当然就是滚动窗口了啊,所以这个滚动窗口的定义是最为简单的啊,直接点T模拟走,然后后面传一个时间就够了,然后大家看后面你怎么办呢?这个窗口里边要做什么操作呢?比方说我们还可以reduce,对吧,就做对于窗口里边的数据做一个规约,做一个聚合,得到自己想要的一个结果啊,这就是基于窗口去做的一个操作,做的一个计算,所以整体来看的话,大家会发现这个其实就是调用还是很简单的,对吧?啊,这个整体来讲没有什么呃太太大的难度,所以我们还是在代码里面先给大家整体的敲一敲,大家来看一看啊,呃,那我们还是在API test下边去new一个object,这个我们就叫做window,呃,Window API test对吧,Window test。
02:55
命方法先写出来,然后前面的这一部分啊,算了,我我也不再去重复去写了,对吧,我们前面就是还是创建执行环境,然后读取数据源,做简单的转换操作,我们就当是全做完了啊,这个大家下来之后可以多练一练,练熟了就可以了,先把它copy过来,同样还是上边影视转换先引入,然后大家注意啊,接下来我们基于这个data stream,比方说这里边我就直接定义一个这个result stream啊,做开窗统计得到一个结果,那我就直接基于之前的data stream去做转换啊,当然大家如果要是你觉得呃不想分开写,你直接就用我们说的这种链式调用,对吧,点点点不停的往后写也是可以的啊,只不过是最后得到this stream就是你最后转换完的结果了啊,那这里边比方说呃,我们就照这个文档里边差不多这样来写啊,我可以先做一个map。
03:55
对吧,比方说这里边我们想要去求取,就是当前一段时间内,每一个传感器最低的那个温度,之前大家还记得我们做那个聚合的时候是直接啊,就是KY之后,分组之后直接来了一个明白对吧,或者一个呃,一个命,或者做一个reduce,直接得到的就是来一个数据就马上更新一下结果,来一个数据更新一次结果,它更新结果是基于什么来更新,就是算什么的最小值呢?其实是之前所有数据,就当前传感器对吧,三四十一或者346自己所有数据的一个最小值啊,那现在我们就想到,你往往我们不需要知道所有的历史数据,对吧,我只关心它最近一段时间内啊,那你比方说我这里边就关心当前啊,一分钟之内的所有的温度里边的最小值,那这当然是可以去考虑的啊,当然一分钟可能稍微长了点,我们就还像这个,呃,文档里面例子一样,15秒15秒等一下测试,我们不用等太久,对吧。
04:55
啊,那所以这里我们首先做一个map map成,哎,大家知道这个本来是有那个时间戳的嘛,我们如果要是关心它当前一段时间内的那个最小值的话,那肯定就是只关心最小温度喽,呃,对于这个时间戳其实不太重要,对吧,我只要窗口把它收进来就完事了,所以接下来我们就首先做一个这个map操作,对吧?把当前的data我们map成一个二元组,当前是sensor reading,所以。
05:25
诶,ID对吧,首先ID要放假,然后另外还有一个就是temperature,先把它放在这儿,然后接下来呢,当然开窗之前必须要做皮摆分组,对吧?啊,这个分组呢,前面我们讲过有几种不同的方法,你可以直接给一个零,这表示当前的第一个位置,另外还可以,哎,之前我们说过可以给一个ID表示当前的这个,呃,我们当前的这个字段名称对吧?但这里大家要注意,我现在已经把它变成了二元组之后,那是不是就不能直接用ID了呢?
06:01
大家想想是不是这样对吧?你变成二元组之后,之前我们这三次reading里边,你知道这个ID是什么,属性是什么,对吧?Temperature是什么?你变成二元组之后,这个已经没有名称了,哎,那你这个时候就不能用ID了,但是可以用什么呢?这里给大家点进去看一下啊这里边的KBY大家看到有这么几种不同的实现方式,有四种重写的方法,重载的这个方法对吧?啊,那这里边上面第一种是传这个int表示下标,所以你的下标,然后第二种呢,传一个string,表示当前的一个字段名称,后面还有两种,大家看这个就就更加的一般化了啊,可以传一个拉姆达表达式,传一个函数进来,这个函数是什么呢?啊,你通过这个函数类的这个写法,这表现的更加明显,这叫什么呢?这叫一个key select。一个键的选择器,好,那这个含义是什么?就是我们当前输入的数据类型不是T吗?然后你写一个函数,就是从这个输入数据TT里边抽取出当前的K作为返回,你定义这样一个函数,那当前我们的K就定了,对吧?你就是以它作为当前的分组的依据,当前的这个键来做的分组,那所以接下来我们这里边有另外一种写法啊,大家就知道了,我是不是可以直接用贝塔点ID来做一个啊,当然这不是ID了,因为是二元组了,所以是下划线一来做一个分组啊,啊,甚至我们还有这个更简单的写法,大家还记得这个盖拉里边拉姆达表达式可以省略,对吧?参数只出现一次的时候,我们用下划线把它代替,所以我可以直接下划线,点下划线1KBY这个这就是按照我们还是写一下啊,按照二元。
07:52
组的第一个元素分组,也就是ID对吧,ID分组。
08:00
好,那然后接下来我们讲这个不同的这个类型对吧?这个KBY的这个重载的方法,就是大家现在感觉的话,就是用哪个都行,你喜欢用哪个都一样啊,那至于它到底有什么区别呢?哎,其实刚才我们在里边调用的时候,大家也看到了啊,就这里边这个函数,你这里边这个是不是必须它是哪个K,你就必须返回什么东西,对吧?呃,这里边你必须直接在这里边定义好,然后如果要是你前面是给一个这个下标数,相当于我们那个元素的位置下标的话,可以它是一个可扩展参数,你可以给多个,或者你给这个字符串类型的话,也可以给多个,对吧?啊,那大家就发现了,这里边你既然可以直接给多个,是不是我们这里边得到的这个k stream的类型,它的K的类型是不是就是一个Java temple啊,对吧,是一个va元组类型对不对,因为你前面可以给多个值嘛,啊,所以后面这个。
09:00
类型就包裹的稍微的复杂一点,那你这里边K选选择器呢,你已经明确的确定它就是选择这一个,所以你看后边他得到的KSTEM就是当前数据,当前的这个数据类型是什么,是不是他这里边k stream的这个K的类型就是什么呀,对吧?啊,这里边就是你到底看哪种使用方便了啊,就有时候在实际使用的时候哪种都行啊呃,那比较推荐大家的是,就是你尽量让我们这里边的这个就是至少是表达看起来更加的呃,就是更加的。明白一点,比方说你像我们做这个,呃,Sensor reading,这是一个样例类,输出的时候你用字符串对吧,也可以,或者说你用这个传一个,你说下划线点我直接点ID也可以,对吧?直接把那个ID拿出来,这样的话你至少能看到当前你是以什么字段作为分组标准的,你如果直接传一个零一的话,这个可能我还得找到那个sensor reading的定义,对吧?这个代码的可读性会降低,所以推荐大家还是让这个代码可读性更高一点好,然后接下来我们就是开窗了啊,接下来的这个开窗操作非常简单,可以直接点window,但是如果说啊,我这里边要是直接做了这个点window操作的话,大家看这个里边得传什么呢?啊,有同学说,那那我里边就就还是传一个那个什么time什么什么样的东西嘛,啊不是的啊,大家点进去你可以看一眼,这里边要传的window是什么,里边参数是什么呢?是一个window a signer,我们要传的是这么一个东西。
10:37
这个window a signer又是个什么东西呢?啊,这个就感觉好像有点奇怪对吧?它叫窗口分配器,它是一个抽象类,那它具体的实现大家可以在这个源码里边稍微的看一下啊,就在当前啊,这个API windowing.ainer这个分配器这个包下边,大家看到这里边有各种各样它具体的实现,我们比较常见的常用的主要是什么呢?诶,就是大家看这这分类分的啊,Tumbling even time Windows对吧?Tumbling processing time Windows啊,当然还有一些timely,其他的一些Windows,另外还有什么呢?还有sliding even time Windows sliding什么processing time Windows对吧,大家直观的一看啊,另外就还有这个什么global Windows,还有even time session Windows对吧?还有这些东西,那这里大家直观的一看的话,主要就分成几大类,这不就是我们说的滚动窗口滑动。
11:37
窗口对吧?呃,另外还有这个,呃,Even time session Windows,这是一个就是我们说的会话窗口对吧?当然还有这个processing time session Windows啊,那另外在这个这里边大家看到全部都是时间窗口对吧?啊,那有的同学可能想,那这个技术窗口怎么样去定义呢?啊,技术窗口就直接你用那个点抗window啊,用那个简单的那种简写形式就可以了,我们这里边看到的这个具体的实现的这些分配器都是时间窗口啊,那大家看一下这个具体我们要调的话啊,里边怎么写这玩意儿,比方说哎,我们这里边要给一个这个滚动的时间窗口。
12:15
怎么来呢?哎,那就是前面我们看到了tlin,然后这里边有这么几种不同的这个时间窗口的选择,对吧?常见的大家看就是两个,一个叫做tumbling even time Windows,一个叫做processing time Windows,那这里边的even time和processing time又是什么呢?哎,这是我们所说的这个后面要给大家讲的时间语义,不同的时间语义对吧?啊,那对于这种不同的时间语义,我们可以给它定义不同的窗口定义的方式啊,这个就是后面我们可以讲到这一部分的时候再给大家展开,现在大家只要知道有这么一种调用的方式就可以了,然后这一个,呃,当前的这一个类型啊,我们把这个点进来看,它实现了一个window sign,对吧?啊,但是这里边直接调用的时候,哎,你又发现了啊,它的构造方法又变成了一个protected,又变成了这样一个方法,所以就是说我们这里边调用它的时候呢。
13:15
你又不能直接去拗它,那得怎么办呢?哎,这里边我们就来看一看,它里边其实是有对应的一个方法的,这里边有一个方法叫做of,大家看有一个叫做of方法,然后这个of方法呢,可以有这个重载的参数,传一个参数,诶这里边就是一个这个一个size就传进来了,因为大家知道当前是滚动窗口嘛,已经定义死了,它只要传一个size就OK,对吧?然后另外还有大家看底层,它调的是两个参数的这个方法,这个参数多了一个什么呢?多了一个offset offset大家知道就是偏移量的意思吧,啊,偏移量指的是什么?哎,这个偏移量大家看上面我们调用的时候。
14:02
就是刚才我们看那个这个调用的时候,偏移量直接给了零,给了零是什么意思呢?啊,那自然我们就想到了,你在定义这个窗口操作的时候,我们说这个方窗口分配,分配器是干什么呢?它其实就是呃,把我们当前所有的窗口定义好,然后来了数据之后,那是不是它属于哪个窗口,我就按照那个时间给它去做分配,对吧,我们说不是相当于是不同的桶吗?来了之后你直接把它扔到这个不同的桶里边去就完事了啊所以这个整体来讲还是还是比较简单的,哎,那这这里边如果说就是你,你要是想就是大家会想到我只给一个size啊,比方说我要定义这个滚动窗口,一个小时的这个滚动窗口,那你说我定义这个滚动窗口是八点到九点,九点到十点,九点十点到11点,这样整小时的去去创建这个一组窗口呢,还是说哎,就像我们前面说。
15:02
的,我要的是8.05~9.05,然后9.05~10.05 10.05~11.05,这是不是也是一组长度为一小时的滚动窗口呢?那这两种定义方式完全不一样啊,所以大家会发现它在定义的时候呢,默认给了一个offset offset是零,哎,那大家就想到了offset是零,那是不是就是应该整小时去定义啊,啊,所以你也可以单独去定义,你如果就比较膈应,我就想要它的这个offset变成一个五分钟对吧?哎,那你这里边就把这个offset传一个,也是一个time,传一个五分钟进来就完事了。啊,所以这里边调用就有各种不同的调用方式,好,这里边可以给大家看一下,就是你在实现的时候就直接这么来,对吧,直接调它这个嗯,方法,然后呢,里边要传一个啊,你看这个time这里引入对吧?这里边大家注意一下啊,有好多地方有这个不同的time,我们现在要用到的是一个windowing time.type是是选取这个东西,大家导包的时候不要不要导错了啊,这个容易搞混,呃,然后接下来大家看它里边就有各种days hours minutes对吧,Many second seconds就有不同的时间单位啊,你要定义什么定义什么就可以了,比方说这里边我们定义一个15秒的对吧?啊,这15秒的一个滚动窗口,那你如果想要有这个偏移量的话,你后边还可以继续跟一个偏移量对吧?呃,比方说这里边给一个三秒钟的偏移量,对吧?啊,这个是完全可以的啊啊一般情况这个偏移量不怎么用,大家可能想这个偏移量。
16:38
之后用的会就是会会用到这个偏移量呢,啊,这里边其实在二方法里边,上面这个已经说的比较明显了啊,大家看这个,呃,这里面我们给大家看一下这个。呃,源码里边的注释对吧,大家看一下这个啊,这里边调到了这里边传这个offset的时候,大家看这个源码里边注释,他讲了一句,主要是用来干什么呢?哎,就是用用来考察我们当前的时区,这个用的时候比较多啊,为什么?因为大家想到这里边它底层啊,你去创建这个窗口,时间窗口的时候,它底层是干什么呢?诶都转化成这个毫秒数,然后我们要的是一个就是标准的一个时间戳对吧?哎,那个时间戳其实是什么呢?都是我们打上那个时间戳,它其实都是从就是1970年1月1号,而且大家知道不同地区的这个时区是有这个时差的嘛,我们用的都是那个标准时间,就是伦敦那个格林威士时间,对吧?呃,1970年1月1号00:00:00开始算起到现在为止的一个毫秒数啊,那大家可能就会想到,那你要这么算的话,这个毫秒数如果算成轮。
17:53
蹲时间,那可能是一个时间,如果要是算成我们当前的这个,呃,北京时间的话,那就是另外一个时间了,哎,所以接下来如果说我们这里边得到的那个日志里边,大家去提取时间的时候,对吧?你看到的那个时间,或者说我们运行代码的时候,我当前获取当前系统时间,我拿到的如果说是本地时区的时间的话,那大家知道这个北京应该是一个,呃,是一个东八区的时间,对吧?那你拿到这个东八区的时间后边要做转换的时候,你把它当成一个标准时间来转换这个时间戳,那是不是就会有问题啊,对吧?啊,涉及到这个交互的时候肯定就会有问题,当然你如果要是不涉及到跟跟不同时区的这种这种交互,那其实好像也没什么,对吧,你就稍微的错,这个八个小时其实好像也也也还是正常的啊呃,但是有时候我们可能这个你考察比方说呃,每一天的时候,对吧,你比方说你要定义这个直接就定义了time.days那你time.DAYS1它表示的是什么?
18:53
那它表示的是标准时间的每天零点到第二天的这个到24点到第二天的零点,对吧?啊,所以这是伦敦时间,那如果要是北京时间,你想定义一个就是我们当天的零点到第二天零点的一个一天的窗口,怎么定义呢?给一个偏移量啊,大家知道我们北京时间是比伦敦时间要早八个小时对吧,太阳从东方升起嘛,我们东边的时间要早一点,所以这里边正常情况你就要给一个什么呢?在days one,然后time hours,负八,对吧,你给一个负八小时的偏移量,这样的话,你用起来它就是每天就是按照当前的这个。
19:33
加八小时啊,每天就是伦敦的早上八点到第二天早上的八点,那那对应的就是我们北京时间每天的零点到第二天零点了啊,这个稍微给大家说一下,在实际这个应用的时候有可能会有用啊啊然后这里边你具体接下来要去做操作的话啊,这里边是定义了一个滚动窗口对吧?滚动时间窗口哦,那那大家可能会想到,那这个滑动时间窗口怎么去定义呢?啊还是一样,这里边就是我把这个要注掉啊,因为大家看到这个window操作必须是kid stream里边,大家看一眼这里才有这个window操作,对吧?啊,你如果要是到这个data stream里边的话,大家还。
20:18
呃,知道这个,这是data stream啊,Data stream里边的话就不叫点window了,而是叫点window on对吧?啊,然后里边这个基本上跟我们那边是一样的啊,这里边要传的是一个也是一个window的分配器,得到的是一个or window的string,而如果说这里边我们直接给一个这个window啊,大家看这个k k stream里边点window之后,它得到的是一个window的stream啊,这个数据类型又又发生改变了,对吧,整个这个stream的类型又发生改变了,那在这个window stream基础上呢,它又可以去做各种各样的操作,你看这里面可以reduce对吧,可以aggregate,可以fold,就是我们常见的这些聚合操作嘛,这就是类似于这个KPI之后,分组之后那个状态啊,只不过是分组之后,我们现在无无界流变成了一个有界流了啊,分分不同的桶,把它装起来,然后去做聚合统计了,所以接下来这个操作跟我们前面讲的那个KY之后的呃,聚。
21:18
格差不多,大家看也有max me some对吧?呃,明白max by这些是都有的啊,所以接下来后边我们要做的就是这样的一个操作,你可以做一个reduce,那我们现在还是给大家看一看这个其他的几种不同的时间窗口的定义啊,啊,那大家知道如果要是滑动窗口的话,那就是直接sliding啊,这里边你可以even time,你就可以processing time对吧?啊,这个是时间语义的不同,那后边你如果要去定义的时候,大家看它的这个参数就比刚才就多了一个对吧?默认至少得有两个参数,一个size,一个slide啊,然后后边呢,这里边你还可以多一个offet offet的含义是一样的啊,所以这个大家就是稍微的写一下知道怎么回事就可以了,比方说我这里边,哎,你是这个长度是15,然后我滑动距离是十对吧,完全可以滑动距离是三也是完全可以的啊,随便给一个都行,那另外刚才大家也看到了,如果说我这里边要的是一个啊,我把这个也注了啊。
22:18
如果要的是一个会话窗口,那怎么办呢?绘画窗口的话,同样这个aign,哎,那就得要的是一个啊大家看这个,比方说我用一个event time啊,Event time session session Windows,哎,那接下来它没有of方法,而是什么呢?呃,同样你点到里面去发现这是一个protected的构造方法,对吧?啊,现在我们要大家其实一点就看到了,有一个with GAP,呃,就是我们说不是他两次会话之间要有一个,就是要有一个GAP嘛,对吧,呃,要有一个这个超时时间嘛,我们平常两个数据之间的那个间隔就是大于等于这个超时时间,大于等于我们定义好的这个东西,它就可以直接啊给我们截取一个窗口出来,所以这里边你看它本身定义的也是一个time size,对吧?啊,那接下来我们就是传一个比方说啊,一个十秒钟,那就是如果说数据。
23:18
不停的来,哎,假如说这两个数据间的间隔时间小于十秒钟的话,他们还属于一个窗口对吧?如果说两个数据之间的间隔,诶,这个大于十秒了,大于等于十秒了,哎,那这个时候前面这就是一个窗口,后面这个就属于下一个窗口了,对吧?啊就是这样去做统计啊,所以这个整体来讲还是非常简单的啊,然后大家会发现这是最一般化的点window的这种调用,那还有一个简写的方式,当然大家也也知道了哈,这个我们写一下,这个是滑动时间窗口,呃,然后下面这个是会话窗口,另外大家知道,因为这个写法太麻烦,对吧,你每次还得记这么一长串,记不住啊,诶,那怎么办呢?有一个非常简单的简写方式,就是直接time window,然后这个time window里边参数就简单了。
24:18
你看它其实就是直接传两个参数,这就表示什么呢?表示它是一个sliding processing time Windows对吧?啊,你看它就说这是一个shortcut,就是这是一个简写的形式,对于这个底层而言,它最后还是什么呢?还是一个点window啊,Sliding even time Windows,或者是sliding processingtime Windows传两个参数的时候,对吧,那或者呢,我可以只传一个参数,我们看一下那个time window啊。只传一个参数,这就相当于是一个滚动窗口了,对吧?这也是一个简起tumbling啊,那我们点进去看一下源码啊,它底层是什么呢?哎,你看底层这不是还是最后return的时候给了这么一个分配器吗?对吧?Window,然后传了一个tling processing time Windows,这里要判断时间语义到底是什么,然后给我们调用这个不同的当前的这个分配器,然后of当前的size就可以了。
25:10
啊,这就是这个time window这个简写的方式啊,我还是把这个给大家直接写出来,这就是一个滚动窗口啊,那那另外滑动窗口就两个参数,这个就不写了,另外还有一个,那就是大家看到有一个我把上面这个注掉啊,这个还是必须基于k string的,大家看还有一个count window对吧?Count window这里边呢,跟这个定义是一样的,只不过就传的不是时间了,而是直接大家看传一个长整形就可以了,你传一个参数,那就是大家看,这就是一个滚动的滚动的count window对吧?呃,然后你如果要是传两个参数的话,这就是一个滑动的count window。那它的底层是什么呢?哎,这里大家注意一下啊,它的底层大家看到这里边,诶,这个点错了啊,这个是,这个是time对吧,这是time window啊,我们看那个count window count window在上面,它的底层其实是一个,大家看window里边用到了这个global Windows,这个global window又叫什么呢?这个叫全局窗口,全局窗口就是它把所有的数据直接来了之后,全丢到同一个窗口里面去,然后没有结束时间,所以说你得怎么样呢?你得自定义它到底什么时候触发,哎,所以大家看它后面有一个自定义的触发叫trigger对吧,这个叫自定义的触发器啊,然后用这种方式去给我们实现了,还有一个艾米,艾米叫移除器,就是什么样的数我扔出去对吧?不,不做考虑把它排除出去啊,这个是一些其他的这个window API啊,到最后我们再给大家统一再来说,这里大家至少可以看一下这个count window的底层实现啊,所以说它是用这个。
26:50
Window来实现的,那这个调用肯定就太复杂了,对吧,你也可以用这种方式去写,但是一般情况我们就直接用简写形式抗图window就完了啊,然后里边你传一个数,这就是一个滚动窗口,传两个数就是一个滑动窗口啊,那我们平常用的最多的肯定还是time window了,一般大家直接这么写就OK,这就是关于这个代码里边窗口定义啊,窗口分配器的写法。
我来说两句