00:00
讲到了这个connect的用法,大家也发现了啊,这个Co map function它是一国两制嘛,相当于是用两个map function分别处理这两个理由,哎,那自然我们就想到了,如果我不这么干,我在前边把当前这个STREAM1做一个map map成一个stream,然后STREAM2也map成一个stream,两个再来一个union,啊,类型一样了嘛,可以直接union了,那个更简单,呃,UN起来不就完了吗?啊,对于我们这个事例而言,这只是个例子啊,这个事例而言确实是这样的。诶,但是大家会想到有一些场景下,或者说我们我们想要连接两条流,往往是想干什么呀,如果只是把它转换成一个相同的数据类型的话。那没必要,那就确实是,你像我们前面说的啊,直接把它先map成相同的类型,然后再优起来不就完了吗?那这样最方便,当然就是说你用connect connect function也是可以做到的啊,我们说它是底层的大招啊,什么东西都能用它实现,但是它的主要目的显然不是这样。回忆一下,我们一开始说河流操作的时候,其实我们想干的是什么呀?
01:05
啊,我们其实想干的是啊,两条流里边,它可能这个数据的某个信息,某个字段是一样的,可能我是想让他两条流里边匹配的那些数据把它检测到,然后去做一些操作,做一些对应的这些行为,所以在这种场景下,这是有点像真正意义上我们做这个表的这个连接查询的啊,两张表撞的那种感觉现在就出来了,如果我们用union的话,能不能实现这样的功能呢?诶,这显然是不行的啊,大家会想到UN宁,它其实就是要求你的类型必须一样,然后我直接把它塞到一块儿了,他肯定不再去判断你还两条流怎么怎么样匹配在一起,它根本不考虑这个事儿,所以你看UN呢,它就叫做两条流的合并,这个是非常准确的connect,其实如果从根本意义上来讲,它不是简单的把它塞在一起的合并。
02:00
为什么你看它叫connect,叫连接,它是两条流连在一起,串在一起,所以它是有这样的一个应用的,大家还记得前面我们讲到这个connected stream的时候,里边有一个KBY方法吗?大家看到这个KBY调用的时候,这里边可以干什么事儿呢?可以传入两个啊,当然这是可以传入两个int啊,当前的位置,那更一般化的表达当然就是啊,这还可以传入这个字段名称啊,更一般化的当然就是传入两个key selector了,那大家想象一下这两个key selector到底是用来干什么的?当然就是指定我们当前连接的那个K。所以大家需要注意啊,如果掉了K的话,它就会对我们当前连接起来的连接流里边的原先的这两条流分别去提取一个K啊,S k input1和INPUT2去去把它提取出来,然后接下来如果我们要去操作的时候。
03:03
就可以在针对同一个K去做操作了,所以这个应用是非常非常有用的啊,大家想这不就是我们所说的这个。就是两张表直接join的时候,给一个连接条件的这样的一个做法嘛,JOIN1张表,Join另外一张表啊,On什么样的一个条件对吧?啊,那所以对于这样的一个使用来讲啊,呃,那其实在实际应用当中,往往我们不会直接就这么简单粗暴去把它连接在一起,往往我们是需要去KBY的,需要指定它的K的啊,那这里边指定K啊,其实有两种方式的。就是在这个connected streams里边,其实它会自己去判断你当前连接的两个流里边有没有K,就只要有K的话。大家想你我们前面讲这个connect是一个data stream连接另外一个data stream,那假如说我是一个kid stream连接另一个kid stream可以吗?
04:02
显然也是可以的,因为它本身也是data stream嘛,假如你这里本来就有K的话,那么它连接之后默认里边我们这里边。其实这里大家可以看得到,在这儿啊,他是直接拗了一个connected streams,他拗出来的是什么呢?其实就是把原先的这一个掉,它的点KBY,然后后面这个点KBY,所以跟我直接在外边就把两个流先做KBY,得到k stream,然后再连接是一样的,这个操作是一样的。好,接下来我们可以给大家举一个具体的例子来看一看啊。我们现在来看一个实时对账的需求,哎,什么叫实时对账呢?因为大家知道现在我们的这个支付啊,网上的这个支付往往都是跟第三方支付平台要连接在一起的,因为大家知道我们一个,当然你像这个阿里,它本身就有自己的这个支付平台,对吧?支付宝本来就是他自己做的嘛,但是一般情况啊,我们的一个电商网站,或者说你的一个任何功能的一个网站,一般你自己不会单独开发跟银行那边对接的这个系统啊,你一般我们是要调用第三方的一个,呃,支付平台给我们提供的服务的,所以用户支付的时候,它这个操作流程其实是什么呢?
05:22
啊,就是用户这边。我这边下一个订单啊,那首先是下订单了啊,下订单是跟我们的这个系统来交互,我们这边有自己的这个系统。用户这边首先应该在他的这个前端页面上啊,或者这个手机上去提交自己的订单请求,然后我们这边把这个订单状态更新,然后等待他支付,然后接下来关键的当然就是他要做这个支付请求,那么做了支付请求之后,我们的这个服务后台这边其实是会。诶,就是一般是用户那边啊,他自己就会弹出跳转到第三方支付平台对不对,那就是我们的这个后台服务是会直接给他调第三方支付平台的那个服务的,所以接下来的这个状态其实是什么呢?是他这边提交一个支付请求,给我们的后台提交一个支付请求,然后去第三方支付平台这边。
06:20
微信或者支付宝啊,去把这个钱转过去,然后呢,第三方支付平台给我们这边,当然它本身是会会给这个用户这边首先给一个反馈对吧,给一个反馈你这边支出,然后呢,会给我们的平台这边也会发一个消息。因为他这边支付。第三方支付平台应该是要把这个钱要转到我们平台账户这里来的,诶所以它其实这样的一个三角关系啊,那在有些场景下啊,就有可能出现一个状况,就是什么呢?呃,就是用户这边给我们提交的这个支付请求,就是最后完整的一个回路是我们收到了第三方支付平台这边的这个通知之后,然后我们给用户确认。
07:06
这边已经到账了啊,然后那个用户这边这个订单就就完全可以关掉了啊,我们就可以这个啊买的东西就可以发了,那这里边大家就会发现这个环路里边啊,假如说出现了一些状况的话。啊,就比方说这边用户已经提交已经支付了这边的这个数据了,那我们这边呢,假如接收第三方支付平台的这个通知的时候,我们服务器太忙,或者说服务器开小差了,没收到这个消息。哦,那这种情况下会怎么办呢?那这种情况下,大家会想到是不是就相当于我们这边已经收到钱了,用户也花钱了,但是我们这边订单就不给他确认,因为我我认为你这边没支付,这显然是不正确的啊,这不应该出现这种情况,对不对啊,所以正常情况下我们应该要去做一个对账,就确认用户已经把这个东西支付了,或者至少呢,我们即使是不自动给他,呃,认为他支付也至少应该做一个提醒,对吧?啊就是诶有可能这个丢数据了,要不要再去确认一下啊,那你人工确认之后没问题,那就应该让用户的这一笔账,就是相当于啊,他他这个支付是已经完成的啊,要确认啊,那当然了,还有另外一个可能,就是第三方支付平台给用户这边的这个确认也有可能丢掉,对吧,那就有可能是。
08:24
就是说整个这个支付的过程当中,有可能是我们这边认为啊,当然正常情况下的话,用户这边肯定就是他这边提交完了之后啊,只要我们收到钱,我们的系统这边就应该确认用户那边已经支付了啊,用户那边他知不知道,其实呃,这个到问题不大是吧?呃,就是用户他他那个发现诶没收到没收到就没收到啊,他只要我们这边订单能给他。更改状态,他已经支付啊,这个就OK了,第三方支付平台,这个你收不到,那就是信号的问题嘛,啊这个呃,主要就是要做这样一个对账啊。先给大家把这个业务背景大概的描述了一下啊,这个稍微有点有点绕啊,那然后接下来我们就给大家说一下,现在我们在这个代码里边怎么样简单的测一下这个东西呢?诶,那大家就会发现了,现在我们要测,一方面要测的是用户提交的这个支付订单的支付请求,对吧?诶,那另外呢,我们要要呃,处理的是第三方支付平台里边给我们反馈的这个订单支付的那个请求啊,那所以。
09:30
接下来我们其实是有两条流啊,一条流是应用这边,呃,用用户那边肯定是通过APP啊,通过这个手机啊,或者什么电脑客户端,直接给我们提交的这样一个请求,然后这里我们认为诶他他这边。他说这个我这边确认支付了,但是我们应该要到这个第三方支付平台那边真正把这个账转过来,我才能知道它真正的支付对吧?啊,那所以第三方支付平台那边也会给我们发一个消息,这当然应该是两条流了,来源都不一样嘛,啊所以接下来我们其实是要有这样的数据,然后去按照啊,当然这里是订单了啊,那大家可以认为是支付的那个那个单对吧,支付单号啊啊,那所以这里边我们可以按照这个单号。
10:15
把它连接起来,然后呢,哎,去看是不是对应的一个这个APP,这里的一个支付,这里边第三方支付平台也有对应的那个支付呢?啊,那这样的话,如果能够对照起来的话,当前就没有问题,如果要是对不起来的话啊,那那那就对不起了,对吧?那那这个就肯定就出现问题,我们应该输出一个报警信息了。啊,所以接下来我们可以把这个需求在代码里边给大家实现一下。我们来拗一个。呃,当前这个我们是一个具体的场景啊,所以呃,我们可以给大家把这个,呃这个叫对账的话,我们就叫check吧。
11:01
这个又是一个具体的事例啊,Example。主要是用到了两条流的CLA。Exception。然后前面的这个过程其实都一样啊。我们把上面这个先直接copy过来。当然,这里我们就不是这么简单的两条流了。接下来我们的流也还可以单独的定义一下啊,我们现在应该是有一个来自就是应用端啊APP的。支付。日志。哎,那这里边我们就是应该。这个from elements我们单独定义一下吧,啊就就也是一个这个我们没有单独定义po类对吧,所以这里干脆就直接用元组类型把它定义出来得了啊,所以这个元组类型里边应该要。
12:02
包括三个内容啊,就是当前的。前的这个单号支付的单号,然后一个呃来源对吧,我们我们这里边其实大家知道这个来源倒也并不是特别重要了,因为你这条流都是从这个日志里边来的话,那它肯定就都是从这个APP这来的嘛,啊我们还是把它就是把它定义出来吧,一个单号一个讯类型,然后呢,一个呃,对对应的这个来源啊,我们也把它定义成一个讯类型,就第三方支付平台还是APP这里来的,另外还有一个长整型的时间戳日志嘛。好,那接下来我们就。三啊,这里边直接给我们把这个数据直接copy过来就好了。给大家简单的做一个测试。好,有订单一和订单二两条,由APP这边提交的这样的一个支付日志,然后我们再来定义一个来自第三方支付银台的。
13:07
支付日志。啊,所以这里边我们其实还是一个三元组,对吧。类型就不变了啊,它两条流其实类型可以改变啊,那如果考虑到这个的话,那我们就把这个。中间这个改一个吧。呃,或者我们再再加一个,就是比方说啊,第三方支付平台可能还有另外的一个段,就表示当前这个支付到底有哪些对应的那些状态啊,假如说我们多一个这样的一个处理。好,那接下来的这个from。又要更改了。汇过来啊,这里我们还是三条数据啊。我主要是想给大家看一下,目前connect类型是不一样的,这个也是完全可以的啊,那这里边我们就应该是四。啊,然后中间我们再来一个,呃,这这个就随便了啊。
14:04
比方说我们就说他成功还是成功率吧,Success success好是当前我们定义好的数据来源啊,然后接下来我们要想做去照啊,那会想到对应的这个,我们应该用质件时间去去对,然后另外还有就是说什么情况下就考虑到他这个对应的这一个就相当于是没来呢,那比方说我们再来一个。T啊。这里边收到一条,但是这边第三方支付平台没收到。ORDER3,然后APP比方说这个三点五百毫秒对吧,3.5秒的时候来了一个这样的一个请求,然后在这边没有,诶那这个会出现什么情况呢。当然就应该要直接报警了。检测同意。叫支付单派,两条流中是服匹配。
15:06
不匹配,就那这里大家就会发现一个就是你这个不匹配的话,我们这里数据是有限,实际测试的时候,它应该是无穷无尽的。大家会发现我找到了的话,那可以说它是匹配的,这是没问题的啊,那假如说要是找不到的话,那难道他永远都不匹配了吗。哎,那这个就就有这样的一个问题对吧,找不到的话,什么情况下叫不匹配呢?那我们就是等一段时间啊,比方说我等上五秒十秒,如果这段时间理论上我认为他如果网络正常的话,马上就应该收到,短时间内就应该收到,那如果没收到的话,我就认为已经有问题了,那比方说我把这个设成。五秒钟的一个等待时间,如果要是没收到另外一条流里边对应的那个数据的话,那我们这边就直接报警,而且我假如说啊,我们我们这个首先两条流里边的数据其实是可以互相等的,那其次如果说实际的应用场景里边,APP这边的日志总是先收到的话,他总先的话,那我们就呃,Stream一等STREAM2就可以了,哎,这个是容易想到的,为了。
16:14
看的更清楚一点吧,这个叫做APP stream,然后下面这个我们叫part stream。好。这样的话,接下来我们就一看就知道到底是怎么回事了啊,那既然是事件时间,你还要跟时间有关,要等五秒钟,那是不是得还得有对应的这个定义在这还得有这个啊,Time STEM water啊,然后这里边我们这个water re,给一个founded out of orderness啊,那现在我们这个好像也都是顺序的啊。那所以就直接给一个zero得了,然后前面这里我们给一个二还得类型就是TEMPLE3,把这个放在这,然后接下来。
17:01
而with有一个s liable对吧,然后在这里边我们直接提取的应该是第三个字段。也就是我们直接拿element r f2出来完事了,那这里边一直在报错,这个主要是因为这个类型啊。舞曲源数据源之后是data stream source,经过这个转换之后,大家还记得变成了single output。Game operator改成这个就可以了,那同样下面这个也差不多。好比一下吧。把这个copy过来,哎,那这里的这个类型又又不对了,对吧,他四啊。稍微的,呃,这个元组类型就是这个很麻烦啊。里边类型显得很长啊,然后我们在大边在做处理的时候就会很麻烦,好这样的话,这里边就不是啊,就不应该是F f2了,是F3对吧,最后一个这个字段,然后同样上面这个改成single output operator,好那两条流我们就创建好了,然后接下来我们要做的测试当然就是。
18:11
连接,然后匹配应看这里边有一个比较有意思的地方,就是我们是需要有一个kid的啊,需要有当前的第一个字段啊,这个单号必须要匹配起来,你检测到另外一条流里边有,这才有意义嘛,那你假如说这个连当前的这个号都没有,都是不同的啊,那你检测到别的数据也是没用的,所以接下来我们的操作就是APP stream,先做一个K。哦,那。这里边。塔也取的是德尔塔减F0,然后再去大看。Connect连接另外一个也应该是一个kid stream,对不对?哎,两个k stream的话,这样他们就可以基于这个K做一个处理,这里third party dream。里边是A啊,一个当前的F0啊,当然了我们是用了这种情况,那另外一种方式就是前面我们也说过的。
19:14
另外还可以怎么做呢?呃,APP stream,然后直接connect third party stream,然后接下来再去K,哎,这个时候我们就是要给两个。给两个select对吧?哎,那直接把这个抄过来就可以了,Eight啊就就看大家更喜欢用哪种方式啊,如果大家觉得是你要写成像CQ那样的方式啊,就是比方说我这个drawing这个,然后on一个条件,哪个K等于哪个K啊,那如果是喜欢这么写的话,那大家可能喜欢这种方式一点啊,啊那这样我们就是它连接它,然后KBY,就像那个连接条件一样,然后把它定义出来,那接下来呢,哎,那就可以去去处理了,因为涉及到要等一段时间。
20:02
那自然我们就想到了,那这个等多长时间,那岂不是要设定一个定时器,既然有定时器啊,那后面就是process了,大招直接放。然后里边我们定义一个啊,这叫match吧,对吧,我们叫all match。Result。啊,最后来做一个打印啊,当然最后我们最关心的,其实它的那个是否是一个需要报警,当前有问题没有检测到的那种场景,我们是最关心的,接下来我们自定义。直线。呃,一个process方式现在就来了啊,Public static class。The order match result。所有的process function大家知道都是抽象类,所以是extend Co process function单个泛型,INPUT1 input2和output,那之前的第一个二个啊,都是这个它啊,这个又有点烦啊。
21:08
一个是三对不对,String string,然后第二个是四四元组,四元组啊string哦,三个string一个浪。然后另外还有输出的数据类型啊,这个输出数据类型就简单了,我们直接打印一条那个本信息不完了吗?对吧,一个好,这个一写上面就不报错了,然后接下来。里边要实现的就是ELEMENT1PROCESS element2就来了啊,那这里边大家就会看到。第一个这个process element1处理的是APP那个,呃,对应的这条流里边的所有的数据,它是一个三元组,而这个呢,处理的是。另外我们那个第三方支付平台拿到的所有数据是一个四元组。诶,这里面有一个好处是,我们当前既然已经K败之后,只要进入到当前这一一个处理实例里边来的,就必须得是相同的K啊,相同的K去做做相同的这个处理,那这里边我们就想到了之前我们不是可以。
22:16
定义状态吗?前面我们在做这个top n的时候,定义了一个list state啊,它是根据K有关的,那我们现在也可以定义一个state呀,跟当前K有关,我就去保存一下你之前的那个,比方说啊,呃,我们这是那个APP那边来的数据,我就保存一下。当前另外一条流,第三方支付平台的数据来了没?那或者如果是第三方支付平台这里的数据我要判断的话,我就看APP那边的数据来了没,也就是说我调到process element1第一条流的数据来了,我就看状态,看对应K的那个第二个,第二个流里边的那个数据来了没?如果已经来了的话,两者就匹配了,如果要是没来的话,那我就等设定定时器开始等,这他俩本来是这个平等的啊,谁都可以等谁,对不对啊,你如果我们只让一个等另外一个也是可以做到的。
23:11
啊,所以接下来我们的一个整体的想法,其实就是做这样的一个实现啊。
我来说两句