00:00
再往下呢,就到了大家比较喜欢的环节了,是吧,写代码啊啊,那么这块的代码我要完成什么功能呢?完成什么功能是吧,那来往下走。那么这块代码呀,我们说我要对我们这个数据呢,来进行一个我们这个分流啊,来进行分流什么意思啊,什么意思说这个K呢,它对会把整个数据库的变更啊都给记录一下啊,那也就说咱们现在在这里我监控的呢,是我们这个my circleql这个数据库,那么这个数据库的所有表变化呢,它都会被我们的开头记录,然后呢,发送到我这卡卡里边来,那么发送卡卡哪个主题呢?是不有一个咱们专门创建一个主题,什么下划线DB-C啊,这里把所有表的这个变化都记入这个主题了,对吧,那不行对吧,那我现在想干什么,我现在啊,我想做一个分流对吧?这个分流是怎么分的,它会根据我们当前对吧,这个记录的这个数据的,它属于哪张表的变化。
01:03
然后呢,根据咱们这个表的名字来把咱们当前这个记录呢,分配到不同的主题中去,什么意思对吧?就是我现在呢,这里边我比如运行咱们这个这个模拟生成数据的程序,对吧?那么他呢,可能对我们订单表啊,用户表啊,用户券表啊,收藏表啊都做一个修改,然后呢,咱们现在把这变更都记录到哪了呢?都记录到这个主题里面去了,对吧?但是我操作不方便,那怎么办呢?我要从咱们这个主题中把咱数据给读到,然后读到之后呢,哎,来取出来,当前你要记住啊,属于哪张表呢?然后根据咱表名来把咱们这个数据呢,发送到不同的一个主题中去,咱们管它呢叫做分流啊,也就说我现在啊在咱们这里。首先干什么呢?我要写一个SPA stream程序对吧?写一个SPA stream程序对吧?那么这个SPA stream程序它干什么呢?它啊要读取我们这个啊,卡不卡,读取我们现在的这个G帽0523,然后下划线DB的杠C的这个主题数据啊,读取我现在这个主题,然后接下来,那么咱们现在读取完之后呢,来进行分流啊,然后接下来根据我们这个表名啊,根据我们这个表名,然后将我们这个数据来发到不同的主题上去。
02:29
发到不同主题,那意味着什么呢?那意味着咱们现在在这里我要读了,那么读完之后呢,到时候我还要要把这东西写到卡布卡吧,对吧?写到卡布卡对吧?那么如果写的卡的话,那么这个时候咱们这个东西是不是还得再回来呀,对吧,你不光从卡布卡读对吧?那么这会儿呢,你还要把这东西再写到卡夫卡,所以说呢,那卡夫卡在写的时候,那可能什么呀,可能就是我相当于我这个主题对吧,或者说我现在相当于什么,相当于我这主题不同了呗,对吧,那我这块呢,我稍微标记一下来往下拿啊,在这里对吧,那在这呢,我单独去拿一个容器啊,在这我单独拿一个容器,比如说在咱们这里。
03:13
拿这个吧。啊,拿它吧。这是什么呢?整个这块啊,这就是咱们的卡不卡啊,整个这块,那这就是我的卡布卡啊,我现在用颜色标记一下什么样,比如它对吧,然后接下来那么咱们当前的卡夫卡里面呢,那不同的主题对吧?那第一个咱们这个主题呢,就是我们现在这个基贸U23下线DB对吧,然后杠C啊,这是一个,那么除了这个之外呢,那咱现在再来一个啊,那么再来一个的话,然后上来啊把这个方格了,那它会根据什么呢?它会根据我当前的表名对吧?然后比如说我现在有order in inform这个表,我在前面可以加了一个什么呀,我可以加了一个前缀,比如说这个呢,是我ODS层对吧,我在这里我要对他做一个分层了,这会大家想想拿到的是不是相当于原始数据呀,对吧,然后接下来那么你处理完之后,那你这个原始数据你变了吗?其实没变吗?同学们。
04:09
对吧,咱们只不过干什么,是不是把这个词句放到不同的主题里边来存下呀,对吧,比如说我现在有什么order info,对吧,然后呢,我这里有什么ods,然后呢有什么user,什么info对吧等等,那就我现在呢,可以根据啊,我这个不同的一个表明啊,来把它数据呢放到不同的一个主题中去,对吧,那么这个方向呢,咱们给他改一改对吧,这个。这个往上塔对吧,这个是从我这主题读,然后记下来,那么咱现在呢,这个对吧,他呢要往我这主题写啊,就当年咱们现在呢,要写到我们现在这个不同的主题中去,对吧?那接下来我要做的事是什么呢?那我要做的事就是编写咱们这个Spark training这个程序啊,那么具体的Spark程序在写的时候,对,那么应该注意什么对吧?那这块呢,其实咱们前面应该做过。
05:00
啊,应该做过对吧,所以说这会就开始到什么呢?到咱们这个重复工作的时候了,对吧,到重复工作的时候了,对吧?来那我现在呢,来啊,我现在把这个东西拿过来啊,拿过来。还是在我们那个real time里边啊,还是在real time里边,然后接下来,那我现在呢,我去新创建一个我们那个SKY对吧,这个呢,就是通过我们的can啊来采集数据,然后对我开采集数据呢,做一个分流,把它放到哪呢?放到我们的ods下边,注意啊同学们,相当于我从咱们原始数据读到数据之后,然后仅仅做一个分流处理,这做一个分层,那目前咱分什么层呢?对吧ods对吧,目前咱们这样一个ods层对吧?那么我现在他所玩的功能是从我们这个卡普卡周恩来读取数据,然后呢,根据我们这个表名啊来进行我们这个分流处理,那么如果从卡卡里边来读取数据的话,那么具体这个应该怎么来做呢?一起来跟着我这个思路对吧来走对吧,那首先呢,咱们是不是得先把这个这个基本的准备工作跟创建的创建出来啊,对吧,首先第一个对吧,那么咱们呢,一。
06:14
呃,Spark啊,然后com Spark com,然后这块呢,咱们去设置一下两个东西,一个是我们的p name短到另外一个呢,是我们这个对这个master啊们这两个东西设置一下a name呢,随便写对吧,然后接下来这个master呢,咱们在这里local对吧,这块大家注意啊,其实你们习惯新对吧,但其实如果说我现在有12分的话,对吧,但是我现在其实我这个这个卡瓦分区是不是只有四个分区啊,也浪费了,对也浪费了,所以说呢,咱们现在这里让它一样就行,对吧,然后接下来在咱们这里我呢这一个变量靠,然后接下来是不是开始创建我们这中contact量。叫stream com这些代码呢,就不需要我前两个我不希望大家去复制的啊,就是除非什么,除非你已经非常非常熟了,你可以去复制对吧,要不然你给我手写对吧?然后在这里呢,应该指定两个内容,一个是我们这个com,那另外一个是不是在采集周期啊,对吧,另外一个呢,是采集周期对吧,另外一个采周期对吧?那么咱们比如说五秒钟一个采集周期,然后接下来VR拿到了我们这个SSC,然后接下来我定义两个变量对吧,一个是我们这topic,你现在是不是要从我们的卡卡里读数据呀,那么从哪个主题读数据呢?咱们应该是积贸0523,然后下方线DB对吧,这里是不是有个杠C,这个是我现在主题吧,像这个东西,如果说你要这个不靠谱的话,咱们呢,可以自己去什么呀,把它拷贝过来对吧,就到咱们这个卡卡里边,把这个主题呢,给我拷贝过来对吧,把这这主题给我拷贝过来对吧?哎,那么看你自己对吧,然后接下来因为你们我在写的时候,你们这么忙人帮我一起看着的。
07:55
对吧,然后接下来呢,咱们再往下还有什么呢?再来一个group ID对吧?那么当前呢,我要使用哪个消费者组对吧?其实这块呢,那也可以随意点是吧,Base然DB对吧,比如说我现在接kel,然后grow是吧,那我现在这种消费者组对吧?那你要从这里消费了,那同学们接下来该干啥了呀?
08:19
啊,你要从我这个卡卡主题里边来消费了,现在干什么啊干什么,注意这个代码和原来是一样的,我要从卡卡消费,我是不是得知道从哪消费啊,你怎么知道从哪消费,你是不是拿咱们这卡卡的这个偏移量。对吧,所以说我现在我昨天给这子堂前天在讲我们这个精准营的消费的时候,我说这个程序呢,咱们在这讲,但是你们可以啊,对以后用在你们任何业务里面去,对吧,就只要你去消费卡卡数据,只要你通过SPA程序去多取卡卡数据的话,那么这个时候你是不是都在考虑咱们精准一次消费的问题啊,那么如果精准一次消费的话,那么这个首先第一件事干什么,是不是得这个从我ready中,呃,从咱们这个RA中来获取我们这个偏移量,来来获取我们这偏移量等,那怎么获取呢?我们好像封装了一个工具类吧,叫outside manager u点有一个叫get outside方法,然后传讲参数过去,一个是我们的topic,那么另外一个是我格ID吧,对吧,另外一个格ID,然后接下来点V2,那么他拿到什么呢?他拿的是我们的map对吧?那么这里放的是不是应该是某一个主题的某一个分区,它的偏移量到哪了。
09:33
对吧,感觉大家还这么陌生的。啊,这不是你们,这不是早晨我看你们的代码是吧,对吧,感觉你们都写了,但是我说注意二,今天我在写,以后我在写,这会的时候我就让你们来帮我写了,对吧?注意啊,这块不要光跟着我的进度,对吧,你要把它掌握了才行,那么接下来,那么拿到偏量之后呢,咱们是得做个判断呀,因为获不获取到偏移量,我的读取方式是不是不一样的呀,所以说呢,在咱们这里我需要做一个判断,判断什么呢?如果说我们的oppositeet manager它不等于空,那么并且咱们这个oppositeet m点,我这个size如果大于零的话,那说明什么?有偏移量了呗,就是咱们是不是曾经从我这里面读数据啊,对吧?那么如果说有偏量的话,咱们把偏量保存下来,我是不是应该从指定的位置开始消费啊,对吧,应该从指定的偏量位置啊,从咱们这个指定的偏移量位置开始我们这个消费,那么如果说我现在呢,要没有找到偏移量的话,那么是不是应该给你的配置从哪就从最新的位置开始消费啊。
10:37
对吧,从最新的这个位置开始什么呢?小贝说老师咱们在哪配的呀?到在哪配的,你想一想,咱们在这里来找一下麦卡副卡,我们现在u to,那么在麦卡卡U里边,咱们是不是配了一些属性,其中有一个叫什么?呃,在这对吧,咱们在消费的时候,其实是我从我们的最新位置开始开始录取啊,其实这东西你不用配啊,默认的话对吧,它也是啊,默认的话也是对吧?好了,那我现在呢,那咱们就开始消费一下,怎么消费呢?那我应该去调用我们这个麦卡夫卡YouTube,它里边有一个方法叫盖的卡夫卡stream,然后需要传些参数,第一个参数topic,对吧,这传过去第二个参数S得传第三个参数,咱们现在就是从指定的偏量位置开始消费呀,那应该是off outside啊,Map,你得把偏量告诉我呀,我才能从这开始消费呀,然后第四个参数是我们这个跟ROID对吧,然后接下来DRDR对吧,那么这块呢,拿到的是我们一条记录对吧,Record。
11:37
啊,Stream对吧,瑞stream,那么如果说我现在呢,在这里对吧,我呀没有拿到偏量,没有拿到偏量这个东西咱们在调方法的时候,是不是在偏量就不传了呀,对吧?它就从咱们最新的位置上消费,但是大家想一想,那这两个东西ifl是不是只会执行一个分值啊对吧?所以说呢,咱们在这里把它是不是给提取出来来CTRLC对吧,把它提取出来,那么提了之后呢,这个因为我对做修改,所以说呢,把它改成V2,然后这块呢,给它原来一个空值对吧,那这对吧,我呢直接把这个拿过来到咱们这啊,就不用成心再生命变量了,是不是给它赋值就可以了,这块呢也给它赋值就可以了啊,这块也复值就可以了,好了,那这样的话,咱们现在呢,可以从指定的位置是吧,来都开始进行消费,这是第一个啊,这是第一个,现在有点印象了吗?有那么一点点了是不是啊,然后接下来同学们,那么咱们下一步该干啥了呀。
12:39
对吧,我已经从指定位置开始消费了,对吧?消完费之后咱们是不是得到一个这样的一个这个离散化硫了,这个离散化硫内部封装的什么RD的呢?说封装卡法D啊对吧?卡法RD里边是不是有一个我们当前的什么分区的一些什么偏移量的一个信息啊,对吧,咱们可以获取到,那么怎么获取呢?对,要注意,那接下来咱们呢,要获取当前批次的,当前批次的它读取的我们这个卡卡啊,这个主题中啊,卡卡主题中它的一个偏移量信息啊的一个偏移量这个信息,对吧?那么怎么来获取呢?怎么来获取,咱们是不是得根据RDD来获取啊,而且根据卡法RD来获取,对吧?那么如果要获取这KRD获取的话,那你肯定要先把这个东西给我转成RD等,那怎么转换呢?两个算子可以把我们这个离散化流转换成我们这个RD,一个是我们这个for RD,那么另外一个呢,Transform。
13:39
咱们现在还没到提交的时候,所以说呢,我用我们的transform都要用我transform,那么如果用transform的话,那么这个时候呢,他拿到的是一个RDD,对吧?但是呢,这个东西我们应该把它做个转换,对吧?那么怎么转换呢?通过它里面一个方法叫as instead of,然后把它转换成has outside range对吧,这样的一个类型,接下来去调它里面的outside range方法,那么这个方法的返回值应该是一个数组对吧?应该是一个我们的一个数组对吧?这个呢是我们的outside range,然后接下来咱们现在只是把吧当前的这个主题中,对吧,它的一些我们的分区信息获取到,你看主题分区从什么地方开始的,从到哪结束,是不是只是拿偏移量信息啊,但是我们现在RDD里的数据,我最做改变嘛,是不做改变,所以说RDD是不是还要给它返回过去,对吧,所以说我接下来啊,得到一个这样内容什么呢?这个是我们的off side stream啊,这拿到我的偏量。
14:39
之后那么这块你拿了之后,你是不是要提交的时候要用啊,所以说呢,我把它的一个定义啊,也提取出来啊,我把它的定义也提取出来啊,那么这块呢,等于一个空的数组,那应该是every点我们的empty,然后接下来什么类型的呢?我们的outside re,这个类型对类型,然后接下来给我们当前的这个数组变量来做一个赋值操作,对吧,那么它呢,改成VR。
15:05
啊,他改为。这个呢,是我们第二步操作对吧,第二步操作,然后接下来第三步操作对吧?那么第三步操作呀,同学们那就来到这了,对吧?那我现在拿的数据长什么样了?是不是长成这样,咱们是不是已经做过格式化校验,是个杰森对吧?是个杰森对吧?但是呢,咱们拿到的这些东西应该是个杰S字符串吧,对吧?那我在操作的时候,你想一想,你操作字符串是不是就不如操作咱们的对象更方便一点,你操对象你是不是可以通对象点的一些方式来获取到对象内容对吧?比如说我想获取表名对吧,你可以通其对象直接对它的属性对吧,该概念的什么w string可以把表名获取到,对,但你说字符串的话,这个东西不太好处理,所以说呢,那我希望干什么呢?我要对我们这个接收到的数据啊,接收到的数据来进行一个结构的转换,原来是什么,原来是接森格式的字串啊,原来是接层格式字符串,我现在想转换什么,我想转换成。
16:08
D格式的一个对象啊,介绍对象,但甚至原来这个搜二都谈不上啊,这个其实什么,原来最早的时候其实是这个东西。对吧,其实这个东西对吧,然后只不过这里边放的呢,这是一个key string,然后这个string是谁,这个是不是咱们现在它这个介词格是字串等,这个我这么写能能理解吗?同学们我这么写能理解吗?等就我现在这里拿到数据,它会给咱封装一个consider record对象吧,这是我现在从卡卡里读到的一条记录,他们给咱们封装一个consumer record对象,但是我们这里只关心谁这个key是不是用来做分区的,咱们只关心的是不是这value部分呀,这个value是什么格式呢?这个value是个字符串,而且是不是一个机子格式字符串啊,所以说我要把它给转换一下,那么怎么转换呢?那么你现在这里是不是可以去调用我们的outside stream点对吧,Map这个方法呗,对,然后接下来咱们现在呢,拿到的数据,这里应该是一个record,这是从卡卡里读到的一条记录吧,然后接下来,那么咱们现在是不是只取Y的部分就可以了,所以说呢,A card的点我们先。
17:17
带value,那么他拿到的是什么?他拿的是一个JA森str对吧?咱们现在获取到的是啊Jason格式它的一个字符串,然后接下来那你要干什么呢?我要将我们这个Jason格式这个字符串来转换为Jason对象,那么怎么转换,咱们是不是可以用到我们这个的这个阿里巴巴fast对吧?那么这里它应该有一个方法啊,叫什么plus object吧,然后呢,你传一个字符串过去,它已经返回一个对象,咱们把这个东西给它传进去,它返回的应该是一个Jason object,然后最后咱把JA森object给返回一下,那么这样的话,我当前这个离散化流里边放的数据是不是应该就是一个一条一条的接触对象了呀,对吧,来接来第2VR,对吧,拿到什么呢?拿到的是我们的Jason object stream啊,拿的是我们现在啊这样的一个。
18:17
然后接下来咱们再往下走来看一看到目前为止同学们能不能跟上。那么再往下走的话,那接下来该干什么了呢?是不是开始准备进行分流了呀,对吧,那么再咱们这里啊,那么在分流的时候同学们对吧?那么我这呢,就我看我这个讲义这个是不是已经做了判断了啊,在这里以前有个讲义没更新啊,以前有讲没更新,那么咱们在分流的时候,其实主要取那习东西呢,主要取这么几个东西对吧,一个是我们的表名,一个呢是我们现在这个对吧,这个数据还有一个呢,是我类型对吧?然后接下来那么咱们呢,对它的要做于处理,然后把我现在数据呢发送到我的卡卡里面去,对吧,那么其实主要呢做这样操作,我这会可能和这个讲义不太一样啊,咱不看它了,对吧?呃,什么呢,我现在啊,在这里要进行一个分流处理啊,要进一个分流处理,根据什么,根据我们这不同的表名,然后将我们这个数据发送到不同的卡夫卡主题中去啊,发送到不同卡法主题中去,对吧,大家想一想,那如果这样做的话,大家想想这个时候。
19:25
你是不是相当于这个你已经开始提交提交了呀,对吧,我现在拿一条数据,然后我就往卡卡发,我最终我现在要做的事,我是不是要发到咱们的卡卡不同的一个主题上去啊,对吧,也是我现在已经开始提交提交job了,那么如果提交的话,那这个时候咱们是不是可以用我们的行动算子量,那就是for r DD呗,对吧,那么如果for r DD的话,那么这个时候你拿到的是什么呢?你拿到的是我们这个RDD,对这个算这RDD是算数,对吧,RDD,然后接下来那么咱们现在呢,在这里。我们要要对我的RDD里的一个数据对吧,来进行一个处理对吧,那么如果进行处理的话,那这里咱们也可以怎么办呢?RDD点。
20:06
外PAR for就不行来,咱本身在咱们这里,其实我是不是已经是for了呀,对吧,然后接下来在咱们这里,我可以把咱们r de拿到对吧,RD什么RDD本身不也是一个集合吗?我说对集合理数据来做一个处理呀,对吧?哎,那么咱们现在这里说老师我现在不用这个,那我用什么呢?我用咱们现在这个map什么partition对吧,这样行不行对吧,其实这块呢,没必要了,你直接干什么,因为咱们不需要转换了嘛,老是不是直接对做一个提交处理就行了呀,对吧,所以说呢,在这里吧,这个这块无所谓啊,就这块提法可能不一样,对吧,那么咱们现在呢,在这里我现在对他做一个我们的便利,那每次便利,那么你拿到的是什么呢?你拿到的应该是一个阶层对象了嘛,对吧,你应该拿到是一个接对象,然后接下来咱们对我的接对象呢,来做一个处理啊,你对我的杰对象做个处理,那首先第一件事来看一看咱们这个杰森对吧,长什么样啊,来看一下杰森对吧,这个是不是咱们。
21:07
现在接它的一个大概的一个格式啊,那么我需要从这里获取哪些东西呢?想一想这个东date咱们是不是肯定得要,然后就除了date之外呢,咱们这个这个东西table出的要,然后咱们type出在要啊对吧?所以说这三个啊,我是想获取到的,那首先第一个我们现在呢,根据这对象对吧?首先第一个获取什么呢?获取我们这个表名啊,获取表名,然后呢,会表名呢,怎么获取呢?那应该是JA森OBG.get直接get string就行吧,然后呢,咱们当前这个属性叫table ctrl c给拿过来,这是第一个啊VAR,然后呢,这个是我们table name表名,这是第一个,那么除了表明之外,是不是还有一个获取我们的操作类型啊,获取我们的操作类型怎么获取呢?Jason object。杰森点,然后接下来在这呢,我去获取啊,咱们也是string类型的,那么这里呢,它的一个属性名叫type啊叫typeb,那大家注意啊,咱们现在呢点VR,那么如果你叫typeb的话,这里会有问题。
22:16
是不是拿现在关键字了呀,对吧,来来呗,啊飘一下是吧,那一般你们要要要什么的话,你们在那只适当的飘对吧,然后呢,或者说我一般我肯定不这么改对吧,我就换名对吧,我就换一个名字对吧,那咱现在呢,这个是操作类型,然后接下来再来还有一个什么呢,来获取我们现在这个数据对吧?来获取我们现在数据,注意看同学们这个东西,他俩是不是可以通过get string直接获取啊这个数据呢。看什么类型的是什么。JS数组对不对,那如果是JS数组的话,那这块呢,咱们还得要注意一下呢,获取呢,我们现在短操作的一个数据对吧?那么怎么获取呢?咱们可以通过JA森OBG.get对吧?咱们现在可以获取搜串对吧?然后可以获取直对象,然后呢,现在有直些数组J数怎么获取,注意看Jason是不是有一个叫Jason a瑞啊对吧?那么他可以拿到什么,它可以拿到一个咱们这个这个J子数组,那点VR一下,这个呢,就是我们这个date AR,这就我们这个电子数组对吧?那我可以啊从这里拿的这些东西对吧?但是咱们拿到这个东西之后呢,注意同学们,咱们现在呢得判断一下对吧?就目前我们在这里啊,要是想完成我们这个首单的话,同学们对吧?那其实我比较关心的是什么,是不是关心的是订单它的一个新增操作呀。
23:47
但是我在设置数据的时候,那你想一想他是怎么做的,他肯定先把某些数据给他干掉,比如说你看我现在这里边有订单,同学们,比如说你看我在这里有这么多订单,他干什么的,他把订单干掉,然后重新的呢,再做一个生成,然后呢,他模拟我们现在的一些支付过程的话,他肯定怎么做的,他要改变咱们订单的状态,因为你想一想订单的状态是不是有什么未支付啊,什么支付啊,或怎么样的,他把这状态给我改一改,所以说其实他在去我们这模拟数据的时候,就我在执行啊,我这个模拟数据的架包的时候,那么其实呢,它底层不光是有我们这个新增操作,还有一些什么删除啊,对吧,或者是咱们updated的操作,但是我关心的是什么,我目前是不是只关心新增啊,所以说呢,那我这里其实我可以加一个判断啊,同学们,那我可以加一个判断,加一个什么判断呢?在这里我先把咱们这个操作类型啊给拿过来,拿过来之后呢,那我判断一下,如果说当前我们现在这个insert啊。
24:47
因此。对吧,那么点我们这DQ啊,如果说啊,我们当前这这这这个吧,SCR是不是是吧,T是吧啊third是吧,那么咱们现在呢,如果说啊,在这里有一个in third是吧,那注意大小写啊,大写咱们应该是我们这个大写啊,那么等于谁呢?等于我们当前操作类型啊,如果说我现在是iner的操作的话,那么这个时候呢,我再把这个数据拿过来就型。
25:20
啊,我再把水拿过来就行啊,其实这块呢,你不做也行啊,但是我们现在呢,其实你可以在先过滤一下啊,其实这块其实主要不是过滤,其实还是分流对吧,还是分流对吧?啊那这块说老师那这里是不是过滤了,对数据过做过滤了,其实这块呢,就是相当于做判断,你不判断其也行,等于我的业务同学们对吧,我是不是只需要拿咱们新增的就可以了呀,对吧?所以说呢,那咱们在这里判断一下是不是我们的新增,后边我这会还还还要改啊同学们,接果者程序后边到时候我还会改对吧?如果说是新增操作的话,那这个时候我再做这些操作就来得及,同学们。就咱们不一定说上来每条数据我都把这个什么表啊,什么咱们这数据啊,什么类型啊都获取一下,其实我先获取下类型,你看是新增对吧,那么如果是新增的话,我才继续,对吧,往下把这个表名,然后呢,把咱们这个什么数据给拿到,那么拿到数据之后你要干什么,咱们拿到出据之后,你是不是得啊,然后呢,根据我们这个表明啊,根据我们这个表名,是不是将我们这个数据发送到不同的主题中去啊,对吧?发到不同的主题中去对吧?那么如果发到不同主题的话,那么这个时候咱们应该怎么来做呢?那你是不是得想办法去创建一个卡夫卡,它的一个生产者呀。
26:43
因为你要往卡卡里发消息,同学们,这个时候你的角色数是变了,上边你是从卡卡来读数据,你要保证精准一次消费,现在你干什么,你现在是不是充当卡卡的生产者,你是我们卡卡里边来发消息啊,那卡不卡的生产者,这会应该什么?卡普卡producer是吧?然后接下来这个呢,是key value,对,然后接下来这里边是不是有一个叫什么send的方法是吧?然后呢,在这里对吧,然后咱们要封装一个我们这个什么叫producer吧,对吧,Producer record对吧,要封装一个这样的内容对吧?然后这块呢,是你这个什么K和V对吧?那么在这里传的时候呢,那需要传什么,是不是topic呀,注意这块你这么写topic行不行?
27:30
是这个topic吗?你要这么写就出大问题了,你你在这里对这个这个弄了一圈回去之后,你又把这数据又写回去了,对不对,所以说咱们现在这里不应该这么干吧,对吧,怎么办呢?在这里对吧,来我拼接对吧,发送的或者目标topic名称啊吧,拼接目标topic它的一个名称,那么这个名称等于什么呢?来那就是咱们在这里VR,然后呢,比如说散的topic的散topic,那么它就等于我们的前面啊加一层,比如说ods啊ods这个呢,是我们新的一层ods层,对,然后接下来在咱们这里对吧,把我们的people name给加上去。
28:13
是不是根据我的表名来发送不同的主题啊,对吧,那这块咱们其实到哪呢?就是把这个数据读到,然后呢,哎,根据表名加数据发送不主题去在咱们个表面前面加了一个ods对吧?哎,那这块呢,是关于我们现在啊这样的操作对吧?关于这个操作,然后接下来那么在发送的时候呢,大家想一想,那这里topic有了,那第二个参数的第二个参数,咱们现在这里,你在发送的时候,你是我指定咱们别的内容啊,那这就是我给大家讲的,说我现在啊在发送消息的时候,你可以的直接指定我们分区。你可以直接指行分区,你可以指定咱们这K,那么它会给你K的,它的哈希值是不是决定发到哪个分区去呀,你还可以什么呀,是不是直接把这值给我呀,这个值是谁的,这个值是不是就咱们这个date呀,同学们对吧?但是我发现这个date呀,它里边对吧,这是个数组,那如果数组的话,这个东西你这么直接给我发送是不是还不太行啊,怎么办呢?咱们是不是得对我们这个数组来进行遍历啊,对吧?那在这里我们现在啊在这里要干什么呢?要对我们这个对的数组来进行我们这个变历啊对对的数组来进行变历,对吧?那么如果对对数形便利的话,其实我如果是一次操作的话,同学们对吧,其实这个东西呢,它就一条,你直接写零也行,对吧,你直接写零也行,但是有可能什么呀,有可能人家到时候真正执行的是我批量insert,对,比如说我现在我可以在一个语句里面批量insert吧,同学们insert,然后into,然后接下来咱们这里。
29:49
是不是有一个什么values,你这里是不是可以写多个,都写多个,是不是相当于批量插入多条了,对吧,那这块呢,那你的数字肯定是多个条数据了,对吧?那咱们做便例的话,那应该是data every,点这个for,这样放还放不了怎么办呢。
30:10
这个东西这放放不了怎么办?转换一下嘛,对吧,你想想这个本身是不是接子数组啊,对吧,那转换成SC的,你不就操作起来就熟练熟练了嘛,对吧?那如果你要把它转成SKY是怎么转换呢?Import对吧?然后接下来咱们这个skyla下边有谁呢?是不是有我们这个collection啊,In collection,然后接来这个下边呢,去找我们这个JA comvers,把这个内容呢给我,对吧,转换一下,然后接下来点as skyla,把当前数字转换成skyla,它一个数组,然后接下来再做for,那么每次遍历这个时候得到的是什么?得到的是一条数据吧,对,原来咱们呢,其实是一个J森啊,那现在呢,其实得到这一条对的J森,对吧,然后接下来,那么咱们是不是可以把咱们这个操作往上拿呀,对往上拿,而且我现在呢,发送我们这个消息,我这块呢,我想单独去啊写一个方法对吧,那么怎么写呢,这块呢,我一会单。
31:11
都封装一工具类对吧?你现在呢,要去往这发消息了对吧?那发消息的话呢,那我希望啊,咱不有一卡夫卡U吗?这卡卡u to,它主要是不是从我们的卡卡读数据啊,那么再写一个,比如像卡卡think是吧?那这里呢,主要是发数据,那么如果发数据的话,两个内容,一个是往哪发主题,你得告诉我,另外一个发谁,咱们是往did里发过去啊,但是注意这个东西呢,它变成Jason,咱们把它转化成Z串发过去,对吧?那目前还没有这个内容,咱们到时候给创建一下就可以了,那么最后对吧,别忘了同学们在这里啊,在这里我处理完之后了,那么咱们现在在这该干什么了?对吧?是不是开始去提交或者保存我们的偏移量呢?对吧,提交我们这个偏移量对吧,那么这东总提交呢,那应该是我们的outside me.C这个时候传的时候数据它topic量对吧,然后接下来这个呢,是我们这个个肉ID。
32:11
急是吧,咱们现在呢,这个outside range啊,把这个东西对吧,给他看一看,那咱们再往下写,是不是就开始写这块东西了,稍微休息一会啊,稍微休息一下。
我来说两句
666
6666
666
感谢分享
666
可以
棒啊
感谢分享
666
厉害了,学习学习。