00:00
好,那刚才我们提到了这个东西呢,里边做的事情很多对吧,这三件事,这三件事还挺多的,所以呢,咱们这个东西呢,不在这写。啊,不在这个位置写对吧?呃,那在哪写呢?我们在方程里边来一个这么个类啊。专门到方程里面写,那我就叫table。Process。双生。好,那这边呢,要继承broadcast。导一下这个依赖对吧,这个我们先放在旁边啊呃,刚才我们知道它有这个类型,IN1IN out对吧?啊,输入一输入二输。输出啊好,输入一呢是主流,主流咱们是杰森object的对吧,主流接object。呃,第二个呢,是我们的。
01:00
广播流,广播流呢,目前来说还是spring,只不过说我们状态定义的它,那么所以刚才我们看到第一件事要转化嘛,对吧,好,那这个地方呢,用它。好,那大家告诉我,你输出这个地方,我们应该用什么类型。大家觉得这个地方的输出我们该用什么类型呢?你觉得用什么你就说嘛,对吧,大家都跟着去敲一敲,这样也不容易犯困好吧。敲一下,敲一下用什么类型啊。
02:04
哎,齐总说了,用那个杰森object。诶大家呢,都是认为用这个。杰森object对吧?啊,其实没毛病,因为为什么用杰森object呢?因为是这样子的。咱们后续呢,还要把数据写出去。写到pen尼克斯对吧,那也就是说对这个数据呢,还要再加工,你不像写到这个。卡夫卡你写到卡夫卡,那你用string类型无所谓了,对吧,你写到Phoenix,诶它有字段区分,那我们用JS明显更好一点吧,对吧?所以呢,输出数据咱们还是保持这个杰森OB就好了,好,Al加回车实现两个方法啊,那我先写这个,呃,广播流我呢把它拿上来可以吧,啊,我先写这个广播流的数据啊。好,那广播流里边呢,三件事儿对吧,来把这个拿过来。
03:04
一件一件来写第一件对吧,叫获取并解析数据啊,就是获取并。将数据转化为。招聘对象。第二件事情就是校验。表是否。存在啊,就直接写表吧。这样简单一点,为什么可以直接写见表呢?因为我们可以在见表语句里边干什么事。写上create table if not exist,是不是相当于做了一个校验对吧?当它不存在的时候,我们就创建就好了,其实就直接见表啊,呃,那第三件事情。写入状态。广播出去,当然广播出去这个呢,并不需要我们去操作,而是他自己,你只要负责往状态里边去写就好了。
04:05
往状态里边写就好了。懂吧,啊这样的一个情况,好,这是这么三件事情啊,呃,那这个我就可以不要了,对吧,因为我们把注释都写好了在这啊。嗯,那第一个获取并将数据转化为招聘。啊,那我们肯定要对这个Y进行转换。关键的问题在于这个value这个东西啊,你到底是说把这个整体转成Java病,还是说里边某一个字段呢。对吧,所以这就要求我们怎么样得了解这个value它是什么格式,对吧?好,我问大家这个value哪来的。明白,就是你在处理这种数据的时候,你一定要。清楚。这个数据的格式,它长什么样子,懂吗?如果你不知道他长什么样子,你这块你没法写了,你怎么写啊?
05:02
写不了了,你发现没对吧,所以呢,你想一想,诶,这个数据哪来的。广播流数据哪来的?所以我们要找这个格式,我们应该上哪儿去找?诶,来自于这个来自于配置信息啊,配置信息没什么用的,配置信息。啊,应该对来自于flink CDC吧,来自于flink CDC啊,因为格式嘛,你配置信息也好,MYS也好,呃,当然不能说你错,但是呢,拿不到格式啊。你有格式吗?对吧?答,弗林CDC是不是更好一点,诶,因为弗林CDC呢,它是不是封装了这个数据的格式啊。对吧,啊,应该是flink CDC的一个数据格式对吧?好,那么接下来呢,我们去找一个flink CDC它的一个格式,对吧,那很简单啊,我也就不找了,我干什么事呢?我打开之前咱们这个。
06:15
代码。我呢去运行一个,我就找一个不就行了吗?对吧,好我看一下啊,这个是base。行,就用它。嗯。诶,这边。怎么又给我们报错了?连接拒绝了,哦,我应该是打开了这个对。他连那个8020拒绝了,因为这个是我们打包的集群运行做的测试,对吧,啊不要看啊。
07:06
因为刚看到那个什么8020连接失败对吧,就知道了,我们开启了这个checkpoint对吧?啊。当然打包到集群上,我们去做了那个C测试的啊好,那这边呢,就已经。有了对吧?啊,随便拿一个数据,这边有个点需要跟大家说明一下啊,需要跟大家说明一下,诶我这个库不是没开屏吗。大家记得吗?我已经改成了220212,而且这个库我直接改的对吧,也就这个库没开blog,那怎么还能读到这个数据呢,注意啊。呃,因为现在呢,我们用的是initial initial呢有两个阶段,第一个阶段呢是初始化。对吧,读全量叫全量阶段,第二个阶段呢是增量,其实我们这个数据库有没有开blog,它不会影响到它全量阶段,因为全量是走查询。你没开lo我还不能查了吗?对吧,所以他数据呢,也只能读到这儿,你再去新增修改没用了啊,这个跟大家说一下对吧,好,那这个数据格式呢,我再给大家拿过来啊。
08:12
这是。数据格式啊,你未来做工作的时候也一样,你得把这个数据放在这对吧?好呃,那我们想一下,我们要去做的一个,转化为招聘的一个。数据应该在哪儿啊?对吧,当然不应该是他啊,其实应该是那个table process那张表,对不对,应该是table process那张表,但是呢,格式无所谓嘛,对吧,那我们的数据是不是在after里边啊。对吧。咱们那一行数据,咱们要的不就是把那一行数据转化为招聘对象嘛,那其实就是谁呀,其实就是这个after对吧,所以我们要做的呢,就是把这个after给他。转化为招聘对象。有没有问题?这个啊,那就说你这个数据不对啊,我说了你正常来说呢,你应该用这个table process这张表。
09:06
对吧,啊,有五个字段啊,但是呢,我就我为了拿格式嘛,对吧,我只是为了拿格式,我又不做其他的事情,所以呢,我拿过来before after,那我知道了啊,你数据呢,新增一条数据封装在after里面,因为我们刚才说了新增数据before是呢,After一个数据更新数据两个都有。对吧,啊,那。Before跟after如果是新增,那它没有,它有,如果是更新,那怎么样两个都有,如果是删除呢,那就是它有,它没有,有前面的没有后面的,对吧?那我们肯定要新增数据,那我们的数据呢,就在after里边,这个你要知道对吧?好,那就是杰森。点pass object先呢,把我们的value转化为Jason对象,因为这样好处理,接下来再进行一个转换,把它转化为招聘,对吧,那就杰森。
10:01
点pass object,然后呢,从这个object里边去get stream叫呢,After。好,Get out之后呢,我们要转化成table process。点class转化成这个啊,好加倍得到一个。Table对象,那这一步呢,我们就搞定了。对吧,把这个阿的数据呢。转化成了table process对象,这就是我们要的一个效果,对吧?好,这是第一步,我们做完了啊。
我来说两句