00:00
接下来我们都已经知道了,从不同的数据源读取数据,然后进入到弗link系统里边做处理,呃,那这里边还要给大家讲一个比较有趣的东西,就是我们可以去自定义source,哎,这主要是用在什么场景呢?呃,一种场景就是我们想要去做测试的时候,对吧?我我当前光是把这个系统开发出来了,我还没有现成的数据能够去做处理,但是呢,我又想看看这个当前业务逻辑到底对不对,是不是正确,那怎么办呢?我可以自己随机的生成数据,按照某种规则生成数据,然后看一看它的表现是什么样的啊,这是一种一种应用场景。另外还有一种场景就是说前面我们说这个卡夫卡,这里边是天生是一对,对吧,他们可以这个连接在一起,那假如说我们想从别的一些地方读取数据呢?啊,有同学可能想到了,那假如说我要从这个MYSQL里边读取数据,对吧,我想从have里边读取数据,那这行不行呢。
01:00
哎,当然一般情况我们不推荐这样做,因为本身你像MySQL have这样的这样的数据库啊,里边的数据其实它就是一个已经现成的,就是类似于有借的一个数据了,对吧?诶那你说我们本来是一个流式处理嘛,你这里边又又把它弄成了一个这个现成的有界的一个数据,这其实不太合适啊,对于这种就是有界的数据的场景,我们其实一般情况就是你把它做批处理,可能是更好的一种操作,那对于弗link的场景而言更合适的,那其实应该就是你前面加一个卡夫卡对吧?那所以说我们说卡夫卡的这个,呃应用啊,作为这个SS源的这种应用,其实是最普遍也是最常用的这种方式,但是呢,也不排除有时候你就要自定义,对吧,我就想从别的地方去读数据行不行呢?哎,当然是可以的,那你就得自己去做定义了啊,所以接下来我们还是在代码里边做一。
02:00
呃,具体的实现,接下来这个是还是在这个代码里啊,我们就不分文件了,我们直接做这个自定义S啊,做一个这样的实现,那本身这里的实现呢,我定义一个STEM4啊,大家想到了,那既然是自定义,那是不是必须调用通用方法对吧?任何东西都没有嘛,那肯定没有现成的方法,那就直接at source。然后呢,诶,之前我们卡夫卡这里边其实也是a source,但是这里边你必须要实现的那个source function是卡夫卡的官方连接器给我们实现了。现在现在呢,你要去自定义,那就非常悲剧了,这个source function没有具体的实现,也没有工具能直接掉,对吧,那你就只好自己去创建一个啊,比方说这个东西我们就叫做sensor s啊,或者说我们可以叫做my sensor s,对吧?做这样的一个表达定义一个自定义的类,那这个类到底又要实现什么呢?哎,我们把下面这个先改过来啊,等一下我们把这这个数据直接读进来之后,直接打印输出,那这个类到底要怎么做?哎,大家看就是。
03:15
自定义,呃,自定义的source function对吧,这就是我们自己要实现这么一个东西啊,所以class前面我们my source function,它本身应该是要实现一个什么接口呢?就是前面我们讲的source function对吧,就是要实现这个Java接口,然后这里边本身SS方式,大家看到它是有这个泛型的,有有泛型当前泛型是什么呢?其实就是你最后想要输出的数据的那个类型啊,这里边可以给大家看一下,就是你看这里边不是有这个source context有这个上下文嘛,对吧,在这个source context上下文里边有一个方法叫做collect。Collect是什么意思呢?Collect的意思就是说收集我现在要发出去的数据,对吧?啊,那发出什么数据呢?就是element对吧?你你把要发的数据作为参数传给这个collect的方法,调一下这个方法就把这个数发出来了,那你发出来的数据是什么类型呢?就是T,这个T是哪里定义的,是不是就是这里south contact的这个泛型,那source context south context的这个类型啊,这个泛型其实也就是整个source function,诶大家看一下在前面。
04:37
Source function式这里边我们定义时候的这个泛对吧?啊,绕了一圈又回来了啊,所以现在我们既然想要发出的数据是什么样的呢?诶,你也可以定义string对吧,Int这个都可以,我们现在是想测试一下,就是我真的要模拟生成一个我想做做测试嘛,真的模拟生成一组这个温度传感器的数值,那是不是这里边就应该是一个sensor reading的这样的一个数据类型啊,哎,所以当然就是这样去做做这个输出啊,那后面我们看一下要把里边有两个方法做一个override,做一个重写,这两个方法一个叫做cancel取消,另外一个叫做呃,叫做wrong,对吧,然后就是。
05:21
在这个wrong里边,你看到这里边我们有一个ctx,这不就是上下文吗,上下文里边有一个class方法,不就是可以发出数据吗?哎,这个就就串起来了啊,诶,那这里边这个看L我到底该怎么样去控制,我当前就取消了,你不要再去继续发出数据了呢。哎,这个时候其实非常简单,我可以给一个标志位对吧?哎,这里边定义一个标志位,一个flag啊,其实就是用来表示数据源是否啊正常运行发出数据,哎,这就是这个标志位的一个含义,那当然这里边大家知道你标志位,那也就是说如果正常情况下,那比方说我们这个标志位是比方说这个标志位叫做running,对吧,正常情况下它应该是个布尔类型,是一个true。
06:16
那如果说要是取消的时候,是不是它应该改变它的值变成false啊,所以这里边不是value,而是变成了一个变量一个Y对吧?啊,这里边我们定义一个running,它是一个布尔类型的变量,默认值给一个true。然后接下来,诶,这个要取消的时候,放这个cancel的时候,那怎么办呢?这里面并没有返回值,你就直接定义怎么怎么改变就可以了,那我就把这个标志未改了就完了嘛,Running。Running这里啊,直接给它赋值赋成false就完事了啊,那你这里边赋值是很简单,看起来这么一步就完了,那你怎么能控制到它下面不不发出数据了呢?哎,那当然就是下面我们发出数据的时候,这个时候还要用这个标志位来控制,对吧?那我们后面再去做实现就可以了啊,那这个wrong方法啊,这里的这个wrong方法其实就是在底层,我们在执行这个任务的时候呢,它外层就会不停的调用当前s function的wrong方法。
07:18
啊,就是它它直接就会把这个run方法直接调用起来,跑起来啊,啊不是说不停调用啊,就是直接吊起来之后,直接就就就会运行这个这个方法,那么这个方法里边如果说我们想要去连续不断的发出当前的这个ss reading的数据的话,那大家想是不是就得有一个循环呀,啊对吧?所以这里边其实就是涉及到我得定义一个无限的循环,如果说我们这里面不停的话,那就是无限循环啊,然后不停。的产生数据,然后呃,除非被cancel掉,诶,那这里边就说明我们当前的无限循环,不是有同学说无限循环嘛,那我直接来一个well处得了对吧?WELL1得了啊啊,不是的啊,这里边我们当然就是用这个running作为评判的标准了,对吧?如果说running是true的话,我就继续循环去生成数据,如果它是false的话,那就退出了,对吧?诶,那接下来你就不要去生成数据,我就直接执行完了,退出就完事了,这就是cancel的一个状态,所以接下来我们其实要实现这个东西,呃,那你要实现这个不停的产生温度传感器的数值,如果说我们想要去模拟一个真实的场景的话,真实的场景温度传感器应该是什么样子呢?
08:45
哎,它其实应该是大家会想到,首先我们定义一个场景,我有十个温度传感器对吧?有十个温度传感器,那真实的场景我应该是不停的采集每一个温度传感器的数据,然后里边带着时间戳,对吧?啊,那每一个传感器温度传感器的数据呢?它因为这个温度传感器可能是放在不同的地方,它本身基准的温度可能都不一样,对吧,这个差别比较大,这个我我可能需要随机去生成,然后接下来呢,随着时间的推移,时间的变化,应该它是在这个基准的。
09:18
温度值基础上可能又会有一些微小的扰动,微小的波动,对吧?啊就是呃不停的叠加,比方说它初始是20度,然后过了一秒钟之后,有可能变成21度,变成20.5度,21点,呃三五度,对吧?呃有可能会做这样的一些波动啊,有可能有可能变低,变成这个18.75度都是有可能的,所以接下来我们的策略就是我先给它生成一组初始温度,然后呢,在这个循环里边,就是每次这里边可能我要定义一个时间间隔,对吧,每一次循环的时候,我间歇,比方说间歇个呃一一秒钟,或者说100毫秒0.1秒钟,然后再去生成下一下一组数据,然后生成下一组数据的时候呢,是在之前一次温度的基础上,然后微小的波动一下。
10:10
啊,所以我们这就是比较呃,接近于真实的一个场景啊,就真实的你采集温度数据就应该是这个样子嘛,所以接下来我们实现一下这个功能,那你既然是要随机生成一些这个初始温度,那大家想是不是需要有一个随机数发生器啊,对吧,我们先,呃,就是。定义一个随机数发生器啊,那这个大家知道,本身在Java和SC里边其实都有这个random。这样的一个一个实现对吧?啊,就是你你当然是可以去,呃,就是直接用这个Java里边的random啊,那那我们直接就是new一个random对吧,把这个把这个引入也可以,你用这个SC里边的random,这个也是可以的啊,那你如果要是用这个SC里边的random的话,你可以去,因为它有这个类嘛,或者说我们在这个random里边,你发现它有这个呃伴生对象对吧,你直接不加,你有直接去用这个伴生对象其实也是可以的啊,这这里边我们直接就还是用这个new的方法把它创建出来就可以了,然后接下来呢,那就是随机生成生成啊一组十个对吧。
11:28
呃,传感器的初始温度,好,那那接下来这个初始温度应该怎么定义呢?嗯,其实大家会想到这十个嘛,其实我可以啊,有同学说那你一个放循环来来搞定对吧,其实我不需要那么麻烦,我可以直接定义一个啊,大家知道我有这种写法对不对,Skyda里边one two ten啊,或者说这个是空格这种写法啊,大家知道这它其实底层都是方法调用嘛,我用这个ONE1,然后点TWO10,这其实就可以得到一个range对吧,得到一个范围一到十便利这个一二三四五六七八九十的一个范围,然后这个范围呢,我就相当于这就是我的这个三四的ID嘛,然后我就把它做一个map map成一个二元组啊,这里边大家注意啊,这个初始温度我要的是一个什么呢?是一个二元组啊,就是一个ID,然后一个一个温度值是这样的一个二元组。
12:29
所以接下来呢,我基于这个这个range啊,做一个map。把它map成哎当前的当前我这个数据大家知道都就是一个I对吧,就是一个编号,然后我把它呢,Map成二元组,这个二元组前面是ID,我加上三四,就像我们那个呃这个数据啊,这个334,这里边定义的这种方式一样,三四下划线几,334下划线几,那这里边加上一个I,这就是当前的ID,然后后边是一个初始的温度值,这个温度值就呃就就不太呃不太准对吧,这个说不准到底应该是多少,那我们这个怎么样去定义呢?啊,你比方说直接用这个run,我们当前的这个直接去给一个ma int,或者说啊,大家看到这这里边有这个next double对吧?啊,你去随便给一个这个参数,其实都是可以的啊,但是这里边就是说你会看到你直接这么去给的话啊,它这里边本身定义出来的是。
13:29
就是从零到一的这样的一个,呃,Random的一个number对吧?哎,那所以这里边你如果还要想给它做一些范围的调整,就是说我们的这个初始温度你总得偏差大一点呀,对吧,你不能是这个就是一一上来之后,它它这个都是在零到一之间的一个初始温度呀,那后面我们微调根本都看不到它的这个变化,哎,那你这里边可以再给它乘一个数对吧?哎,比方说我这里边直接乘以一个100啊,那就相当于我可以把零到一之间随机生成的这个小数,然后乘以100,那就变成了零到100之内的一个温度值对吧?呃,随机变化的一个温度值,这样的话我就有了一组这个初始值,然后,哎,那当然了,这个初始值大家会发现啊,呃,就是我首先得给它定义出来对吧。
14:21
大家可能会想到我可以定义一个in temp temp,那当然大家知道这我定义是叫temp,其实是一个二元组啊,就是一个ID,一个温度值这样的,那后边在这个无限循环的时候呢。我第一次应该是基于这个去做变化,对吧,在它基础上做变化,那之后呢,下一次的时候,其实是应该在上一次已经变化之后的温度基础上去做调整,去做微调,对吧?啊,因为你有时候你这个温度发生偏移的时候,有可能它是越变越高,越变越高,一点一点累积,它就变高了,你如果每次都是基于这个初始温度去做变化的话,这相当于就没没那么大变化嘛,对吧,只是每次随机扰动而已,它是很稳定的。
15:05
所以我们会发现这里边我们要定义初始值,其实之后呢,应该是它可以变动对吧,那我们就不需要再去单独定义一个就是可变的这样的一个一个二元组的一个一个序列了,我就直接在这个基础上改好了,对吧?我把初始值定义成不叫初始值,我叫做current temp当前的温度。那那一开始呢,当然就是初始值了,那之后要改变,所以我同样还是把它定义成一个变量variable y对吧,那后边我们无限循环的时候怎么办呢?就去在之前的基础上更新温度值就可以了,对吧?在。上次呃,数据基础上微调更新温度值啊,其实主要就是这样一个操作,哎,那所以我们就是current temp啊,就直接在之前的自己的基础上对吧,做一个map啊,然后再付给自己就完事了啊,其实就是把自己还是做做了一个一个转换对吧?啊那这里边我们map的时候呢,这里边之前的data大家知道这个data其实是一个二元组,一个ID,一个一个温度值,我们现在呢,其实还要一个二元组。
16:25
当前的ID不变,下划线一,但是温度值呢,在之前贝塔的下划线二的基础上做一个微调,那这个微调怎么样去定义呢?诶,那当然有同学就想到了,那你这个直接去在next次double不就完了吗?在零到一之间去随机生成一个一个这个,呃,当前随机生成一个这个呃,Double类型的一个一个值不就完事了吗?哎,这里边给大家呃介绍另外一个方法,大家看到有一个方法叫做NEX,这个高高斯肾对吧?这个叫高斯随机数,这个是表示什么呢?当然这个默认这个生成也是在零到一范围内的啊,那这个高斯随机数大家可能知道高斯分布对吧?这是数学上的一种分布方式,它其实就是指。
17:13
就是传说中的正态分布对吧?啊,那这个正态分布的特点其实就是说什么呢?就是离我们的这个均值越近概率越高对吧?如果说离这个均值越远的话,概率越低,这样的话就可以保证我们大部分的这个扰动,微小的扰动都会距离我们的这个零,呃,默认的这个偏大家知道那个有均值,那个谬应该是等于零对吧?标准正态分布嘛,嗯,谬等于零,西格玛等于一对吧,这个标准方方差等于一,那所以接下来我们的这个大部分数据就应该都聚集在啊零附近对吧,这是比较符合我们真实的场景的,服服从高斯分布嘛,每一次的扰动,你扰动很大是有一定的概率扰动很大啊,那那但是这个概率比较小,那大部分的情况呢,都是一个比较小,在附近范围内的一个扰动。
18:07
这就超出了,就不要受限,像我们之前那个next double的话,只能在零到一对吧,那万一现在有一个异常,它就是一下子跳了很高的温度呢,是有这种可能的,所以我们给一个NEX高斯数,高斯随机数啊,然后接下来大家记得就是你这个还没包装成ssor reading的,最后我们要的是censor reading,它其实还有一个时间戳对吧?啊,所以我们还要获取当前时间戳加入到数据中啊,那所以我们可以定义一个current time当前的时间戳,那我们直接用这个系统,这个system current,呃,Time这个millions对吧?毫秒述直接拿出来作为时间戳加进去完事了啊,所以这个其实还整体来讲是比较简单的啊,那后边我们可以对这个当前的current time,呃呃,Temp这个二元组这个序列啊,你可以继续做map啊,也可以比方说我调这个for each是不是也。
19:07
可以啊啊,因为你如果要是做这个map的话,完了之后还得把它再赋值赋回来啊,这个可能还稍微有点麻烦,因为因为呃,就是说呃,不不不是不是说这个赋值赋回来啊,是是什么,就是你如果说这个做了这个map,在做这个map操作的里边呢,它本身是一个需要传进来一个拉姆的表达式,传一个函数,那我们其实这里边想做的是什么呢?就不需要得到什么样的返回值,其实主要就是把它这里边的每一个数包包装当前的时间戳打进去,包装进去得到一个s reading。然后发出去就完事了。刚才的数据就产生了吧,那发的时候怎么发呢?是用那个ctx,大家还记得吧,CTx.claft方法用它来发出数据,所以这里边我们干脆就直接做一个for each就完了,那就有点像我们之前那个呃,做Spark操作的时候for each r DD一样,对吧?直接for each,你去对每一个做操作完事啊,那这里边我们是对于这个序列里边的每一每一组数据做这样的一个操作发出去啊,那这里边我们还是啊data,当前的data啊,要做的操作是什么呢?要把它包成一个sensor reading,这个sensor reading里边首先是ID_一,然后第二个我们是那个时间戳它态,然后最后是当前它的温度值下划线二。
20:33
这个sensor reading呢,不是直接返回就完事了,是要把它发出去,所以说调用这个ctx的collect方法,对吧,把它包装成一个sensor。这就是我们这一步。整个要做的操作。啊,大家看一下这个可能稍微会有一点绕啊,我把它这个放到下面来,这里边是调用了ctx的C的方法把它发送出去,对吧?啊,大家看到前面我加一个这个注释啊,调用Ctx.collect。
21:09
发出数据,这就是一个完整的流程。好,那当然了,为了防止这个运行起来这个太快,对吧,不可控制,那我可以这个在外循环里边,每做完一次发出数据操作的时候。呃,这个这个括号是这样对吧,这个FOR1的括号是这样对齐的啊,然后接下来呢,我可以先间隔一段时间,然后再发下一组数据,对吧,稍微有一点时间间隔,比方说我间隔100毫秒吧,那这里边我直接用thread.sleep一下就完事了,对吧?SLEEP100,这就是一个自定义。Source function的一个过程。啊,然后接下来我们看一看这个,呃,执行起来的结果怎么样啊,大家看前面我已经把这个添加进来了,而且已经打印输出,那我们接下来就直接运行看看效果怎么样。
22:00
在已经执行起来了,大家看一看当前的这个效果。当前的这个效果大家看到啊,这里边啊,这个顺序有点乱,为什么呢?因为我们当前还是并行度是四对吧,所以大家看到我们这里边生成的时候,所有的数据都是并行生成的,这里面的顺序就稍微有点乱,可能有点看不清楚,但是大家至少可以看到,基本上就是一二三四五六七八九十,这个每一个三四啊,当前的这个数据都在不停的变化,对吧,你看后面跟的这个数据是在不停的变化的,如果现在大家看不太清楚的话,我们可以稍微做一个调整,比方说。为了让大家看清楚啊,我们不失正确性,结果结果是肯定对的,我们全局把它这个并行度设成一,这样的话完全按顺序了,对吧?呃,一二三四五六七八九十,那肯定这个顺序就不会变,然后另外呢,我把这个时间稍微调的大一点点,这个比方说我给一个500毫秒吧,就大家稍微等一等,能够看清楚这个数据到底是什么,然后我们再来运行一下,看看效果是不是非常明显的能看清楚。
23:03
好,大家看现在的这个执行效果,这就非常明显了,对吧。一到十所有的这个三色按照顺序依次生成,然后我们发现诶一的这个温度,哎,那这个基准可能基本上是100了,对吧,大家看这个基本上是在在100左右,100以上在在变化啊,然后它还不停的在在增大,然后你二的话,这个是80多度啊,然后下面有也有比较小的,你像这个三四十四的话,它基准就20多度,然后你看每一次都在上一次温度的基础上,随机有一个扰动,可能变大,可能变小,可能这个调整的幅度大,可能调整的幅度小,这就是真实场景里边,就是经成可能会出现的这种这种这种情形啊。啊,大家可以下来之后把这个自己好好的测一测啊,这个还是应该还是会比较有趣,在我们实际项目当中,你没有数据,想要去模拟生成随机数据的时候,这个非常有用。另外就是说,假如说你如果用到大家会想啊,假如说你用到一个没有官方给我们提供连接器,但是我又想从里边去读取数据,那怎么做呢?
24:10
比方说MYSQL,那怎么办呢?呃,其实非常简单,也是自定义实现一个s function,那在这个wrong方法里边,你想想应该干什么,是不是就得去啊,首先我得建立到这个MYSQL的连接,对吧?然后在这个wrong方法里边呢,我就去读取MYSQL里边的数据,把那个数都拿出来,然后用Ctx.C的方法把它发出来就完事了。啊,当然你这个发的时候可能也也得呃,就是有调整对吧,因为我们是流式的嘛,一点一点发的,所以你得考虑你那是一批数据进来,你到底怎么发,诶这是你自己要去实现的一些业务逻辑了,啊这就是关于自定义source这一部分内容。
我来说两句