00:00
DOS这一部分常用的数据源,其实我们都已经给大家讲完了啊,呃,像前面的这个集合和文件读取数据呢,这个从集合读取数据,这个一般就是做测试了,就是自定义,相当于把数据都写死了啊,一般就是一些测试数据,然后这个从文件读取数据呢,我们可以做这个,呃,一些测试数据放在文件里面啊,也可以做测试,另外还有就是说,如果说我们做的是一个离线处理的话,这种方式还是挺多的,对吧,那就做这个批处理的时候,这个比较多啊,那真正的流处理那一般就是卡夫卡作为。数据源对吧?啊这种方式是最常见的,那另外接下来又给大家单独讲一个,就是所谓的自定义S啊自定义一个数据源,这一部分又是主要是干什么事儿事情呢?呃,简单来讲就是说,那假如说有一些场景下边,我就是想,呃,首先我做这个,呃,如果要是做测试的话,我也不想直接把这个数据就写死,就放在那。我还是想就像这个流逝的数据一样,源源不断的生成,我随机生成数据对吧?哎,当然有同学想到,那你可以就是写一个脚本吧,写一个脚本随机生成数据,然后往那个,呃,就是相当于卡夫卡里边去发送,不就完了吗?对吧?卡夫卡那边相当于有一个卡夫卡producer随机生成数据的一个程序啊,不停的生成不就完了吗?我们这边去消费啊,当然这是一种方式,那另外还有一种更加简单直观的方法。
01:27
就是我可以直接自定义一个数据源,然后自己去实现那个s function接口。在实现的过程当中,我们在自己定义的那个类里边就去实现怎么样去随机生成数据,那大家想如果要是把这个做出来的话,那是不是都不需要连接外部,外部的一些东西对吧,外部的所有的呃,工具和这个数据存储的地方都不需要了,我直接在代码里面运行起来,它是不是源源不断的产生数据啊啊所以这个在有时候啊,在这个模拟数据源做测试的时候,还是非常的好用啊,那所以接下来我们就给大家讲一讲这个怎么样自定义S啊。
02:10
呃,那除了就是关于我们这个做模拟测试之外,就多说一句,就是这个自定义S,其实可用的方式还挺多的,就比方说前面我们讲这个官方的连接器给我们有提供了这个卡夫卡的连接器啊,那假如说其他没有提供连接器的地方,我想要去读取数据怎么办呢?哎,那你也可以自定义啊,对吧,我就直接在这个呃,S function里边去定义怎么样去读取数据,然后生成数据,这不就完了吗?啊,所以它其实是比较通用的一个接口,所以接下来我们就在代码里边做一个实现,首先还是啊,在S下边去new一个class,当前这个是source test。四哦,之前我们那个应该是三对吧,卡普卡应该是三啊呃,现在我们是自定义的这个数据源,那我这个就直接写成这个u DeFine,呃,Udf吧,因为大家其实知道,对于那个要实现的那个接口也是叫做source方式,也可以认为是一种特殊的函数类,对吧?方审啊,我把上面这个稍微改一下名字。
03:19
Reflector。这个是三。好,然后接下来我们其实就主要是把这个udf的这个过程啊,呃,自定义的这个过程要写出来,那主体的流程的话,大家会想是不是跟之前基本上也是一样的呀,对吧,也是首先你要把那个执行环境创建出来嘛,然后哎,读取数据,然后打印输出,然后执行啊,我们现在只是做测试的话,中间没有任何的转换操作,所以我先先把这个整体流程先放在这儿,那接下来当然就是说读取数据的过程,那就不是直接从文件里面读了。接下来我应该env。
04:01
那大家想是不是既然自定义,那显然应该用最通用的这种方法,At source对吧?然后接下来我就要定义一个,哎,自己声明出来,比方说我这个叫my sensor s。直接声明出来,这是自定义的一个类,它需要去实现一个SS方式这样的接口,对吧?啊,所以接下来我们在下边做一个实现啊。实现自定义的source方式啊,那这里面我们用public static class my sensor source,那现在是不是要implement一个接口,Source function接口对吧?好把它写出来,这个s function写出来之后,大家就会发现它其实是有一个泛型的,那这个泛型应该是什么呢?是不是就是当前你数据源嘛,这是数据源产生的数据到底是什么类型,是不是就应该放在这个范型里啊,所以这里边我们产生的数据,最后其实就是这里边要去处理的这个数据,对不对?那假如说我这里边得到data stream是一个stream类型的data stream的话,那我这里应该就是stream对吧?啊,那我这里面既然都已经自定义数据源了嘛,那大家想是不是我可以直接把它包成,哎对当时我们定义的那个po类型s reading啊对吧?
05:26
直接把它写在这儿啊,就包成我们想要的那个类似于Java病的那个类型,所以这里边我生成的时候也需要是一个sensor reading类型啊,写错了啊呃,就是这里边大家看到直接把这个类型写出来之后,是不是前面就已经完全不报错了啊,这个就就已经对了啊啊,那所以接下来我们其实就要实现这个接口里边的一些。方法,大家看这个接口里面必须要实现的方法有两个,一个叫做wrong,一个叫做cancel,那这两个一看字面的意思大家就知道了,Run,那是不是就相当于是运行起来之后,它就要不停的运行,不停的运行啊,啊这当然这个不是那个flink run,我们那个提交drop啊,因为这是当前S这一步任务里边的一个方法,对吧,所以大家当然就知道它是运行嘛。
06:18
所以对于这个SS任务而言,它其实在底层结构上就是什么呢?只要把它执行起来之后,它就会直接调用这里边的wrong方法,对吧?啊,调用这里的wrong方法,然后就开始不停的生成数据啊,读取数据了,那大家会想到,如果说我这里边是流式数据源源不断要生成的话,那这个run方法里面应该怎么办呀?对,是不是应该有一个循环啊,啊,这是一个while对吧,类似于一个,那在想如果你要是while处的话,那变成一个死循环了,永远不会退出。嗯,那其实大家看到下面是不是还有一个方法叫cancel啊,这个应该是取消要退出对吧?但是你说这个方法就是我如果要是调用的时候,我肯定是外边传一个信息过来调用这里边的这个cancel方法,对吧?那这里边我调用这个cancel方法的时候,怎么样能控制住这里边的wrong,退出那个while循环呢?哎,大家想是不是我来一个标记啊,来一个标志位就可以了啊,所以接下来我们的做法非常简单啊,这也是比较通用的在s function里边做这个,呃,这个生成数据的一个控制的这种方法啊,所以这里边我们定义一个标志位。
07:32
用来控制数据的产生哦,那当前定义一个private这样一个属性啊,我直接定义一个布尔类型标志位嘛,对吧,True false就够了,呃,这个我就直接叫做running,默认的情况下是true,因为默认情况下应该就是不停生成的,对吧,就是正常运行的啊,那所以下边这个cancel我先写出来吧,这个比较简单,是不是我直接把当前的这个running直接制成false就完事了呀,对吧?哎,直接指定成false啊,那么然后上面的这个wrong方法里面我该怎么办呢?
08:12
那是不是就是有一个while循环,当running running啊。当装为处的时候,我就不停的生成数据就完事了,对吧?哎,那这里面有一个问题,就是你生成数据怎么生成呢。当前这个run方法它又没有没有返回值,而且大家知道你既然是Y循环嘛,那它不能返回对吧,你一旦返回那不就退出去了吗?哎,所以这里边大家就会想到,诶,我们看它的参数是不是有一个ctx啊ctx这是当前的一个S任务的上下文,而这个ctx里边大家会发现有一个方法。叫做collect,这是不是我们之前那个Fla map的那个类似啊,那种生成数据的方式类似对吧?所以这里边我们直接就用的是Ctx.collect这样的一个方法,就可以生成我们当前想要的数据,来一个就生成一个,来一个生成一个,那我们是不是直接在这个外部循环里面不停的生成就完事了啊,所以这个直观的看起来其实是非常简单的啊,那在这个例子里面呢,我想把它做的稍微复杂一点,因为我想尽可能的贴近于生产实际,模拟出实际的那种呃,温度的变化状态来啊,那大家想一下实际的这个生产环境里边的温度值,它应该是什么样子呢?
09:35
我可能要同时监控很多个传感器,也就是说这个三四那个ID可能是不一样的,对吧?诶,那所以这里边我先呃定义,比方说我要定义十个传感器,对吧?啊,就是一到十啊,有十个传感器,然后它的那个温度值,一开始可能我直接随机的生成一个初始温度,这十个传感器是不是应该初始温度各不相同啊啊,因为它分布在不同的地方嘛,有些地方温度高,有些地方温度低啊,这个差别可能还挺大的,然后接下来呢。
10:07
接下来是不是这十个温度,十个传感器的温度值要在之前的基础上不停的上下波动啊,啊应该就是有可能变大,也有可能变小,但是这个范围可能非常的小,对吧,而且每一次都是在之前的那个温度值基础上随机的做一个微小的波动,这就比较符合我们真实的这种呃生产环境收集到的温度值的场景啊,那大家想现在我们怎么样用这个呃代码来实现这样一个需求呢?啊,简单来讲,这里边既然一开始是要随机生成初始温度值,后面又要有随机的那个小波动啊,那是不是我们至少是需要有一个随机数发生器啊,对吧?所以首先我们先定义一个随机数发生器啊,那这个我们就直接用。大家知道这个Java u里边本身就有这个random这样一个类,对吧?哎,我直接把它定义出来啊,然后接下来,那我首先应该要。
11:08
啊,就是设置十个传感器的初始温度值对吧?啊,做一个这个初始的设置,那这个大家其实就想到我这个应该按用什么方法去设呢?那应该是一个34ID一个温度,一个3ID一个温度,对吧?啊那所以最终我应该把它保存在一个啊这样我我保存在一个list,或者是保存在一个,大家想这不就是一个key value吗。我可以存在一个哈map里面对不对,而且大家会想到如果是一个例子的话,后边我要基于它去做更新的,呃,基于当前这个初始温度值去做更新的话,第一次更新很简单,那第二次更新的时候,我是不是还应该把之前历史里边对应的那个温度值应该改掉啊,对吧,应该都是上一次的温度值基础上吧,我不能永远是用最初的那个温度值去去波动,对吧?啊,那所以你如果要是要更改那个之前的温度值的话,用list的是不是就不如用map呀,对吧?用map你直接找到那个K改了就完事了,所以接下来我们是用一个哈希map来做这样的一件事啊,所以我你用一个哈希map。
12:18
啊,那当然这里边我可以指定它的这个数据类型,ID是一个string,然后温度值是一个double对吧?把这个先定义出来啊,当前我这个就叫做sensor temp map先把这个定义出定义好啊,那既然是十个数,那我是不是来一个for循环啊啊,这个在Java代码里边可能就稍微麻烦一点,大家知道skyla代码里边的话,是不是我可以直接一,然后TO10啊,这样一个range就可以做这样一个便利循环了啊,那现在没那么方便啊,那我只能是for int I等于零,然后I小于十,十个对吧,I加加,这是我们标准的这个for循环的写法啊呃,然后里边的话,我是不是每一次循环就应该在这个map里边塞一个ID和温度值啊,哎,所以就是sensor temp.put。
13:14
啊,要传进去,那我现在是不是要就是温度值应该是3S4,然后一个下划线,大家还记得那个ID的写法是吧?然后再加上当前的一个对一个I,但是大家注意这个I是从零开始的,我想要从一开始对吧?哎,所以呢,我是不是来一个I加一啊,所以这个其实写法还是非常简单的啊呃,前面是一个string,后面是一个呃,这个当前是一个int,那大家知道一加那就相当于还是to string了,对吧?啊,这个没有任何问题啊,后边呢,是要随机的生成一个double类型的这个这样一个温度值啊,那大家可能会想到,那就用random了,对吧,Random怎么样去生成这样一个随机数呢?啊,那大家可能想到我可以直接next double对吧?诶这里边我要给大家介绍的是大家看随机生成的double类型的,呃,方法啊,还有一个next goion,这个是下一个。
14:11
高斯随机数那什么叫高斯随机数呢?诶,这对,这就是高斯分布,按照高斯分布随机生成的啊,这样的一个随机数,那大家知道高斯分布其实就是是什么呀。其实就是正态分布对吧?啊,这个正态分布大家就很熟悉了吧,对,它其实就是按照我们这样一个曲线啊,那是不是离这个呃零轴啊,这个均值更近的地方,这个取得的范围这个概率就比较大对吧?呃离得远的地方那个概率就比较小对不对啊,所以呃,这里边这个呃初始肯定就是这样的啊,那大家知道这里面如果我直接按照这个去生成的话,呃,它的这个范围应该是多大呢?大家知道标准正态分布其实是它有两个参数,一个叫谬,一个叫西格玛是吧,对吧,这个谬是均值,均值应该标准正态分布是零对不对,那这个西格玛是标准方差啊,方差默认是一对吧,那所以这里边其实均值是不是就是我们呃,它它中间这个对称轴啊,对吧?啊在在哪个值范围的那个左右啊,所以我们如果要直接这么生成的话,你生成的数都是在零范围内上下波动对吧?然后呢,这个S格玛呢,就代表它波动范围,那大家知道这个呃,就是它代表的含义是啥。
15:38
这个正负一西格玛范围内,落在这个范围内的概率其实应该是啊,应该是60%几对吧,百分之六十八点几啊,不到70%的样子,那如果要是落在正负二西格玛这个范围内,这个就已经达到了,呃,就大概是95%左右的概率了,啊,这个就大概是。
16:01
啊,那这大家可能也听说过,就是在质量管理里边,其实有这个所谓什么三西格玛对吧,什么4C玛甚至六西格玛啊,有这样的概念说的是啥呢?呃,就是你的那个,呃,就是正品率应该是落在这么大的范围内,对不对啊,就是那个出故障啊,出现出现质量偏差的那个概率是非常非常小的啊,所以这个是质量管理里面经常提的一个概念啊,我们这里面也知道,你如果要是这么定义的话,三四格玛的话,正正负三四格玛基本上就是99%了,对吧,百分之九十九九十点九十九点七啊,应该是啊,那所以我们现在随机生成的这个double类型的数据应该是什么样的一个数呢?绝大可能啊,就是极大的概率应该是落在正负三之间,是不是,因为它是一嘛,对吧,S是一,所以它是正负三之间的一个double类型的一个随机数啊,以零作为这个分界啊,也中心的这个呃均值啊,那所以这里边我们这个感觉这十个温度这就有点。
17:01
偏差太小了,对不对?哎,那我怎么样把这个范围扩大一点呢。哎,这个其实简单,那你说它是正负三之间,我再乘个20。你说这个是不是变大了呀,这是不是就是正负60之间了呀,对吧?然后你如果要是说我标准的这个当前的设备啊,呃,你都是在这个正常的生产环境里边,那肯定不可能一个设备到零下60度对吧?啊,我觉得这个可能性不大啊,那所以我可以前面直接来一个60,然后加上这么一个数,那接下来生成的随机的这个double是不是就应该在零到120度之间啊,哎,所以这个就是你看你具体的需求了,实际应用场景到底是什么样的,就按照这个标准去生成数据对吧?哎,这就是我们的初始温度值,然后后边去做这个running的,呃,循环里边啊,我们要去做更新的时候,这个其实也非常简单,既现在既然是这个一个map里边十个数,那是不是我还是应该有一个for循环啊,挨个去更新这这十个当前的这个十个三四对应的温度值,那所以当前我就不用去把这个I等于零,I小于十再去变利了,那。
18:13
大家想是不是我直接便利当前的?当前map里边所有的那个key是不是就可以了啊,所以我当前拿的是当前这个sensor的ID对吧,从哪里去取sensor temp map里边我是不是直接可以取它这个key set呀,对吧?啊,所以接下来我其实是从这个key set里边把每一个当前的传感器ID拿出来啊,那在里边是不是就是在当前温度基础上做一个随机波动啊啊,那所以呃,这里边我首先定义一个double类型的new temp啊,当前我想得到的一个新的。温度值,那就是之前的温度,那应该是不是这个map直接get呀,Get呃,Get当前key是不是已经在这了,Sen ID对吧,直接把它get出来,然后怎么办,随机随机波动怎么办?哎,那我是不是直接加一个random,然后下一个高斯随机数是不是就可以了啊,你也可以定义那个波动的范围对吧,大家知道这个,呃,绝大多数是不是都应该在正负三范,呃,正负三度这个范围内啊,你如果要觉得这个范围太大的话,你也可以比方说我乘个0.5对吧?啊,那就是变成了正负1.5度之间啊,就看你的自己需求啊啊那所以这就是我当前得到的这个新的温度值。
19:40
然后接下来我是不是要把这个温度值要直接put进去啊,更新,所以是Sen ID,然后new ten new ten。塞到当前的哈西map map里边,最后是不是应该Ctx.collect是不是应该在这里把它发送出去啊,啊,因为你这个每一个sensor对应的数据都得发出嘛,啊,所以这里边我又一个sensor reading里边的数据,那就是当前的ID sensor ID,然后诶下一个是时间戳,时间戳的话,我是不是可以直接用system直接获取当前的时间啊,对吧?而且我们要的是长整型,那我可以直接current type millions啊,当然这是一个毫秒数啊,我们之前的那个数据里面,大家看这个应该是毫秒还是秒啊,对,这个其实看起来应该是个秒,对吧?啊,所以这个可能还是稍微有点差别的。接下来最后还有一个温度值,就是newtime放在这儿就完事了啊,当然大家还可以控制一下我们这个更新输出的频率啊,叭方说我这里边就用一个这个thread,是不是可以sleep稍微等一等啊,对吧,你不要更新的太快啊,要不然我看不清楚过一秒钟。
20:51
更新一次啊,我们控制输出频率。这就是我们进行自定义S模拟,生成这个温度传感器的数据源这样的一个代码啊,啊,接下来我们来运行一下,大家看看效果怎么样,下面我们生成的那个数据是直接有这个打印输出的,对吧?啊这里边的这个代码,而且已经执行了啊,整个过程都是完整的。
21:18
接下来我们就看一看到底应该怎么,呃,输出的是什么,诶,大家看。这是不是就是我们当前输出的这个结果啊,啊,当然这是因为大家看不是按照一二三四五六七八九十按顺序,那是因为我们是便利的那个k set对吧?啊对,所以这个因为是哈希map嘛,所以它里边保存的这个顺序不是按照我们定义的那个1234的那个顺序啊,但是大家看到他们每一个温度值是不是本身的这个范围还是有点差别的呀,对吧,有些20多度,有些40多度,有些80多度,然后每一个每一次跳变,它是不是都是在之前的那个温度基础上,可以大可以小,随机的有一个波动啊。哎,所以大家会看到这就是我们非常贴近于生产实际的这样的一个效果啊,啊,在我们实际操作的时候,有时候也会用到这种方式,就是我们生产环境里边还没有足够的数据,对吧,还没有直接上线,那我们在在这个测试的时候,你怎么测呢?开发的时候想做测试啊,你就自己来定义一个这样的数据源。
我来说两句