00:00
好,来,我们看一下这个自定义的source啊,My source这是呃出去应该是07101个班学生出去之后,在公司当中面临一个业务问题,这实际问题啊,然后给我们,我们帮他解决的啊,然后代码,当然我这边精简了一点啊,这这代码是核心的一些东西都是我写的啊,我直接写的就是它是实际的工作当中的一个需求,就要监控这个MY数据,你想想看,你要看之前的,呃,这个东西应该第一才讲过三四期吧,就是因为之前没有没有这个需求,所以就没加这个东西,是后来加的啊。这公司当个需求就是因为嗯,SS大家都知道,就是接收的一个语言数据的一个来源,对吧,啊就是一个来源地址,但是呢,就是说他提供的这些啊,阿芙罗一个常用的吧,应该提过对吧,然后这个SEC还有这个。监控文件夹的NEK监控端口的对吧?这几个东西是比较常用的,但是呢,你会发现它整个核心的有提供的,提供好的那些啊,有可能在我们工作需求当中,特别有的公司比较奇葩是吧,有很多公司比较奇葩的需求,那这个时候你要想到自定义某些东西啊,自定义某些东西OK,呃,关于自定义的这个东西,应该你们没有提过,你们看的文档应该是那个用户的文档对吧?From点。
01:16
七点。呃,你们看的文档应该是这个盖用户指南对吧?啊,它自定义的一些东西在这开发者指南里边啊,开发者指南里边,这里边有相应的一些自定义的一些东西,Develop啊,这下面都是一些customer,什么S,就是用户自定义的一些这个组件的意思吧,啊自定义组件,那它的组件无非就是source think China嘛,啊这三个都是可以自定义的,但是这一块呢,我们就讲一个这个source啊,你点过来这块就告诉你应该怎么去做,应该什么集成一个什么抽象的一个S类,对吧,然后实现两个接口啊,实现两个接口,当然这块它这个文档标注啊,你现在看去照。
02:02
照他这个写就是记成这个类,然后实现这两个接口,你会发现你需要你重写的一个方法啊,有点不一样,到时候提一下,呃,首先我们看一下它这里的要求了吧,OK,呃,第一个是这个什么。配置信息对吧,Context上下文它这个东西呢,就负责读什么读你不是有个A呢,你不是要写一个那什么配置文件吗?对吧?哎,要写一个配置文件,那这里边他就去读你那个配置文件了。读你那个配置文件的,你想想看,为什么你写一个配置文件,然后启动一个agent的,它就能把你的配置配置信息读走呢?它就通过这个context context有get string get in的等等这些方法来读你那个配置文件里面所写的东西,你看到的,之前看到的,我再看一个。然后看一下这个,呃,之前应该海哥跟你们提过的一个点,就是随便搜一个吧,啊CRLF那个ex e ex ec s找到一个,呃,他给你们提的像这种。
03:11
就是它没有默认值的,对吧,这种是有默认值的啊,它是怎么做的呢?就是这。他通过这个context的,就是说这个地方啊,它可以传一个什么,传一个,它这边是一个,呃,承载的方法,就是说有一个参数的,也有两个参数的,一个参数的,就是说如果说调用一个参数的方法,它其实就是这种属性值。没有默认值的,必须要在你的配置文件里面指定的,要不然它就没有啊,特别这种黑体呢,是完成这个需求最少最少要填的点的这黑体的,你看这里边不有黑体的和这个非黑体的嘛,黑体的是必须要写的,就必须要填的,而且也是完成这个功能最少最少需要填的啊,这是它用黑体标注的啊,黑体标注的OK,然后还有这种,那有默认值的,它是通过什么方法呢?就是这个。
04:02
重载的另外一个方法,传两个参数的,你看它后面叫什么。T value吧,哎,就是它有有一个默认值啊,它也是通过这个context去拿你配置文件里面的一个信息的,你配置文件不是通过那行命令叫什么from NG的,然后杠N。对吧,杠杠C-F等等这种东西传进去的嘛,对吧,杠F传进去的啊,所以它这个配置文件能动态的能够读取到的啊,读取到的它是通过这个方法,这个其实就是初始化它整个的一个配置信息的,初始化个的一个配置信息的,OK,呃,这个stop还有stop这两个东西不用多说了吧,啊启动和关闭,你现在新的这个版本列表已经没有这两个东西了,已经没有这两个东西了。宝贝,我们说一下它用什么代替了,呃,还有一个process。Process一个什么进程对吧,它这个东西你看啊,这里边要干什么事,你稍微看一看,说要获取一个。事件是form当中最小的传输单元吧,对吧,哎,一个一个一,它就在这里边干的,干什么事呢?在source这个process方法里边,它这个方法会被整个的流程当中循环的去调用的,循环的调用的啊,然后这里边它主要干的是把你读过来的数据,当然它这里边没有读过来数据,它直接就是get some data。
05:20
对吧,这个就表示获取一些数据吧,啊获取一些数据,他就把你读过来,那些数据通过,那是端口号exec so通信这种方式,对吧,你加S的时候也应该讲过啊,S通信啊,Eec就是这样做的,然后呃,还有那个什么其他的一些阿波罗。阿芙罗用的比较多的,你要搭建多层的一个架构的话,都用了那个阿芙罗组件,就是你们阿芙罗think,对阿芙罗S嘛,就是把跟form之间连接起来啊,那个读阿芙罗AV嘛。那个比较常用的,然后你看一下干了什么事,首先要获取他说receive new data获取新的数据,对吧,然后把这个新的数据是什么样子的,封装成了even的吧,封装成even的,然后你看一下他接下来干什么呢?
06:06
Get China process process就是获取China的那个进程啊,对吧,因为你在配置文件里面,是不是你将source think China最后那个地方把它绑定了呀,对吧,它分成五个部分对吧?开始DeFine定义,然后中间是三个部分干什么,分别描述source think China,最后一部分就是他们互相之间的一个绑定嘛,也就是说你这个source绑定到哪个channel,它也是通过文配置文件读进来的,所以它直接调用get channel process process就能够获取到你对应的绑定的那个channel。啊,对应的绑定的那个channel,所以说它调用这个channel.process这个方法,他就将这个事件是不是推到那个channel里边了,对吧,它通过这个方法来的啊,那循环呢,就调用,然后呢,它会有一个。Ready啊,状态准备好了,就是说可以进行下一轮了,可以进行下一轮了,然后还有一个back off back off,应该在你们讲那个S组的时候,应该提过这个东西吧。
07:10
Think。C组直接搜back off吧,呃,或者给你们说一下back off是什么?呃,Back off是你们呃,应该配S组的时候没见到这个属性啊。被四个组里边,等会我看一下啊,应该我文档里面写了有。嗯,Think组,Think组应该是单数据源多出口的一个案例,对吧,Think组,嗯,单数据多出口think组在这他们之间到底是负载均衡还是呃那个什么。轮还有一个除了负载均衡这种方式,还有一个是那个。不是,那个是负载均衡这个大类里边,它到底采用什么方式来负载均衡的,除了负载均衡,还有一个36HA那种方式啊,就是说那个什么。
08:07
高可用了,对呀。呃,这个里边找不到。搜一下吧,案例二,Think group对,Group组在这,嗯,它里边有default value not balance,你们用的是这个吗?你们刚才说的轮询也好,随机也好,是这个底下它的一个策略问题,对吧?这我说的不是这个,就是说这个这个这个叫什么?这个其实对,就是你们学那个ha高可用那种东西对吧?哎,这个就是负载均衡,负载均衡,然后它里边负载均衡呢,它这下面分别定义了,哎,Default process,然后。Lord balance,还有一个fair。嗯,那个那个。
09:01
那个东西去哪了?它里边有一个,嗯。Default在这它里边应该是这个,这个里边有一个back off,这个process back off这个东西你们。配置文件里面没写吗?没用吗?用了对吧,这是一个推理算法,推理算法指的是么意思呢?因为你发送消息是不是会有可能失败呀,对吧,它失败呢,它是这个地方,它默认的以指数的方式,就是什么意思呢?就是每次翻倍,它有一个重发的一个时间,就是说这次发送失败之后,假如说它里面定义的一秒啊,它第一次呢,隔一秒发送一次,第二次呢,就是两秒,第三次就是四秒,以指数的方式来增长。就是败,就是退币,退币算法,但是它发送重发到什么时候不重发的呢?它里边有一个最大的重试次数,就类似于你们当时学哈度的时候,不是重任务失败了,有重试什么四次嘛,对吧?啊,他什么时候重试,还有他重试多少次都有限定的,这是一个退避算法啊,退避算法是这个大家注意一下就行了啊,这里边有back off ready这个,如果它这地方干什么抛一常了有问题了,对吧?哎,它采用调用了这个退三法,然后说隔多长时间调用一次,隔多长时间调用一次啊,它自己里边有基数的啊,基数的啊。
10:16
这块啊东西给大家提一下,可能海哥没有给你们说,你要知道一下,有兴趣的可以搜一下这个拜高推算法啊,去了解一下到底是啊怎么来的里边,嗯,好几个地方都用到这个算法,刚才包括这款S里边对吧,还有China think,因为它都有可能你们不是学了那个什么事物嘛,不有可能发送失败嘛,对吧,这种东西它都是有这个相应的一个退避算法在里边的,那核心的其实我们要做的事情就在这儿。我们要做的事情在这,但是my so source这个东西呢,对大家来说应该挺难的,难在哪呢?就是MY这样。你想想啊,我们现在说我们的需求了啊,这个东西刚才已经看过了啊,就核心的这块abstract source对吧,两个接口,一个抽象类,一个抽象类,然后就是我们自定义的里边,呃,它现在新的是这几个方法了。
11:07
新的是这几个方法了。就你刚才看到不是什么start stop嘛,对吧,现在等会你自定义的时候,你能看见它方法不一样了,但是核心的还是看还有一个process。啊,这些方法还是有的,这两个东西啊,初始化上下文就是初始化咱们一个配置信息第二个。就是咱们的一个。循环被调用的那个方法,因为你监控一个MYSO克数据,MYSO克那个里面数据第一你不可能说里边500个500万条数据,你一次给他读过来了,直接就报了,对吧,你可能选择哎,一次1000条2000条到不了5000条这种读法啊,慢慢的读,或者说第二种情况就是说什么呢?那你们搜索那个表啊,就算只有几千条数据吧,一次读过来是OK,但是那个表既然你要监控它,它肯定是一个增量的数据表啊。就是他每天要往那个表里边写入东西,然后你要给他读出来,要不断的监控他嘛,对吧,是这样的,所以呢,你要考虑的事情,诶,怎么把这个表读出来,这个简单JDBC对吧?啊,我们就写原生的就OK了啊,写原生的就OK了,Prepare statement这种方式来读出来,但是你要考虑的一个问题,就是我这次读到十了。
12:15
下一次应该是从十一开始读对吧,那这个十想保存在哪。啊,这个off对吧,便宜量就是类似于便宜量嘛。你要保存在哪?这是大家要思考的一个问题,第二咱们这个需求啊,呃,你要注意它是针对于有。自增的主见的这种表达。如果监控的话,一般的像公司当中开发的时候像这种。主主表都会有自增的组件,都会有自增的组件,除非你用的是什么新型模型,然后说用的什么那个雪花模型,这种有它的一个维度表对吧,有他一个维度表,维度表里边可能它的主件就不是自增的了,就可能是那个主表的什么。外键了啊,这种方式来做的,这种方式来做的,这要注意的好,然后第一我们要监控的表呢,是一个带自增主键的一个,呃,主表,第二我们要考虑的问题就是。
13:11
这个存档。存哪都可以吧,只要它不是一时的环境,不要存内存啊,存内存的话不太合适吧,因为你是想什么,这个挂掉之后,或者说重启之后,还能从上一次那个地方接着读吧,对吧。那你要存内存,你挂了不就没了吗?所以内存咱们是不考虑的,那其他的只要非一时环境,咱们都可以把一失啊。容易丢失啊,容易丢失意识和非意识嘛,内存是意识环境嘛,然后磁盘啊,或者说这种非意识环境嘛,咱们肯定要存在一个非意识,其实你存在ZK,存在本地文件,存在什么HDFS,或者在存回买soql,这都可以,无所谓,那这一块我采用的是存在MYSO,因为就不需要再跟ZK再写一个客户端去跟他请求了,因为我本身就有一个GDBC的连接,对吧,因为我要读主表的数据嘛,接下来呢,我直接跟诶另外一张表行交互,跟另外一张表进行交互,那这块应该我跟大家说一个知识点,这里面到时候我们写代码的时候会跟大家提,就是说大家应该没有讲过,在MYS当中有一个知识点没讲,就是呃,对于一张表。
14:21
有则更新,无则插入,这个语法有用过吗?有用过是吗?啊,有用过就好,有用过就好,没有没关系,我到时候讲一下好吧,就是说一张表那个数据呢,他数据在那或者不在那,你用一一条语法,一条语法就一一个语句搞定什么呢?如果它有,我就对它的值进行更新,如果它没有,我就插入这个数据啊,我就插入这个数据就一一个语法,本来的如果让你们写的话,你应该干什么,查一下那个表返回有没有值,对吧?有值的话做这个update的操作,没有值的话做音色的操作,对吧?它那个语法呢,就是音色的跟update写在一个语句里面了啊,写在一个语句啊,是这个意思,到时候我们看一下就行了啊,这块核心的就是其实那资义source并不难,你看自定义source这块我就不说其他的,就这个方法里面,我这个写一个for循环,我用for循环,我自己封装什么12345这种东西,封装这even可不可以啊,可以的吧啊,只不过说这没有实际意义,这就单纯的会讲那个了啊,正好人家在公司当中要用到这个MY,所以说我们就讲一下。
15:25
啊,这块对大家要求比较高的,就是呃,也不是要求比较高吧,反正这块都是粘过来直接用的,因为GDBC的一个连接在这一块咱们是不重点讲的,但是这里边一个逻辑要理清楚,我给大家讲清楚啊,这里面的一个逻辑啊,因为它里边还是挺复杂的,因为想一下嘛,首先你要获取一个主表的数据在这对吧,你关键你还要存奥呀。你存off干什么?你是为了读下一次数据的时候,要把这个off拿过来用吧?所以它中间的一个数据还是很复杂的,而且还有一个问题,你封装的时候,你想想看,JDBC拿过来的数据,返回的就执行那个ex query那个方法之后反回值是什么。
16:06
Result set对吧,而且set这个东西它是编译器不检查类型的,对不对。是什么意思呢?就是说你买so表里边,假如说你多个列,第一列存的是一个int类型,但是你去result set拿出来之后,拿这个set.get对吧,Get的时候你里边你get这个地方是不是要get string get in这种啊,对吧,它是这个方法吧,这个地方你get string写一可不可以啊。能通过吗?编译器能通过,但是运行报错了,类型转换异常对吧?啊就是这样的,就说呃,Result set这个东西呢,不好用,所以说我们还要写一个专门的方法,把result set结果集啊返回成一个字符串啊,返回成一个字符串,然后跟我们那个source这块调用啊就是说MYSO这里边写的相对来说比较复杂一点啊,比较复杂一点,OK,这是整个的我们对于那so这个东西大的一个方向的一个分析,大的方向的一个分析,OK。
我来说两句