00:00
我们已经了解了怎么样从卡里去读数据,发现实在Li当就是直接用ADD source方法,然后里边传入一个卡flink卡夫卡连接器给我们提供的flink卡卡consumer,它就实现了一个S方式啊,那所以整个这个过程,我们其实所要做的只是API的调用和一些配置项的传入就可以了,那里边的内部细节都由连接器帮我们搞定。除了卡夫卡之外呢,其实弗林还可以连接到一些其他的流式数据处理的系统,呃,那当然了,也是直接提供了对应的连接器,比如说像rabbitq,像Google的帕萨,像PA,像。Twitter的API,这些连接器都是直接提供了,那当然了,大部分情况下,我们所能使用的流数据的来源可能不外乎就是这些类似于消息队列的一些数据存储结构,那很多需求也都能满足了。但是呢,总还有一些比较特殊的场景,我们想要读取的数据可能来源不是这些,Flink并没有给我们提供相关的连接器,那这个时候怎么办呢?
01:22
那就只好自己动手把当前要传入的function完整的做一个实现了,那所以接下来我们要介绍的就是自定义去实现一个S方式,也就是说相当于当前我们的S算子,这里的数据源全部由自己定义出来。那对于自定义的function呢,本身这是一个接口,里边我们重点要实现的就是两个方法,一个叫run,一个叫cancel,那这一部分我们也可以在源码里边来看一下到底要实现的是什么。
02:00
其实前面我们已经看到source,这里需要传入的就是一个source function source function那本身是一个interface,里边主要就是两个抽象方法,一个叫做说,那顾名思义这就是运行嘛,那它的主要作用是什么呢?本质上它是有一个参数啊,叫CX,这是一个s context,就是当前S原任务的上下文。这个上下文它主要就是用来发出数据,然后。向下游任务去传传送的,接下来我们整个数据流里边所有的数据就是来源于这里。而wrong方法的调用,它和整个运行式架构有关,所以它可以理解成是当前S任务对应的线程它的执行方法。那所以我们就想到了,如果说一个线程启动之后,当前这个任务已经开始运行之后,那很显然就会执行这里的run方法,当前run方法就会被一直调用。
03:08
那如果说我们想在这里持续发出数据的话,很显然这个run方法就不应该发出一条数据,然后就直接退出了,而是在里面应该有一个循环,不停的发送数据。啊,那对应的我们就会想到,那如果说这里一直发送数据,一直循环下去的话,看起来这确实像一个无界流了。无穷无尽一直有数据,但是我们总还是有特殊的场景需要让它停下来呀,那怎么停下来呢?呃,我们自然能想到可以使用一个标志位,这个标志位就代表了当前这个wrong方法里边进行循环发送数据的一个判断标准。那当这个标志位变为false的时候,也就代表我们当前这个run方法应该要终止了。那这个标志位在哪里能够接收外部的信息去进行更改呢?下面这个方法cancel cancel很明,很显然这是取消当前任务当前作业的一个对应的命令,那对应在我们的操作上的话,可以理解成就是在web UI上点击了对应的cel,那就会调用到这里的cancel方法。所以只要我们在run方法里边有一个标志位来控制它的循环生成数据,那么在cancel方法里边的话,直接把对应的标志位改成false就可以达到控制的目的了。
04:31
所以这就是s function里边最主要的两个方法,接下来我们可以创建一个自定义的数据源,看一看对应的s function接口到底怎么实现啊,接下来我们可以创建一个新的class。这个因为是单独要测试用户自定义的数据源,所以我们可以单独把它创建出来,就叫。Custom test。
05:01
当然,这里我们主要是一个测试的框架代码。Throws exception。里边的话,整个的流程还是类似的,我们先把stream execution environment创建出来。哎,同样为了方便测试,我们把全局的并行度先设成一,后边的话直接调用env的点source方法,里边要传入的是一个自定义的。S啊,那得到的这个我们可以叫做custom s。Custom stream。因为我们刚才就得到了一个数据流嘛,Data stream,接下来可以把当前的数据做一个打印输出,最后env执行起来,那这里边我们缺的其实就是唯一缺的就是一个真正的s function了。所以接下来我们干脆在当前。包下边直接新建一个类,实现对应的p function接口。
06:02
这个类我们里面的数据产生的话,就还是用用户的访问点击事件来作为我们当前的数据数据类型,所以呢,我们干脆就直接把这个叫做click。那么他需要去实现一个function。那我们只看到了south function本身是需要有,这里是需要有泛型的,我们现在能够想到的泛型自然就是了。用户的访问点击事件。里边必须要实现两个抽象方法,一个是,一个是cancel,在这个过程当中,我们需要声明一个标志位,用来控制数据的发生,数据的生成,所以这里边我们首先声明一个。标志位。这个标志位当然是一个布类型了。我们可以把它叫做running,初始之处。
07:03
所以下边这个run方法里边,很显然我们应该能够想到是应该使用一个循环不停的生成数据。所以应该是一个Y。那这里的判断条件自然就是running了啊,那这里边如果要想要直接去生成数据,用什么方法去生成呢?就是用当前的south contact原任务的上下文,在这里边有一个非常关键的方法叫做。收集的意思,它收集起来数据要干什么呢?当然就是朝下游任务去发送啊,所以这里面我们会看到当前的上下文,它并不是直接要对应的要返回数据,或者是某一个方法要有返回值什么的,它是直接调用上下文的collect的方法啊,就可以发出一个对应的数据,这里边传入的参数就是要发入的数据,当前泛型T就是数据要生成的数据流里的数据类型。
08:05
所以接下来我们要在while里边调用的就是ctx点里边我们就是直接去new一个event创建出来,发送出去就可以了。当然了,在具体做这个测试的时候,我们可能会发现,呃,这样直接写死数据去发送,这就不太合理,那就每次发送的都是一样的数据了。为了尽可能的去模拟真实的生产场景,所以这里边我们一般是会用一个随机生成数据的方式。生成当前的数据流,那所以这里边我们需要定义一个随机数生成器了。随机生成数据。首先,我们。New一个。Random。就把它叫做,然后接下来我们还应该定义一个。
09:02
随机数据选取每个字段的时候。它选取的范围是什么?所以我们应该是定义字段选取的数据集。在哪个范围内去做随机选择,那这里边我们三个字段,一个是user用户名,那所以可以在一个string类型的数组里边,我们把它叫做users去进行一个选择。这里可能有Mary。有Alice。另外有Bob,我们可以再加一个,有carry。同样的对应的URL这个字段呢,也可以在一个词string类型的数组里边去做一个随机的选取,那这里面我们可能有home。这是主页面。Heart。购物车对应的页面啊,那可能还有favorite,就是对应的用户喜欢他所喜欢的所有的页面。
10:03
那另外呢,可能还有一些具体商品的页面,这就是。问号后边加上对应商品的ID,这是我们能够想到的各种各样的定义。可以多列出几个。然后接下来啊,那还应该有一个time step,这个就不需要随机生成了,我们可以直接选取当前的系统时间,转化成一个长整型的时间戳可以了。所以接下来我们要做的事情就是直接生成我们想要的数据了。一个user,那怎么从users里面随机选取呢?很显然,我们需要。随机生成的是一个当前users数组里的索引位置,那所以这个索引位置我们就直接调用random.next in来做一个随机的选取啊,那这里这范围到底取多少呢?选取当前数组的长度作为范围,就可以在当前零到LA减一之间随机的选取一个索引位置啊。
11:09
这是对于user的选取,那URL的选取其实也是完全类似。只是。从URL数组里边随机的去选取一个索引位置就可以了。好,那接下来另外还有一个长整型的时间戳,Time。这个我们应该获取当前的系统时间,Calendar get instance,然后get minutes,我们获取的是一个好述,接下来你有一个event,那就把对应的三个字段全部填入对应位置就可以了。这就是我们循环生成数据。随机生成数据的这样一个过程。那当然了,当前是以running这个标志位作为判断继续循环生成数据的这样一个标准的。那假如说我们现在想要让它停止怎么办呢?那其实非常简单,只要在cancel方法里边将running标志改为false就可。
12:10
这就是我们所定义的随机生成数据的这样一个方式。那接下来我们可以。在测试的。类里边直接用当前我们定义好的clickus了,那就直接new一个clickus,当然了,这里面我们得到的数据类型应该是。呃,那我们现在呢,是。在这里啊,直接就不停的循环调用去生成,这个可能对于系统的性能要求会比较高啊,对系统资源的占用会比较大,如果说我们希望稍微的把当前生成数据的频率缩短一点,稍微的缓和一下的话,每一次生成数据之后,也可以调用thread点去休息一段时间啊,这样的话我们就比方说休息一秒钟,一秒钟生成一条数据,这样的话对于我们整个系统性能压力就不会那么大,我们也可以对测试的结果看得更加清晰。
13:09
这是常见的一个用来测试的方式,我们可以运行一下。我们可以看到,现在就是每隔一秒会生成对应的一条数据。当前我们看到Mary Bob、爱丽丝、carry。四个用户他们会随机的出现在我们生成的一个event里边,那URL这里其实也是随机生成的,当然选择的话,可能呃有一段时间某某个URL访问的频次会比较高,整体来讲是平均分布的,完全随机的,后面time就是基于当前的系统时间去做了一个转换。那这种方法我们会发现,很显然不能用在真正的生产实际。直接把这个作业啊,当前的数据源就直接写死在这里边,它一般是用来做测试用的啊,那在真正测试的时候呢,最好就需要把我们这里的数据集选取的数据集定义成真实的每一个字段能够取到的数据集,而后边对于它的这个随机生成索引的方式呢,也应该尽量贴近于数据的分布情况,如果能够做到这样的话,我们一个随机自定自定义的随机生成数据的数据源,就可以完全模拟出一个生产实际的流失数据源。
14:33
那这种方法在呃进行测试,在写完代码想要放到提交到生产环境之前,其实还是非常重要的一环。所以这就是关于自定义测试数据源的一种使用方式。
我来说两句