00:00
我们上节课讲到的这些呢,其实已经涵盖了一般情况下,我们在项目当中可能会遇到的这些SS来源啊,那这里边呢,再给大家多讲一个,就是所谓的自定义S,诶什么叫自定义S呢?顾名思义,其实就是说它不是有一个那个ADD source嘛,大家看那个环境里面不是可以直接去ADD source这个方法吗?它里边不是要传入一个s function吗?啊,那所以大家想你既然官方的这个呃,弗林格卡夫卡的连接器,它可以去实现一个这样的一个,呃,就是source function,那我是不是自己也可以去实现一个这个接口啊,呃,当然也是可以的,只不过就是我们这里边实现一般情况,如果我们自己去产生数据的话,那相当于就是产生的是测试数据,对不对啊,所以这个在我们平常测试代码的时候,这种情况可能用的比较多,实际的生产环境里边,我们当然不会去自定义source生成数据,然后去跑,对吧?啊,这个当然就是一般是用在测试环境里面啊,那现在我们来把这一部分代码还是给大家讲一下吧。因为尽管。
01:13
呃,用在这个生产环境里边比较少,但是测试还是比较普遍,特别重要的是通过这样一个例子,大家可以看一下在这个flink里边自定义函数的这种这种写法啊,这种代码的实现方式,好,我们写一个自定义。呃,那么接下来大家会看到这个自定义S的写法非常简单,跟前边这个卡夫卡的这种方式非常像,因为大家看这里边其实。它本身这个extend这个玩意儿对吧,这这就是零九对吧,一个一个呃extend base,它最早的这个东西是不是就是rich parallel s啊,它是什么?呃,Rich是负负函数对吧,这种是扩展了的并行的s function是不是这样的呀啊,所以它底层是不是也实现的是s function啊,所以这个大家自然是能够想到的,我们这里呢,其实就相当于是自己去实现一个s function这个类啊,那所以这里边我们就定义叫sensor s s啊。
02:29
呃,所以这里边我们定义的这个类是不是它就应该要实现一个s function的interface啊?呃,所以下面我们在下边把这个类写出来sensor s他要extend source function式,诶大家看这对吧?呃,当然这个里边还应该有泛型,我们这里边的泛型就直接给这个sensor reading就可以。
03:01
数据类型对吧?生成的数据类型是什么?最后产生的数据类型是什么?我们这里边是生成数据类型是string了,对不对啊,这里边我们给的是sensor reading啊好,大家看一下这个怎么样去做啊,那这里边呢,大家其实会看到这里还在报错的。为什么报错呢?对,Source function这个接口是不是它有一些必须要实现的方法啊,对,必须得去实现,那我们看一眼,诶有一个cancel方法,有一个wrong方法,大家能想到这两个方法分别是用要干什么事情吗?其实很简单,是不是就一个是cancel,那那就是表示要取消当前这个你的这个数据源,不要再再生成数据了,对不对?呃,那RO这个是不是就相当于你要去生成数据啊,你既然是流数据嘛,流式的这个数据,那当然就是连续不断的去生成数据,哎,所以这里边我们这是wrong,这是cancel,那大家可能会想到我用什么去表示当前应该去停止,呃,应该去这个继续生成呢?那可能我们要用一个全局的标志位了,对吧?
04:10
啊,可能是一个布尔类型的标志位去控制它。呃,定义一个定义一个flag表示。呃,数据源是否正常运行?呃,那这个当然我们可能会更改它的那个状态,对吧?所以是一个bar running定义这样一个,它是一个类型的变量,默认应该是true。正常运行状态对不对啊,那大家会想到如果要是cancel的时候,是不是要直接把它对改成false就好了,诶那它控制这个运行到底是在哪里控制的呢?其实很简单,是不是在running里边要控制啊,我们就根据这个running是不是true来判断接下来要不要去生成数据,继续生成数据,对吧?哎,这这就是在这里面去控制了,所以大家会看到can表示的是,呃,就是。
05:14
取消数据源的生成。其实就是数据的生成对吧,然后下面那个run的话就是。正常生成数据啊。呃,所以大家看下边在这里边我们生成给大家还是就是按照这个就是比较真实的一个测试的这种这种状态,给大家写一下这个程序啊,呃,如果要是真实的测试的这个状态的话,大家想我们的这个数据是不是应该。按照一定的标准去随机生成啊,大家想是不是应该这样去做,哎,所以这里面我们先定义一个随机数生成器吧,呃,初始化一个随机数发生器。
06:13
呃,这里边我们就直接用这个,对,有一个呃,Scla u里边提供了一个random这样的一个类,我们直接把它扭出来,然后接下来啊,接下来就是大家会想到我们想要的这个生成的这个状态,它是流数据,是要持续生成,对不对?哎,所以我们这个相当于应该是什么呢?隔一段时间是不是就应该生成一个数据啊,哎,我们这里边不要生成一个数据了,我们生成一组数据好了,因为是测试数据嘛,我们有不同的传感器,所以隔一段时间,我就比方说我们那个不是有34ID吗?我比方说定义一到十,十个3CID,对吧,我就隔一段时间这十个三。
07:00
Sens,这个传感器的温度全改变一下对吧,所以隔一段时间生成一组数据,隔一段时间生成一组数据,这样就可以实现我们自定义的这个,呃,零数据源这样的一个效果好,那接下来,呃,你既然是隔一段时间去生成一个,我不要每次都重新生成,因为真实的状态可能是什么。可能是是不是应该是在之前的那个基础上,温度发生一点微小的变化,大家想想是不是这样,温度不会是一下子那个随机变化特别大,对不对,其实是应该在之前的上一个那个温度的基础上,可能有一个随机微小的变化啊,变成一个升高一点啊,降低一点啊,可能是这样一个状态啊,所以这就相当于是什么呢?相当于我们是要保存一个当前的那个温一组温度状态对不对,然后每一次在当前的这个状态下去叠加一个随机变化的值啊啊,那那这个既然是要叠加随机变化的值,那一开始我们先给它初始化定定义一组数据吧,对吧,先初始化定义一组。
08:10
初始化定义一组传感器温度数据,好,呃,那么当然了,后续我们是要不停的变的,所以它也应该是Y对吧?呃,这里边比方说我们就定义它叫当前的温度初始化的时候,这个初始化的时候,当然我们初始化可以都给零,但是你要给零的话,后面在它基础上去随机波动,这个就有点不太合适了,对吧?所以初始化的时候我们也是弄得真一点,怎么真呢?我们不同的这个sensor I sensor给不同的值随机生成对吧?呃,那这里边我可以用一个,呃,比方说大家可能知道这个1TWO 10one two ten,这相当于是一个range,对不对,然后它就可以直接去用一个map方法,是不是可以去啊,就相当于相当于是一个for循环啊,啊对吧,这个我用这种方式直接实现了一个for循环,大家也可以for循环去。
09:10
啊,那这里面就是每一个I,我对应生成的数据应该是什么呢?哎,我生成的数据包装成一个元组。前面应该是一个sensor ID对吧,我们的sensor ID是是个string,所以是sensor下划线加I,这是不是就是我们的3ID,呃,然后后边我们这个元组现在那个时间还不知道时间,我们最后再加对吧,然后一开始的这一个东西,我是不是应该直接拿这个,呃,应该有一个初始化的random出来的一个一个那个温度值啊啊,这个温度值我也不想。就是太随机,比方说我以一个温度作为基准,比方说这里我给一个60度作为一个基准,对吧,然后接下来我是不是可以加一个随机数啊啊,这个随机数当然就是可正可负,这里面大家看run run下面有不同的这个随机数生成的方式,可以next int生成一个随机生成一个int,对不对,可以一个float,可以一个double,平常用的最多的是什么呢?大家看其实是第一个这个这个是什么呀。
10:23
这是什么?是下一个高斯随机数啊啊,这个高斯随机数是什么东西,这个大家知道吗?这个大家听说过高斯分布吗?听说对高斯分布还有一个名字更有名,叫做。正态分布大家应该听过对吧?对,其实高斯分布就是正态分布啊,所以NEX的高斯数其实是什么呢?其实就是创建了一个标准正态分布这个曲线,在这个曲线上随机选取一个数对吧?呃,按按照它的那个分布概率随机选取对不对?呃,那大家知道标准高斯分布它其实是什么呢?标准正态分布它有两个参数,大家知道吗?有一个sIgMa,有一个缪。
11:11
这个大家知道这两个参数吗?这个标准正态分布谬就等于。零西格玛就等于一。哎,那这个谬表示什么呢?谬表示对,谬表示的其实就是对这个这个正态分布,它不是中心对称的嘛,对吧,左右对称的对吧,所以其实就是表示它的这个对称轴的位置对吧,那谬等于零是不是就表示。关于我们的这个Y轴对称啊,所以就是标准以零为界这样的,那是不是可正可负,哎,这个就比较标准啊,然后这个西格玛等于一的话,这是什么意思呢。诶,这个大家应该听说过,在质量管理里边其实有这个概念,就是那个什么几C格玛那个概念对不对,什么1C格玛,2C格玛,质量管理里边什么五西格玛,六西格玛对吧?它表示的是什么意思呢?它其实说的是在正负一西格玛这个范围内对占的百分比,这个具体我忘了啊,好像是百分之伊西格玛,好像是百分之六十六十多还是70%多,这个我忘记了啊,一西格玛还比较小啊,然后二西格玛这个范围。
12:28
正负2C格玛的范围,对,就到了90%以上。所以就是说按照它的这个分布。现在西格玛不是等于一吗?那相当于是不是就是正负二之间,落在正负二之间的概率是90%以上啊啊,然后我记得好像是三正负三西格玛之间的那个概率就是95%还是99啊,好像我记得是到99了,就已经是99点几了啊这个大家回头可以去查一下,就像所谓的那个五西格玛,六个西格玛,那就都已经是99.9%几对吧,就是那个小数点后很好好几位了,就是小概率事件了,呃,所以大家看,其实你这个理论上随机,它是不是正无穷到负无穷都可以取啊,那我需要担心它有可能超出我正常的这个范围吗?其实概率很小对不对?对吧,正常来讲它大大概率其实就是啊,正负可能正正负二正负三之间的一个数啊,所以这个涉及到这个,就给大家稍微补一点数学知识啊,啊就是正态分布啊,这个在真真正我们做测试的时候,真的是一般随机数是要用这个的,因为大家。
13:38
在想你你真的是完全随机,这种真随机其实不太适合真实的那种场景,对不对,呃,往往真实的它这种随机就是一个正态分布啊,所以往往是用这个啊高斯发生数,这个max的高斯随机,随机数好,所以呃,那这个NEX的高斯数,这是在一般就是在正负二之间,那大家会想你一个60加正负二这个是不是。
14:05
差别有点太小了,对吧?我想让每一个传感器它在不同的点的话,这个差别大一点,那我后边是不是可以乘以一个数啊,相当于这个比重大一点,对吧?比方说我乘以一个十,或者乘以一个20,这个比重就大了,对吧?啊,那相当于它的一个偏移范围是不是就在正负40或者正负20之间了,对吧?哎,这个就大一些,所以大家看,通过这种方式,我相当于用一个for循环就定义了一组三四它的初始值,对吧?都是随机生成的啊,按照这个方式随机生成的。然后接下来大家会想到我是不是就可以啊,用一个我应该用一个什么去生成我持续不断的数据流呢?产生数据流应该用什么去生成。是不是应该用一个无限循环去去生成啊,对吧,死循环对吧?用无限循环产生数据流,所以这里边就是一个well啊,当然不应该well true,而是应该well什么,因为有可能它可以中断的,这里它在什么情况下整理,对是不是就是我们这个标志位啊,对吧?正常情况下它都是处,一旦我们cancel做了这个操作,是不是就应该把它制成false,后面就不要产生了,呃,还是有中断机制的啊,要不然这个死循环真的就很很难受了,所以这边我们传进来的就是标志位,根据它来判断是否继续产生,好,那呃,在这里边我们是不是就应该先去更新一下当前的那个温度值啊,对吧,就是每一次来了之后,在最初始值的基础上,是不是想要去偏移一小点啊,在那个基础上随机的去波动一小点,好,所以更新。
15:58
在前一次温度的基础上更新温度值,呃,那大家可以看到我就要把这个current current temperature要做一个改变了,对吧?本身这个呃current temperature它其实是一个相当于是个数组,对吧?这是个例子了,那我这里边其实就直接可以把它再做一个map转换,是不是就相当于又是一个for循环啊,把它里边每一个都做一个变化啊,那么这里边每一个temperature那个本身它应该是个元组对不对?我现在是不是一应该保保留一是什么一就是那个3ID对吧。
16:43
二那就应该是不是把那个温度值要做一个变化啊,在之前的基础上可能随机的再加一小点对吧?啊,那这个我们就直接random random下一个高斯数,高斯随机数是不是就可以了啊,就这样做一个调整就可以了,所以每一次都重新生成一个新的这个温度值,然后接下来干什么呢?是不是还得有时间啊。
17:14
大家记得我们那个3READING里面是不是还有一个时间戳,获取当前时间戳啊,那大家可能知道current time啊,定义一个这个东西我们是不是可以直接用system里边可以对current time,呃,S,这就是当前的那个毫秒技数的那个时间戳,对不对啊,直接拿到,然后接下来。呃,大家会想到may不应该是current time啊,我是不是应该用current time,前面这是个相当于这是个列表,对吧?一个数组还是要去循环,还是for循环,我直接去for each。For each,每一个怎么做呢?是不是每一个温度我们都把它包装成一个sensor reading,然后输出啊,对吧?输出是不是就相当于成为了我们整个这一个s function的一个输出的那个数据就是我们的数据源,对不对?
18:16
哎,那大家会想到它的这个输出,怎么去输出呢?怎么能把包装好的那个sensor reading,你这只是一个for each而已嘛,我当然可以在这里边写一个那个sensor reading,对不对,把它包好,我们先把它包好吧,Sensor reading里边应该第一个是ID,那是不是它的一啊,元组的一,第二个是,哎,不对,第二个是看一眼sensor reading第二,哦,第二个是temperature,对吧?呃,诶不是啊,第二个是时间戳,对,第二个是时间戳的话,那我们这里边应该是直接就给current type对吧?然后呃,当然大家如果要觉得你想统一跟我们前面那个统一都是,呃,就是那个十位的话,你就除以1000对不对啊,这里边如果我不除以1000是不是也可以啊,对吧,我我就用毫秒来表示这个时间也是可以的啊,这个大家按自己的需求来,然后最后还有那个温度是不是就是它的二啊啊这个包好啊。
19:20
那大家会想到这里边,我直接把这个sensor reading这个包好了,但其实还没有输出,这只是做了一个for转换,怎么样能把这个输出呢?哎,这里面就要用到什么呢?大家看wrong这个方法,它有一个参数叫什么ctx,其实就是我们的上下文,对不对,Context对吧,所以大家看这是一个什么上下文呢?这是我们当前source function的source上下文。哎,所以它相当于是什么?用这个上下文就可以把我们当前的某个包装好的数据,然后发出去,就相当于成为了我们流数据里边的一条数据源,就是数据源产生的一条数据,对不对啊,一条一条就可以产生了,那大家看一下这个方法是怎么样去调用的啊,直接就是ctx里边有一个collect方法。
20:22
这个CLA的方法里边传入的东西就是他要去。到时候发射出去的那个数据对不对,生成的那个数据,所以大家看用这个方式就相当于一条一条把它发送出去了,我们的数据就产生了,对不对,而且是一条一条输出去的,不是十条一下出去的,对吧,这是我们还是尽管是十条是一次性我们产生的,就是一条一条一个放循环全部产生的,但是我们发送的时候是不是还是一条一条发出去的呀,所以还是留数据对吧?啊,这是我们的这个状态,呃另外我们这里可能做一个,呃,就是每一次在这个无限循环的时候,一组发,发射一组数据的时候呢,中间sleep一下,就相当于呃,让我能看到它的变化过程,对不对,不要太快,比方说我这里边sleep sleep500毫秒对吧?呃,设置时间间隔。
21:19
呃,可以方便我们那个观察它的输出啊,好,大家看到这就是我们完整的一个自定义的S,看起来还是有点复杂是吧,但是这就是呃,在这个link里边,后边我们会看到越来越多的这种。更加复杂的应用场景,就是要用自己自定义的这种方式去实现啊,它就更加灵活了,对不对,你想要什么样的方式,想要什么样的功能,都可以在这里边去自定义。呃,这里边我们把这个。STREAM4,给大家输出看一眼啊,大家能想到这个输出是什么样子的吗?
22:00
跑一下看看。大家看到这个输出它是包我们已经包装好的sensor reading,而且是不是一直在变,一直在生成啊,然后sensor reading1,呃,这个sensor ID一直就是一到十对不对啊,我们是一组一组生成的嘛,大家在这里边如果仔细去看的话,比方说你看这个呃341对吧,这个每一次生成的这一个时间戳是不是不停的在在增长,时间戳在增增长对不对,后边的这个温度值大家看是不是在随机的跳动啊,在上一次的基础上有可能变大,有可能变小,对不对,就是随机跳动,而且每个sensor它的这个温度各不相同。大家看有60多度的,有50多度的,有30多度,也有20多度80多度的,对吧?啊,这是因为我们一开始生成初始数据的时候就各不相同,所以大家看这就是我们产生这个测试数据比较好的一种方式,这是这个自定义SS啊,给大家看一下这个效果。
我来说两句