00:00
我们已经了解了窗口连接window draw的具体用法,那这个过程会发现啊,它确实很好用,因为本身调用方式是非常固定的,就是一步一步按部就班把它实现就可以了,而且这里边apply里边啊,能实现的这个装修方式,它的功能也非常的简单,就是直接传入两个参数,然后得到一个返回值就可以了,哎,那所以我们会发现它的调用简单,那就实现的功能就会相对的比较局限。比如说我们在有一些场景下,可能对于这个时间的要求就不是固定程度的,那你像我们这里是只能定义窗口啊,那假如说像之前我们这一个Bill check实时对账需求里边,我们所定义的这个呢,肯定就是基于当前的某一个事件的时间,然后。开始等待,等待五秒钟,你不可能规定死,就是零到五秒,或者是五到十秒,哎,所以这个时间点就相当于是不确定的,这个时候使用时间窗口很显然就不是一个很好的方法。
01:07
那对于这样的需求,在弗link当中又怎么样用更加简单的方式去进行一个处理呢?啊,弗link也提供了应对这种需求的一种操作,那就是所谓的间隔连接啊,我们可以看到所谓的间隔连接就是interval draw,什么叫做interval draw呢?关键点就是我们要指定一个时间间隔,它的具体含义就是说我们先指定一条流作为一个连接的基准,然后呢指定一个时间间隔,比如说在这里我们就指定了一个。负二到正一的时间间隔。所以接下来我们做的操作就是针对下面这条流里边的每一个数据,那么它的时间戳减二到加一的这个范围内的上面另外一条流里的所有数据都会跟它进行一个匹配连接。
02:05
哎,所以我们看到现在选取这个匹配的数据,对的原则就不再是框在一个时间窗里的所有数据了,而是变成了每一个数据会发出一个扫描的区间,按照我们定义的这个间隔,在这个区间范围内去扫描数据。只要在这个范围内能够找到的另外一条流里的数据就全部匹配起来,哎,那所以这里边我们的规则就是限定了一个下季lower bound和一个上季upper bound,那么接下来如果是A流去internal join,一个B流的话,哎,那么接下来他们对于两条流里的元素能够匹配的规则,那就是。哎,我们用小B来表示B流里边的数据元素,哎,那么它的时间戳就应该大于等于A流里边的元素,时间戳加上lower bound,小于等于A的时间戳加上upper bound。
03:07
那这里需要注意的一点就是做internal join的两条流呢,必须基于相同的K,也就是说必须先KBY,然后才能做两条流的internal join操作。而且这里我们指定的upper bound和lower bound这个上界和下界都是使用加法来加到当前A的时间戳上,也就是说我们这里如果要是在他之前的一段时间范围的话,应该给这个下界要给一个负数,就像刚才我们说的啊,当前的这个指定上下界应该就是负二到正一是这样的一个范围。那对应的这一段时间范围内,时间间隔内的所有数据都应该能够匹配上,那刚好在这个时间点上的这个零能够匹配上吗?这个是可以匹配的,所以默认这个上下界。是包含在里面的,它是一个B区间,所以我们看到啊,对应的匹配数据零时间戳这个数据发出来的这个时间范围,匹配到的就是零零和零一。
04:11
然后第二条数据,这是时间戳为二,那么它匹配到的也是零和一两条数据,那就是20213的话,只匹配到一条数据,就是314,没有匹配到数据,那当然就什么都没有了,哎,那所以整个的这个逻辑跟窗口draw window join是基本上类似的,只不过它就是要指定一个间隔。那具体调用的过程呢,在代码当中使用方法也是非常固定的啊,跟window状语类似啊,只不过他现在是必须要先去做一个K。先指定TTBY之后,然后调一个点特join,用方法好啊,那我们看到里边传入的参数也必须是一个k string。必须KBY之后才可以做,然后接下来呢,调用一个点between方法,那between很明显,这指定的就是上下界了,给一个时间间隔,然后接下来呢,调一个点process方法,所以我们看这就有点像处理函数了啊,然后它里边传入的就是一个process drawing function,诶,那这个区别跟之前我们看到的drawing function或者Fla draw function区别就在于除了当前哎有这个第一条流里的数据,第二条流里的数据之外,还有这个al啊,用来做这个数据输出的collect collector之外,还有一个上下文ctx啊,这就跟我们那个处理函数的特征非常的类似了。
05:39
接下来我们可以看一看,到源码里边看一看这个具体的调用过程,首先我们来看一下data stream这个类里边有没有能够直接调用的interval drawing方法,哎,我们看到好像没有好,那如果说我们直接敲draw的话也会发现啊,它只有draw,没有interval draw。那怎么样才能调internal join呢?哎,那当然就是先要做KBY,得到一个k stream。基于k stream。
06:07
接下来就可以调一个inter draw方法,我们看,呃,就是当前这个inter drawing只是kid stream的方法,所以普通的data three完全不能调用,而且他这里边要求的参数类型也必须是k three,如果不做K是不能够传入的啊,那所以接下来他得到的数据类型呢,是一个interval join这样的一个数据类型,哎,那当当然这也是一个内部类了啊,接下来他能调用的方法就只有一个between。那么between定义了上下界之后,接下来返回的是一个interval join的类型,那么interval join的类型呢?里边有几种方法能够调用,但是我们会看到啊,哎,这里边有一个lower bond exclusive和upper bond exclusive这两个方法,哎,那这两个方法很明显它只是做了一个配置,就是表示当前的上下界是否要包含在我们匹配的这个范围内,我们默认不不是B区间吗?所以两者都是inclusive,都是包含在里边的,那我们可以调用这个方法指定去除掉当前的边界啊,它返回的类型还是inter draw。
07:20
那对应啊,真正能够进行类型转换,能够处理的方法其实就是这里的process,所以一般情况我们也就是只能调这个process方法了,里边传入的就是一个process join function,最终得到了data又绕回来了。那这里的关键当然就是这个实现process装方式抽象类里的抽象方法了,我们看它真的是跟process方式处理函数很像,里边实现一个process element。那process element呢,除了当前的left right2条流里边匹配的对应的数据之外啊,那还有就是collector,用来做数据输出,另外还有一个上下文context,那这个上下文呢,它可以获取到当前的时间戳。
08:06
另外还可以进行测输出流的输出,但是它同样也没有timer service,所以它也不能注册定时器啊,所以它的功能还是会比较局限的,它更像一个drawing function,而不像process function,这就是关于internal join的一个基本的用法。那接下来我们还是举一个具体的案例吧。其实我们知道啊,在电商的场景里边,很多用户行为,它其实就是在很短的一段时间内相互关联的啊,比如说就是我们当前假如说收集了两条日志数据流,一条就是订单流,用户下订单的那个事件流,另外一个呢,是用户来不停的访问数据,浏览事件的数据流,那我们可以针对同一个用户来看一看,他的下单行为和浏览行为在一段时间内有什么关系,因为我们想啊,很有可能就是诶,用户这段时间内在看哪些页面,接下来他很可能就会下对应的单,哎,那所以我们可以找到两者之间的联系。
09:13
很明显,这就是跟我们之前实时对照非常类似的一个需求了,那如果有对应的这两条流的话,我们可以把它做一个连接connect,然后接下来我们就可以去针对每一个到来的事件注册定时器,去检测有没有对应的另外一条流里的相应数据了啊,哎,那当然了,这种方式我们会发现注册定时器非常的麻烦啊。当然就不如直接使用一个使用间隔连接,就会非常轻松的实现这个功能,接下来我们来做一个具体的实现。还是new一个scared object。这是一个间隔连接的测试,所以是Joy test没方法写出来,还是stream execution environment先get出来。
10:02
我们把它叫做env,上面下划线引入。接下来还是全局的并行度。先设成一。接下来就是要读取两条流的数据了啊,这个测试数据我们还是直接放在这儿啊,我们使用文档里边已经写好的测试数据有两条流,一条流叫做订单事件流,另外一条流叫做点击事件,哎,那这里的event我们就可以直接使用。CHAPTER05里边的event跟我们之前的那个点击事件是完全一样的,那这个订单事件呢,跟实时对账里边的订单稍有不同啊,因为实时对账那里呢,并不关心用户,哎,我们只是考虑这个单到底有没有匹配,是是不是对上了,而现在呢,我们考虑的是用户一段时间内行为的联系啊,所以其实我们接下来如果分组的话,KBY的话,是要以用户作为一个分组的K,哎,那当然当前的这个订单事件就得有到底是哪个用户下了一个什么样的单,哎,那所以我们这里边保存的三元组事件啊,是user ID o ID以及一个时间戳,只保留这三个数据就可以了,那后边我们还是因为是事件时间跟时间有关的话,那我们要提取时间戳生成water mark assign asending time steps。
11:24
接下来就可以进行间隔连接了。两条流进行间隔连接,匹配一个订单事件前后一段时间内。发生的。点击事件,哎,所以现在我们这个逻辑就非常清晰了,我们是要以订单事件作为基准,这条流作为基准,那就是。Order the stream。接下来我们首先要做一个K,必须要指定当前的key是什么,很显然是下划线1USER,然后用它去internal draw。
12:07
当前我们的点击事件流PV string啊,那这里边我们同样要做一个KB,用它的user作为当前的K连接之后啊,接下来我们就可以指定一个between时间间隔了,那这个我们就随意指定吧,比如说time啊,我们先把time先引入window in time.time啊,那就当前我们这个数据时间戳都比较小,那我们就直接给一个描述吧,Second,比方说在当前一个订单事件。到来之前五秒钟和在他之后十秒钟内所有的点击事件我们要匹配出来,诶,那就是一个second负五。到time.SECOND10这样一个时间范围啊,那所以有了这个之后,其实我们也可以大概看一下啊,对于当前我们的测试数据而言,上边诶所有的用户应该是都有订单事件的,而下边的点击事件流呢,只有Bob和爱丽丝的,所以这里边我们只看Bob和爱丽丝就可以,鲍勃爱丽丝,比方说爱丽丝第一条数据它是第五秒钟产生的,诶那么对于这个订单而言,那我们要匹配的负五到十的范围自然就是。
13:20
零到15秒的点击事件,那零到15秒我们看一下。33.5这两个事件应该是可以匹配到的啊,那后边的这个36秒的事件肯定就匹配不到了,这是我们应该能够想到的啊,所以接下来呃,基于这个between。能够调的方法当然就是process了,这里边就必须要去实现一个process draw function啊,那本身这一个process function里边也是三个数据类型,指定我们当前两条流的输入数据类型,还有一个输出的数据类型,现在我们的输入呢,呃,第一条流这又是一个三元组,所以是。
14:05
String。Stream law第二条理由简单,这是我们的样例类类型event,最后的输出还是包装成string直接打印输出。必须要实现一个process element方法,在这个方法里边,我们能获取到当前匹配上的第一条流的数据叫做left,第二条流的数据叫做right啊。另外如果我们要输出的话,那是用这里的out.collect去输出,在当前的上下文里边呢,还能够去获取到当前的时间相关的一些信息,哎,我们这里好像也没什么用,干脆就直接拼接成一个字符串,直接输出就完事了。哎,那就是out.collect left。我们做一个拼接,加一个箭头,然后加上right,这样的话就一目了然,哪些数据能够匹配,现在就可以看到。最后如果想要打印输出的话,当前的字符串类型的数据流还要再追加一个S任务,最后env elect执行起来,这就是我们完整的测试流程。
15:09
啊,接下来我们可以运行看一看得到的结果是不是符合预期,哎,首先我们想到这个爱丽丝这个数据啊,它能匹配到应该就是三秒和3.5秒的数据,诶我们看一下已经输出结果了。现在我们输出的这个数据,每一个order,哦,那我们看ALICE5秒钟对应的订单二,那么他找到的访问事件,那就是三秒和3.5秒钟的事件,而Bob在20秒的时候下的这个订单三,他找到的匹配的点击事件是30秒和23秒钟的两条事件,啊,那当然了,这个是符合的啊,因为它的负五到十的时间范围,应该寻找的是15秒到30秒的数据,这个肯定都没有问题。啊,这就是我们对于间隔连接的一个简单的测试。
16:01
那当然了,我们可能会想到之前的实时对账这个需求能不能用间隔连接去实现呢?诶,当然也是可以的,类似我们这里边不就是一个等五秒钟吗?那我们就设置一个负五到正五的时间间隔就可以了。但是这里还有另外一个问题,就是我们不光要输出对账成功的信息,还要检测到没有匹配的上对账失败的那些信息。这个对于特状语来讲就比较困难了,因为我们看到特状语这里输出的都是正常匹配上的一对一对的数据A,那所以说如果对应着我们在关系型表里边啊,用CQ进行draw语连接的时候,那其实对应的就是一个inner draw,一个内连接。那能不能输出他没有匹配到那些数据呢?现在是不能输出的。所以如果说我们当前的需求必须想要找到那些未曾匹配的数据的话,那很显然我们就不能用。所以实时对账这个需求还是只能使用connect来进行实现。
17:05
这就是关于两条流进行join语的操作,一种是window draw,另外一种是draw。
我来说两句