00:00
我们已经了解了connect的基本用法,那在这个过程当中呢,主要分成两个步骤,第一步那就是两条流,通过调用data stream的点connect方法先连接到一起,得到的是一个connected streams。那么接下来呢,就是基于connected streams,再去调用对应的转换方法,得到新的data stream。在这里我们可以看到connected streams可以调用的方法主要就是map flat map以及process。Map和flat map,这是对应着我们之前讲到的基本转换运算,那么process呢?啊,很明显这就对应着之前我们说过的底层接口处理函数了,那么在这个process方法里边需要传入的就是一个Co process function,前面我们提到这其实也是处理函数家族里面的一员啊,那所以现在我们就又接触到了一个新的处理函数,那前面我们也点进去看过啊,这个处理函数呢,跟其他的process方式其实也没有什么太大的区别。
01:05
只不过就是这里边啊,对于process function而言,本身是一个输入类型,一个输出类型,现在呢,它变成了两个输入类型,一个输出类型。本来process方式里边必须要实现的是一个process element抽象方法,现在呢,就有两个这样的抽象方法,一个叫做process element1,一个叫做process element2,那同样后边还会有一个on time方法,我们在里边可以去注册定时器,处理定时事件,这里边它有一个当前的上下文。这里我们自然也会想到之前我们说process function式一般的处理函数里边,其实它是不能去注册定时器,然后去实现对应的on time方法处理我们的定时事件的。如果想要注册定时器,那必须得在kid process function里面才能够使用,那同样现在对于Co process function而言也是一样的道理,那如果说我们想要用定时器的话,那也必须基于键的指定,也就是必须按键进行分组之后才能够调用,那这个调用的是什么呢?啊,我们看到这里同样有这样的一个方法,就是connected streams可以调点process方法里边传啊,不传一个Co process function,而是直接传一个key的Co process function。
02:24
那同样K的Co process function呢?跟Co process的区别就在于它多定义了一个K啊,那里边同样也是需要实现process element1和process element2,在这里我们就真正能够去使用定时器了。那看到这儿我们肯定就会有一个疑问,就是同样都是connected streams调一个process方法,为什么它可以同时既可以传Co process function,又可以传kid Co process function呢?那当前的key到底在哪里指定呢?啊,其实我们进一步查看当前可以调用的方法就会发现啊,Connected streams里边本身就可以去调用KBY方法啊,那所以这里边我们可以直接调KBY,然后里边呢?啊,这里可以传两个参数。
03:12
就是指定了当前两条流里边的K,我们这里边可以传一个整数类型的位置,指定当前建的字段,也可以传一个整形的数组,指定多个字段啊,那当然了,我们也可以传一个string类型的字段名称,指定当前的键。另外呢啊,更加通用的方式当然就是使用两个LA的表达式来实现对于两条流里边K的选择。对于两条流的合并操作,我们指定他们的K有什么意义呢?哎,其实这个非常的有用,因为对于两条流合并操作而言,如果我们指定了K,两条流里边都选择了对应的K的话,那接下来我们得到的data STEM里边就会将本身两条流里边K相同的数据都放在一起。
04:06
那么接下来我们所调用的处理转换方法里边,就可以将两条流里K值相同的那些元素放在一起去进行处理,哎,那之前我们说过啊,在底层的process function里边,我们可以自定义状态去进行状态编程,那这个时候呢,诶,我们设置的状态,它的可见范围也就是针对当前的T有效的,哎,所以这样的话就相当于我们实现了一个。两条流按照某个K去进行连接,进行数据匹配的一个操作,这就有点像我们在关系型数据库里边按照某个K值去进行join的那种操作。哎,那所以说这种方式啊,在实际应用的过程当中,其实是非常非常普遍。啊,那对于connect而言呢,我们指定它的K可以先得到一个connected stream,然后再调用KBY方法,同时传入两个函数来指定两条流里的K也可以呢,啊,也可以直接就在connect之前,我们就把两条数据流先做一个K呗,哎,那就相当于我们就变成了一个kid stream,再去连接另外一条kid stream,因为我们知道kid stream本身也是data stream嘛,所以它可以调用connect的方法,那如果说我们是这样的一个调用方式的话,后边即使不做KBY,我们也可以默认认为可以实现一个key的Co process function去进行按键分组的两条流的合并操作,啊,那所以这个处理起来就会非常非常的灵活,非常的有用。
05:49
所以接下来呢,我们可以举一个具体的实际案例来看一看。在实际应用的过程当中,怎么样利用connect去实现更加复杂的需求啊,那这个需求呢,这就是我们所说的一个实时对账的需求啊,简单来讲呢,就是说我们当前在网上进行操作的时候,本身电商平台对应的这些支付操作,一般都会关联到另外一个第三方支付平台上面去啊,那比如说像现在我们所使用的微信啊,支付宝啊,一般我们都会关联起来,那在网站或者说APP上去进行的支付操作。
06:29
本身是一个事件,我们会记录到当前网站的日志信息里边,而第三方支付平台的支付操作呢,它同样会返回相应的信息啊,所以对于我们日志收集而言,这明显就是两个不同的数据来源,两套不同的日志。在实际应用的过程当中呢,正常的一个支付行为应该是我们在当前的网站上可以正常支付成功,可以返回一个正常的数据,另外呢,同时同一个订单应该也能够收到第三方支付平台给我们返回的一个成功操作的信息啊,所以如果说我们想要去检测一些异常情况的话,那可以把我们收集到的这两条日志数据流里的信息做一个连接匹配,查看一下是否每一个网站上的支付操作都对应着能够收到第三方支付平台给我们的返回啊,那如果说等不来对应的事件的话,那么我们就输出一个报警信息,比方说呢,呃,两边的这个支付事件我们都可以等待五秒钟,就是假如说我当前一条流里边有一个支付事件,那我就等待另外一条流里边的对应的那个支付事件。
07:48
等五秒钟,如果五秒钟还没等到,就直接输出一个报警信息,那这样一个需求的话,我们自然就想到了,它涉及到了一个定时操作,根据当前的事件时间,然后再延迟五秒钟去进行一个报警,所以这里面我们需要用到定时器,那定时器的话就自然想到了底层的process方式。
08:11
而同时呢,我们现在又想要把这两条流里的数据要按照对应的订单号,一个ID要进行一个连接匹配,这个时候当然就是一个双流连接或者双流状语的一个场景了,所以我们自然就用到的是一个Co process function啊,我们需要先把这两条流先connect连接起来,然后去做一个process处理。接下来我们就在代码里边去做一个具体的实现。这是一个实际案例了,我们创建一个object实时对账的需求,我们就叫做Bill check example。那方法先写出来,哎,同样还是先把执行环境获取到。DV上面我们还是下划线引入。
09:00
为了方便测试,我们还是全局的并行度直接设置成一,然后接下来我们就是读取数据,数据源的话,我们现在应该有两个来源,一个是来自当前的网站或者APP端直接收集的支付日志,另外一个呢是来自第三方支付平台给我们的返回信息,所以接下来我们应该要创建两条流,首先是来自。APP端的支付。日滞留。啊,这个其实就是直接我们定义一些测试数据就好了,那就from element,当前的这个支付日志数据呢,最重要的信息当然就应该是诶,订单的ID,然后还有当前订单支付的状态,到底成功了没有,最后还有一个时间戳,诶那简单起见的话,我们就把它包装成一个三元组的这样一个类型吧,一个订单ID,一个支付的状态status,另外还有一个time step。
10:02
这样的三元组,所以这里我们也可以直接去给定测试数据啊,那直接生成一个O1。他的状态支付成功,Success。哎,那最后来一个时间戳,这是第一秒钟发生的事情。另外我们同样可以指定一个第二秒钟一个新的订单,ORDER2同样也支付成功,这是我们能够想到的平台上啊,我们当前网站上能够看到的一些信息,我们就把它叫做APPSTEM吧。然后接下来还应该有第二条流。那就是来自。第三方支付。平台的日志事件流,哎,那这个同样的我们可以from elements,那对于第三方支付平台的日志事件呢,可能就会跟我们这里有所不同,它同样也应该有一个order ID。
11:03
那另外呢,也应该有一个status。接下来还应该有一个当前的支付平台,到底是哪一个,哎,那就是平台ID。ID。最后还有一个时间戳,哎,所以我们看到两条流的数据类型是不一样的,直接UN那是肯定做不到的啊,必须要用connect。这里我们就直接借用之前的数据吧,只不过现在不是三元组,而应该是一个四元组,比如说我们有一个AL1,然后success,接下来插入一个平台的信息,比方说我们当前是微信支付WeChat啊,然后第三秒钟。检测到了这样的一个日志事件,然后另外我们还可以定义一个。啊,这个我们就专门错开找到一个订单3ORDER3的一条日志事件success,另外我们可以啊,它是支付宝阿里配第四秒钟收到了他的一个日志时间。
12:06
这样得到的这条流,我们可以把它叫做第三方支付平台。Third party street。那这两条流接下来要进行连接,还要按照对应的order ID作为K去进行匹配,另外呢,还跟时间有关,因为我们说当前每一个数据到了之后呢,它要等另外一条流对应的那个数据,等五秒钟,哎,所以这里边我们还应该指定事件时间啊,就是我们要从里边提取时间戳,然后生成watermark,所以接下来我们必须有这样的一个操作啊,那当前非常简单,我们看到都是升序的,那就直接assign asending time steps,接下来我们使用第三个字段,三元组的第三个字段时间戳啊,那同样对应的这里可以assign,现在是第四个字段,最后一个字段是一个时间戳。
13:00
把它们都定义好了之后,接下来要做的操作,那就是连接。两条流。进行匹配数据检测。所以我们就是APP stream,直接调用一个connect方法传入third party stream。注意,这里我们还要进行一个KBY,需要指定他们两者的K到底是什么,哎,因为如果是不同的order ID的话,你去检测他们对应的那个事件到底有没有,这就没有意义吗?必须是相同的order ID,然后我们检测是否在两条流里边都出现了,这才是匹配的状态,这就是我们所说的对照啊,要对得上,所以接下来的这个KBY呢,我们要传入的就是他们对应K的位置啊,那我们可以看到。以order ID作为当前的K的话,哎,那我们选择的其实都是下划线一。然后接下来啊,那既然后面要注册定时器啊,五秒钟之后肯定就是一个process了,里边要实现的是一个扣process。
14:09
方式。Co process function里边有三个类型参数,诶,我们知道是第一条流的输入数据的类型,第二条流输入数据类型,还有最终结果输出的数据类型。那因为我们是要检测到不匹配就报警嘛,所以我们输出报警信息的话,直接用一个string就可以了,那现在输入的数据类型呢?呃,这个稍微麻烦一点,是元组类型。第一条流三元组。String string浪第二条流四元组。String了,那最终输出的数据类型就是一个string。然后里边我们就必须要实现两个抽象方法,一个叫process element1,一个叫做process element2,而这就是我们接下来所要去实现逻辑的地方。
15:04
因为前面我们已经做了KBY,那当前的Co process方式啊,我们这里边的对象实例能够获取到对应的信息呢,就都是基于同一个K里边的数据,会调用到同一个实例对应的代码来,哎,那所以这里边我们看到process element1里边,诶,得到的就是当前order ID啊,第一条流里的数据,那process element2呢,得到的就是第二条流里边对应order ID相应的那个数据,那这里面我们会发现他们俩又怎么样彼此产生交流,然后知道对面已经来了数据呢?因为我们知道就是如果说对面没来数据,我这边的话就要设置定时器,然后等五秒钟了嘛,那怎么样才能知道对面来没来呢?这就涉及到我们必须要做一个状态的保存了,必须得存起来,假如说之前已经有数据来过的话,必须要先做一个保存,所以我们现在既然是按照K要保存数据的划分,那自然就是要把它定义成一个当前算子的状态。所以我们定义。
16:11
状态变量,这就又涉及到了状态编程,主要的目的就是用来保存已经到达的事件。哎,那所以这里边我们可以定义两个状态,一个就是保存已经到达的第一条流里边的APP支付事件,另外一个就是保存第二条流里的第三方支付平台的支付事件,那这样的话就干脆啊,定义一个一个就叫APP。Event。另外一个就叫做。Third party。Event。哎,那这两个事件呢,很显然就要用到link给我们内置的一些状态类型了,哎,所以这里边我们就想到了它的类型应该是什么样的呢?我们这里边就直接,因为它只保存一个值嘛,这个类型就不再是之前我们用的list state列表的状态了,而是一个value state。
17:09
它同样也会有一个泛型类型,哎,那就是我们到底保存的是什么东西啊,那现在我们要保存的第一条流里的数据,当然就是三元组类型了。直接可以把它定义在这里。那同样对应的third party events呢,就应该是一个次元组类型的。Value state。我们可以直接把它先声明出来,定义在这里。当然了,按照之前我们对于状态变量的定义和使用,那在外边定义的话应该是一个va类型,然后接下来在open生命周期里边对它去进行对应的赋值,啊,那就是get runtime context。我们在里边APP event等于get runtime context,然后现在是value state,就直接get state就可以了,然后里边呢,要传入一个value。
18:05
State。描述器script啊,那里边呢,同样我们看到。它需要的参数一个就是内当前状态的名称,另外还有就是对应状态的类型啊,那后面还可以有一个默认值,默认值的话我们看到这个方法已经被deprecated啊,已经要被弃用了,那现在啊,最一般化的写法就是两个参数,就是一个名称一个类型就够了啊,那所以现在我们直接写里边这个我们就叫做APP event。哎,那它的类型当然就是三元组class of里边把这个三元组类型放进去。同样的另外一个状态,Third party。他就应该get runtime contacts,然后get state,你有一个value state script,哎,那同样这里它的类型是一个四元组类型。
19:00
里边给一个名称,注意这个名称必须是唯一的,跟前面我们指定的状态名称一定要是不同的,哎,那所以这里边我们就直接说third party。后边是一个四元组的类型啊,尽管稍微有点复杂。我们也要把它完整的写出来拉。四元组类型。这就是一个具体的状态的定义,然后有了状态之后,接下来我们就可以在process element1和process element2里边去实现相应的处理逻辑了,我们可以把这些框架先都写好。
我来说两句