00:00
好,接下来写我们核心的那个类吧,哎,就真的这个类就要带着大家敲一下了啊,这个类就要敲一下了啊,我们叫my source so my source,他要继承这一个什么,这个东西反应这么慢吗?一个抽象的一个对对吧,还这个S,我都是从,嗯,尽量能从。官网拿我就从官网拿OK导进来,肯定是from source包下的吧,OK,然后还要实现两个接口对吧?这两个接口呢,我们也粘一下,从这啊把这个粘过来,OKOK,把这个接口都倒一下啊,都是包下的啊,不用写了,OK啊这个地方报错要实验方法对吧?对,导包这个ID导包out加回车,Out加回车导包,然后呃,重写,就是说这不要实现方法了嘛,对吧,报错了也是out加回车这个在ideal当中,Out加回车这个ecls out加斜杠是一样的提示的功能,能懂这意思吧,OK out加这个回车啊,他要你实现相应的方法对吧?确定好实现这四个方法,这四个方法看一下,他现在没有那个stop的stop了吧,那他就核心的多了,这两个方法做那个什么退币退三马用的,然后process核心调的一个方法对吧,然后configuration就是它初始化那个context。
01:27
OK,那这四个都要实现,都要实现好,那这两个方法我们因为暂时用不到,这两个方法就不用管,就让它返回零就行了,返回零就行了,那就传试的重试的次数和累,每次累加的一个次数都为零,就不做什么重试了吧,啊不做重试了,OK,这块是我们核心的要写的一个方法,这个地方如果说你在这个位置,呃,要到下一行的话,Shift的家回车,这个好像跟ID是一样的,对吧,ID也是它回车啊OK,嗯,这个地方就是核心的方法,核心的方法里边我们要干哪些事呢?啊这样吧,我定一个这样。
02:10
核心方法,然后要干的事情就是读取数据,核心的我写的都是核心的一些注释,读取数据要有吧这块啊,你看那个官网里边,他是不是先去。Get some封装啊对吧,这是核心的,OK,第二个就是封装事件,然后写入China吧,写入China。写入China,这是它process方法,而且是要被循环调用的,这次读了下次要接着读了啊,同样的一直一直往那个China写,一直往那写,OK,然后还有一个com,这个是获取配置文件,对吧。
03:02
获取配置文件信息。初始化谁啊,初始化这个context啊,就是说把你在配置文件,你不是要写一个什么from.com那个东西吗?对吧,写了东西之后,那个东西读怎么读进来,就是通过这个上下文啊,这个东西来读进来的啊,读进来的OK,这个东西我们在哪初始化的,刚才说。看一下这个地方我们是不是要传进来一个,然后去初始化get。Get get get,那你看啊,这里面的东西。属性名对吧,这后面是一个默认值,现在呢,看一看你们写的那个配置文件啊,怎么读进来的。这块就类似于你们写的之前写的那个什么,有什么参数啊,Com啊,延时啊,若什么那个hdfs think那块有一个什么多长时间滚动一个文件夹,多长时间滚动一个文件等等的,它是不是你在配置文件里面有一个属性等于什么值啊,对吧,这个地方就是你等于前面的那个东西。
04:14
啊,配置文件里面要写的跟这个地方要保持一致的,保持一致的它就是通过这个key来去你配置文件里边找有没有这个属性,有没有这个属性啊相应的这些东西,它是通过这种方式啊来初始化,你看一下columns to insect to select。默认的指示心吧,当时提到对吧,然后run carryry delay,两个直行之间的一个延时,延时start from default,对,这是起始位置,它都有默认值的,这个都是设置默认值的,这它这个编码集字符集是吧,U点杠八还U点杠八,这个都是相应的,你要配置文件里边的属性啊,配置文件里边要怎么写的,就是你前面要写一个什么C点什么什么内容点,然后点一个什么roll,就是类似于这样的,你这个属性怎么写的呀,C一点啊think应该是。
05:07
Think应该是K1对吧,K一点,然后这个process off,然后等于一个什么值了这个东西,这个地方你写的process process.back off就是什么呢?就是你的代码里面写的这块的一个内容,当然我们的属性跟它不一样,很懂这意思吧,它是通过这种方式来给它读进来的,就是通过这个上下文context读进来的,OK,那既然要初始化在这个comp里面做的话,那你是不是new对象,是不是应该在这个地方用。对吧,So so help啊把这个context给它传进去。得到一个返回值,得到返回值只有异常,我们先抛一下,嗯,有异常啊这块这块为什么不能抛。重写对,这是重写的一个方法吧,啊实现的一个方法,那它负类或者说它这个接口里面根本没有抛这个异常的话,你不能抛比负类更大的异常,对吧,所以说这地方只能抓啊,只能抓好,那既然抓到异常了,走到这就走不下去了,是不是应该把那个什么资源关掉啊,最后啊在这个地方,嗯。
06:18
把这个应该提成什么。全局的吧,对吧?哎,提成全局的在这儿,嗯,把它改成那个私有的,形成全局变量,然后把这个地方那个声明给干掉了啊,全局变量,因为这个so help,我们在这一块创建的时候,只是去初始化了一下context,创建了一个连接,然后真正的读数据,还有那些东西是不是也要拿着这个类调以相应的方法呀,那你想想看,读数据是不是应该在这里边去读啊,对吧,所以说你这个地方必须要给它提升成全局的一个。属性啊OK,然后这一块如果说它出现异常了点close啊,这个里边异是什么?异常解析异常,就是说它这个里边有一个类要去解析了,就把它关掉好,那初始化好了,我们来和写我们核心的一个方法,核心那个方法,那这里边儿。
07:23
第一步读数据对吧。读数据第二步。封装一的封装事件嘛,第三步写入China了,一样好,那读数据。读的是MY的数据,其实调用的就是执行的那个方法,对吧,执行query那个方法不就是读数据吗?这能理解吧,所以说拿着这个SOSO点。
08:01
XQ的query执行,嗯,XQ的吧,OK,读出去这块,那接下来可能同学想到,那这个地方是一个什么list list里边object,我们应该把它转成stream是不是,但是你要注意这个地方。有没有可能会空啊,就是说什么意思呢?到了晚上你这个你在公司当中开发的时候,肯定是一直在线上跑的,对吧,这个任务,那你这个表读到,假如说他现在存到100万条数据了,但是100万条读完了,他到了晚上也没有人往这里边写了,但是你这个任务按照延时是不是还是一直在监控,一直一直不停的跑啊,因为是一个监控任务吧,他一直要跑的,那他照样会执行什么读取的一个操作,他没有那么智能啊,没有说就是说你们搜QL当中没有数据了,他就不读了,那他怎么知道有没有数据啊,他只能读了他才知道了,所以说这个东西它一直按照你那个延时,他一直在一直在跑,一直在跑,那当你什么呢?100万条数据读完了,没有数据了,他是不是照样会去执行这个操作啊。
09:09
那一执行这个地方为空的吧,所以说你在下面封装事件,你不能封出一个空事件发出对吧,所以说做一个判断对吧?哎,封装事件之前判断一下这个违规空,如果说这个什么require.is empty,如果他不违空的话,我们干什么,我们才去做这些操作对不对,为空为空,你就直接延时做下一步就完了呗,对吧。这能理解吧?啊,你读一张表可以,它这个返回值是空的啊,返回值空的可以的,OK,封装事件,封装事件之前我们是不是应该把这个值转化成string啊,对吧?query.get a row。嗯,应该是搜点。返回值为list。
10:00
这个地方O转化为stream嘛,要用那个方法把我们那个XQ的给它传进去。Rose,所有的行啊,一行一行一行的一个数据,因为你一次读的时候可能会返回很多很多的一个行数据,对吧?OK,这是所有行的一个数据,然后接下来封装这个事件,封装事件你应该是一行封装这一个事件,还是你读出读出来这么多数据封装的一个事件。一行一行的封装吧,啊,一行一行的封装,OK,那这一块呢,我们要一行一行的封装的话,应该要做一个便利对吧。啊对了,嗯,这块这个声明是怎么出来的,在这后面ctrl out加V啊直接就声明不是你之前应该是CTRL1对吧,一个例子啊,这是ctrl alt加v ctrl al加V,然后还有一个你要是格式代码啊,假如说比较乱对吧,在这。很很乱哎,假如说比较乱的时候,空格比较多啊,这个怎么格式化呢?Ctrl o加L。
11:03
格式化代码啊,格式化代码这块注意一下两个快捷键,这两个东西相对于说用的也比较多一些啊,稍微提一下,然后接下来就是说我们要封装事件,刚才也问大家了,我们应该是一行数据一行数据来封装吧。对吧,所以说要对这个all Rose进行便利是不是啊a Rose便利点,我们用增加负循环,直接点for啊,就是一个方法直接回车,那这个一行数据,我把它叫弱吧。啊,一行数据啊,这一行数据这个东西就是一个string类型的,对吧,然后是封装事件。通常是念OK,那理论上来说我们应该new一个什么even的,对吧?对吧,你从这个地方能看出来这个even是一个什么,是一个类吧,对吧,哎,你一个尝试扭一下看看even even烟体。呃,因为我发现你肯定要找到这个from包下的吧,对吧,它是一个什么。注意在idea当中,你看啊这种啊都是类。这个是接口FA,然后lay是不是发现有两种啊。
12:03
这这两个图标有不同,能看能能看到吧。就是旁边俩多俩灰的嘛,对吧,竖杠的这个是抽象类,这种呢是抽象类,这种呢是可以实现的类啊,这块注意一下,这是ideal当中几个图标啊,图标你看这个地方哎,Interface它是一个接口,所以说你要new的话,是不是要自己在这实现一些方法啊,所以说我们不想在这扭干什么看它。时间类对吧,看它时间类OK点进来。嗯,先把这个这个导进来啊,OK,然后这个进去看还是一样的,CTRL加这个左键就行了,进来这块,如果这块要你下源码包啊,当然其实我们不看这个类,如果你要下的话,直接在这下。啊,直接在下这个地方就不用像E,当然ecl高版本的时候,它是不是也可以自己去。引入这个刚才我们所看到的那些内容啊,没有注释对吧,它是依赖里边的。
13:00
依赖里边是不是也是一个一个的炸包啊,打成炸包之后注释全没了,不是这种。要打成加包类的一种注释,它就没了,对吧,所以说你刚才看到都没有注释,你把这个源码下下来,它就有注释了,有注释了OK,我们现在看一下它这个接口,要看它那个时间类,那看这时间类怎么看呢?CTRL加H。CTR加H啊,它就显示在这一块,在这一块它有两个实现类,一个simple,一个Jason,那我们用那个简单的even就够了啊,用这个简单的even就够了啊,看时间类CTRL加H就放在这个类上面啊,放在这个类上面,放在类类上面干什么呢?CTRLH啊,它就能调出来,能看它的一个整个的一个继承结构是什么样子的。集成结构啊,相应的这几个快捷键在后面可能用的比较多一点啊,说的时候稍微提一下,OK,那这个知道我们应该用什么。Simple的对吧。Simple MP。
14:01
Simple,找到这个from包下的啊,From包下的OK controlt out加V啊,返回一个。Even的,或者说这个地方我直接用接口来接收,可不可以也可以吧,啊直接用接口来接收,OK,呃,这个地方你可以看一下,它是有空参构造的,对吧,如果没有这地方就直接报错了,如果说你要想看这个地方它要什么参数,就是你调一个方法看它参数怎么看呢?CTRL加P。CTRL加P这个地方它只有一个空参构造方法,它没有,你看这个地方我按了CTRL加P之后,如果它有很多构造方法的话,它会在这给你列出来,他要什么参数,这个就是不需要参数的吧,哎,不需要参数的,OK,那这个事件,那事件放在这了,事件我们直接发出去吗?不对吧,我们应该是把什么把这个封装到这个事件里边嘛,对吧,而且事件你注意是不是有头有包的呀。
15:01
提过吧,而且你们通过那个什么log这种think的时候,打印出来那个头head,还有一个head,就是一个大括号嘛,里面没有东西对吧,没有东西就空的,但是还有一个body body是不是前面有一些自建码文件,也就就是封装成那个一串数字,然后后面是你所打印的那些数据啊,对吧,有点印象吧,昨天我前天才学的吧,啊,前天才写的,OK,那证明这个时间啊,你应该把这个数据啊,应该放在他的头,或者说他的一个包底那个地方才能传过去吧。当前这个事件是空的吧,没有任何东西的嘛,所以说我们看一下这个事件有相应的什么方法点。干的保底赛保底干还到赛的还的。就是正常的一个类里get set方法对吧?啊,所以说这个地方我们应该是set body对吧,Set body啊,设置它的一个。值,那这个包其实就是我们一行一行的数据,对吧,它里面要什么参数啊。
16:01
Bed数组,那我们是SP,我们怎么办?Get best对吧?roll.get转进来,那这个body设置完了,还有一个had。对吧,我们看一下它还要设置一个头even,其实你可以不设置吧。因为你们之前的那个所写的那个log里边那些方法,你看头都没有东西吧,啊,他也没有封装很多的一个图信息在里边,但是我们可以自己封装一下,或者说我们自己也写一个那种空的啊,看空的怎么写,那你看它要一个什么,它里面参数。要一个卖。要一个map对吧。Spring的一个map还那我们放空的,你就放一个空的map进去就行了吧。对吧,OK,那空的map的话,我们应该在外面去给他扭一下吧,不要在不要在这个方法里面用了吧。因为在这个方法里面六的话,你是不是每一条数据,或者说每一次执行的时候。都会去创建一个卖对象。
17:03
对吧,像这种循环的就跟你们那个map方法一样嘛,不要在里面去生成对象嘛,对吧,我们放到外面OK。Private。和map,这个map的KV类型,我们刚才看到是spring spring对吧。对。OK,然后一个。啊,我们叫head吧,Head。爱的,嗯,声明。直接给他溜出来对吧,用一个。哈西,卖吧,不给,然后在这个地方把我们那个害的。传进去对吧?啊传进去传进去OK,这块就是头跟这个包里的封装好,封装好了之后是不是应该把它写出掉,那应该在循环里面写对不对。对吧,你是一个事件一个事件的写出去吗?在循环里边写,OK,看一下它是怎么写的。
18:04
它是什么?Get China process.process对吧,那我们看一下你跟他一样get China。Process点好这块我们看到确实是有这个什么process的这个方法,对吧,但是你看上面还有一个什么process even Bach批量提交对吧,批量提交它里边传一个什么集合,那集合的一个半形是even的对吧?OK,那我们用这种方式。那用这种方式的话,你在这个负循环里面写还合适吗?不合适了吧,所以说应该在外面来定一个什么。这一个集合对吧,哎,在这个地方。用一个A。而release里边泛型放的是对不对?加V。
19:03
创建一个。存放事件的。集合啊,存包事件的集合,然后每次把这个事件这个地方是什么循环封装。循环碰装,那你每一次封装完一个事件之后,你是不是应该把这个事件添加到这个集合里边了,events.a然后把这个event给它添加进来,好添加进来之后你是不是在外面去。Get kind process,点这个地方才能去批量的做一个提交啊,做一个批量的提交。接下来有个问题,就是说这个东西我们放在这儿来创建合不合适?一个集合你合不合适,你要想这个方法,就是想它会不会多个创建。
20:02
这个东西我们讲的是不是你读一次就可以调用一次方法呀,读一次调用次,读一次调用一次,OK,那是不是读一次的时候,假如说十条数据,你限定了十条数据读一次,十条数据读一次,十条数据另有一个集合,十条数据用一个集合。那不完了吗?对吧,所以说这个地方我们应该把它提出去吧。你给他提成。全局的全局的之后,那全局的又有个问题了,那是不是应该要做一个,有同学发现了这个地方,你只要写出去了之后,应该把这个events干什么,要清空吧,Clear对吧。如果你不可啊,你第二次是从是从11读到15的,但是你这个events这个集合里面是不是放了一到十的数据呢?你再往里一写,又把它写了一次,你前面做那个off没用,白做了,所以说你要是放在外面就清空,你要放在里边也行,你就是执行一下就完事了,你线上业务肯定不能放在里边吧,放在里边最后肯定是OM了,对吧,你要思考市场上思考这种问题啊,就是什么东西该放里面,什么东西该放方面,如果放里边跟方便有什么区别,然后你相信呢,要不要做这个清空的一个操作。
21:14
啊,提供了一个操作,OK,这是写入China。写入完China之后,是不是我们还要更新一个呀。对吧,啊,之前提到更新off,那看一下啊更新off,嗯,Circle点。Update set to DB对吧,他要传一个什么进去。扇子进去。传一个赛进去,就是我们之前提到的,他是不是应该把两个东西加起来的东西啊,当时提到就是上一次到十了,这次读了五条,他应该存第二次存的是15嘛,哎,所以说要传一个size进去,那这个大小应该怎么怎么传呢。写谁呢?是不是集合的长度啊,对吧,多少条数据不是集合的长度嘛,因为我们是自增的主键嘛,对吧,就是集合的一个长度,OK,集合的长度那呃,Events点。
22:09
袋子。啊,那不能拿这个even对吧,应该拿这个all Rose,因为我们可点says。对,科里亚就空了对吧,克里亚就空了,OK,这个是将那个写进去啊,写进去好。写进去之后,嗯,写进去应该就结束了吧,结束之后应该看一下它这个地方process返回值是什么。Status当前我们返回是那么不合适,因为它是为了不报错,那我们看一下这个status到底是个什么东西,点进来。一个枚举一个一个back off。一个做ready就好了,准备下一次了,那back off呢,要退是吧,要做重试,要做重试这个东西,那我们当前按照我们这个逻辑走的话,我们把这个奥赛德的更新了,更新之后应该干什么?
23:09
Ready对吧?哎,Ready之前就ready,你一旦调它识别到这个ready之后,它立马又会触发下一次这个操作,所以说在这一块我们应该干一个什么事,slide.sleep,那睡多长时间呢?我们是不是有一个值啊,属性值啊,So source help点。Get wrong delay,这个地方有异常,给他抛一下啊,抓一下。好。抓了之后这个地方如果它正常的话,好来看一下啊。嗯,我们把这个缩减一点啊,让它集中一点,要不然上面就不好说了,当前再回过头来看一下我们逻辑啊,我们先去查询了一下,买搜L表,根据它是否有数据。If在这是不是到这分成两部分啊,有数据就是去封装even写出去对吧。
24:02
那你想想看,这个睡眠放在这儿。合适吗?那如果没有数据怎么办啊?啊,没有数据怎么办啊。没有数据直接就执行啊,那这块你肯定也要返回一个它的那个状态值嘛,不可能返回闹嘛,对吧。所以说这个地方应该返回的是status点,再告让它做一个退,因为它异常了对吧,做个退。那。这块有没有问题呢?你把睡眠写在里边。如果他没有数据怎么办啊,他这个线程还会。睡吗?不可能睡了吧,所以说你是不是应该把这个东西干什么。睡眠应该放在外面吧,就无论他有没有数据,你让那个性能多睡一会儿吧。
25:05
能不能理解这个意思?假如说正常的没有数据的话,是不是if就不进来了呀,不进来刚才我们是不是把睡眠的操作写在if里面了。不进来这个睡眠的这这套代码还会执行吗。不会执行,那你是不是一直去循环的去就疯狂的去调用这个process方法呀,一直跟白进行交互法啊,这样是不对的,对吧,就算他没有数据,我们也让他睡一会儿啊,待会儿再去执行,是这个意思,所以说呢,我们把这个提出来了。提出来了,那在里边呢,如果说它正常的话,嗯。这个地方我们直接返回一个什么。正常的睡眠走了之后,应该返回一个status.ready就说准备好了,就说睡眠完了之后,准备好了,你下一次可以执行了吧,啊,下一次执行了这样的一个操作,这样的一个操作,要注意这个睡眠一定要放在外面,如果说你不放在外面,因为咱们测试的数据量也比较小,你就会发现你那个日志啊,疯狂的打印。
26:06
也没有问题,就是说它那个监控的数数据的一个延时太低了,能懂这意思吧,这个两侧查询之间,就是监控的一个延时,两次就是监控的一个延啊,中间你正常的你去打印一个数据。监控白色个表,你不可能说一直在那读吧,他也受不了啊,质量也受不了,所以说这个延时,你要知道这个延时干什么用的,就是控制两次读取之间的一个延时啊,读取两次之间的一个延时,这是我们核心的一个业务逻辑。啊,就是说sources它到底是怎么工作的,就是说把数据读进来,至至于数据源你自己决定吧,啊当前我们是MYSQL里边去读的数据,MY里边读的数据,然后呢,服装even写给China啊,一步一步的写出去的啊一为写出去了OK。
我来说两句