00:00
其实我们现在这个新的逻辑,那无非就这块儿,这一点点。剩下咱们上面这块怎么样去获取什么偏移量,对吧?然后呢,怎么样去给你偏移量来获取我的数据,怎么来记录一下咱偏移量,然后最后怎么去提交,其实和咱们前天讲的内容是不是其实一样的呀,对吧,就是以后呢,大家在写的时候,对吧,就是你们可能工作的时候也这样啊,就整个咱们对我们的经理的消费处理这块代码呢,其实都是一样的,对吧,其实都是一样的对吧,然后咱们不一样就是在哪呢?就是我现在这边的业务,对吧,这个业务咱们现在是怎么做的呢。首先呢,获取我们的类型对吧?因为你现在可以把我们这个Json这个数据拿到,拿到之后呢,那么在我当前呢,Can采集到的数据中啊,它会记录一些我们这个内容,我们主要获取什么呢?获取三个东西,一个呢是当前这个数据,那么另外一个是哪张表的数据发生变化了,然后还有一个呢,是我们这个类型对吧?其实这个类型呢,那因为我们现在是模拟生成的数据,其实正常情况下你的订单肯定不会有delete。
01:12
你想一想,正常情况下,咱们现在假如说你在在那个网站上有人下订单了,基本上这个删除操作应该是没有的吧,对吧,是不是全是我们的音色的呀,所以说呢,咱们结合着我们的业务啊,我在这呢,也做了一个过滤是吧?然后呢,我把咱们这个呃判断一下,判断是不是音色的操作,如果是音色操作说明啊,他帮咱们生成了一个新的订单,对吧?以及新的一些其他的内容,我把这些数据的给拿到对吧?给拿到那么拿到之后呢,那么咱们对吧,来把这个表给取到,把这个date的到,那么这个date啊,大家注意了,咱们现在呢,那有可能我当前一个新增的它有多条数据,就是执行一个批量的iner操作,对吧?那么如果这样的话呢,那么它给咱返回的是个什么呢?是个数组,所以说呢,你在获取date的时候呀,你要通过get get a来获取啊,然后接下来那么既然是数组,那我肯定对数组呢来进行遍历,对吧,每次变历得到的是一个这个date杰。
02:12
对吧,那么咱们现在呢,要想把这个保存的话,我保存的是我们这字串,所以说把它给转换一下,转成一个这些字符串,然后做一个保存,对吧?那接下来咱们现在呢,那往卡卡里边发送数据了,对吧?那么如果要发送数据的话,这块呢,咱们我再想封装一工具类对吧,其实你也可以什么呀,你也可以直接放咱买卡不卡U求里面去,对吧?我现在是想把这个消费啊和生产给分开啊,分开,那么如果分开的话,那这个时候呢,我相当于写一个生产者,那我在这呢,我再来一个工具类啊,在这里再来一个工具类,那么这个工具类呢,叫MY卡卡think对吧,麦卡think,那么当前这个工具类它主要完成的功能对吧,是向我们这个卡夫卡主题中来发送我们的数据啊,来发送数据,那么如果你要发送数据,大概的一个操作,那其实就是刚才咱们这一套,你有一个我们这个卡发卡。
03:12
Product,然后接下来,那么你要把你发送数据的封装成一个对这个producer record对象,然后给他发过去对吧?那么无非呢,到咱们这个发送的时候,你呢去封装一些参数,所以说这块呢,这个代码我直接呢给大家拿过来对吧?来咱们现在来看一看我当前这个工具类里面写法,然后CTRLC把这个东西拿过来,然后CTRLV是吧?那目前在我这里首先第一件事对吧,那我现在呢,先去读取我们的配置文件对吧?这个配置文件干什么呢?拿到卡卡的地址,然后接下来对吧?那么这里呢,我要去创建一个卡不卡它的一个producer这个对象,对吧?那么在串建对象的时候呢,那这块呢,那我需要去new一个color producer,那需要把我们这个参数呢给传过来啊,那么这个参数就是咱们现在呢,这里封装的一些内容,对吧,而且我们在生产数据的时候,为了保证我们生产数据,它卡夫卡接收的时候的一个这个。
04:12
精准一次性,咱们是不是要把我们这个属性给打开呀,是不是开启密等性操作呀,对吧,把这个呢给打开,像这块呢一样啊,就其实我们这会其实也可以通过常量来设置啊,可以通过常量来设置,其实我这种写法这是不好的,对吧,这是不好的啊,我是给大家啊说一下,就是比如说我们现在在这里,那应该有一个什么呢?应该有一个。Producer producer,然后呢,咱们conflict,然后接下来你这里去找什么booto server conflict呀,这些如果要是消费者的话,这个东西叫什么,是不是叫consumer con费对吧?诶这会儿呢,其实你可以通过咱们现在产量的方式啊,靠产量的方式,然后接下来,那么咱们现在呢,在这这个方法它是干什么的,帮咱们创建我生产者的,那么有了生产者之后呢,那我可以去调它的一个散的方法,对吧?那这个散的方法呢,那么这里其实呢,我提供了一个方法的重载,对吧?那我们在发送消息的时候呀,你可以指定我们的K,如果指定K的话,它会以K的哈希对吧?来作为我们分区的依据啊,来把这个东西拿过来,然后取的哈希值来决定把咱们这条数据呢,开始发送到哪一个我们这个啊这个分区中去,对吧,那么另外一个呢,直接啊,你主题还有咱们这个message就可以了,对吧?哎,那我现在呢,我在调的时候,我直接调的这个方法,那么具体怎么做呢?对。
05:37
啊,当我现在呢,去调我们这个散的时候,当我去调查散的时候,对吧,那么首先呢,我现在去找我现在这个方法,那么这个方法呢,它首先看一看有没有我们的卡夫卡producer,那么如果要是没有的话,咱帮你把这个东西呢给创建出来,那么接下来我调用我们的producer,它里边send方法,然后封装一个producer record啊这样的一个对象,那么这个对象里面需要传两个参数,一个是你要发送到哪,那么另外一个是我们现在呢,它的一个发送的内容是什么。
06:11
对吧,其实我们现在这里大家注意啊,那么对于我们的卡卡的操作啊,一个呢是producer,那么另外一个呢,Consumer,那producer在发送的时候,它是把咱们这个数据封装成一个叫producer record对象消费的时候呢,他从卡卡里读数据,是不是也要把这个这个数据封装这个对象叫consumer record吧,对吧,这个呢,也要弄明白,把你们之间关系,对吧,然后接下来那么咱们现在呢,把这个数据啊发送过去了,那再往下咱们呢,要做测试了。这个代码其实本身同学们这个不难,对吧,但是我现在呢,让大家帮我把这个东西测一下对吧,就是我看一看,我们现在已经完成了我们这分流了,对吧?那到底能不能实现咱们分流操作呢?对,到底能能实现它分流操作呢?对吧?那我想测试一下咱们写这个程序啊,好使不好使啊,我现在想测试一下咱们写这个程序好使不好使,那么如果你要想实现对吧,咱们这个把这个程序测试一下的话,那么这里呢,那来看一下咱们这个测试,那大家想想你都需要起些什么东西呢。
07:15
卡法肯定能起ready ready是肯定能取,因为咱们现在精准的消费嘛,对吧,来存放便移量还有别的吗?N地都需要起吗?这个时候没ND啥事了,是不是对吧?那咱们现在呢,在这里,那你是不是得把咱们这天都起起来啊,对吧?所以说呢,那咱们现在在这里,我需要把ready提起来,Can提起来,然后把卡不卡也组keepboard提起来,对吧?然后记起来,那么咱们去运行我们现在啊这个程序啊来运行我这个程序对吧,来呗,那我在这呢,把这个程序啊先去对吧,拿到是吧,拿了之后呢,那么咱们现在呢,把它给运行起来啊,把这个程序呢,咱们现在给它提起来。
08:03
嗯,啊SC是吧,这边再提交是吧,为执行,那咱现在呢,得把这个去执行一下啊。把这些停掉。C点我们这个start开始采集对吧,还不能让它停对吧,来这个时候咱们运行。那么正常情况下,大家想一想,我现在如果看输出的话,你想他现在应该输出什么?啊,就是按照我这个写的代码,你觉得现在输出的是什么。应该是保存分区吧。因为你想一想,我现在从里面能拿到咱们现在这个P量数据吗?拿不到是不是,那如果拿不到的话,咱们从哪读,是不是从我现在的最新位置开始读啊,那读完之后,咱们现在最终在这里是不是要把它保存一下呀,所以说咱们现在呢,在执行的时候啊,看的错了是不?这不是我想要的啊,应该看到咱们这行的一个分区对吧?然后接下来呢,这里呢看一看,所以我现在啊,在咱们这里没有连接到那时候们ready没起呗。
09:12
对吧,我说的这个软件提起来来呢,来到这里啊,我把该起的起起来,对吧,那来到哪呢,我开一个啊。来到我们的OT model,咱们这个red这个下边,然后red-server,然后接下来red com把这ready呢给提起来啊,Ready提来,那么ready提完之后呢,那你可以通过它ready对吧,去看一下咱们当前我这里边有的内容,对吧?那这里目前是不是只有一个呀,因为我们那个存放我们这个状态呢,是否我们那个登录过的这个是不是已经过期了呀,对吧,已经过期了,现在只有一个对吧,关于我日活的它的一个偏移量对吧?那接下来这个ready起完之后呢,那么咱们现在再来去运行我的程序啊,再运行程序。
10:02
它启动呢,它运行的,那么同时呢,我看一看我这里啊,看一看我当年启动的内容卡法提起来了,Canon是不是没启对吧?那么can起的话,那应该是点点对吧,然后点点,然后B下面,然后呢,有一个我们这个start是吧,一个start我就写到这单机的就可以啊,写到单机就可以。那咱们现在kon呢,提起来了对吧,开起来了,然后看一看咱当前运营情况,大家看是不是保存分区啊对,就是我现在从我们这个分区读数去,然后呢,接下来咱们的对吧,这个保存到我们这个啊red里面去对吧,然后接下来这个时候呢,你可以来到我们现在呢,它的一个red里边来,大家可以看到现在咱们这里是不是多了一个对吧,这个呢,就是我们当前对吧这个主题,然后呢,这个我们这个消费者组对吧,它所对应的一个偏移量的一个对吧,这个存储情况对吧,然后接下来,那咱们现在呢,要干什么呢?要去往我们这里对吧,这个生成数据,那么整个流程什么样的注意啊同学们,那么流程应该这样的,咱们现在在这里,当我去执行咱们这个炸包的时候,是不是要把这个数这这这个这个这个数据,然后给它生成呀,那么生成之后它往哪发,是不是往咱们的卡夫卡发对吧?那么接下来来到我们的卡卡里边,注意啊,在这里我再去克隆一个session出来啊,再去克隆一个session。
11:27
那么来到我们现在o BT model,我们这卡不卡对吧?那么来到卡卡之后呢,我来看一看当前我们这卡不卡呀,它里边的内容短就都有哪一些主题对吧?那杠杠勾出啊,然后进来杠我们这个server,然后看do,然后呢,202杠个号9092多少杠杠list对吧?那么这个呢,我是查看一下当前都有哪些主题,那么我现在当你运行程序的时候,同学们这里它首先要把这数据,这个这个数据是不是放到哪,是不是得放到咱们现在这个我们这个主题中去啊,但是呢,我在查看的时候注意看同学们,我现在呢,在查看的时候,我要看一看我这个主题里面有没有有没有数据对吧?那我现在呢,我不看它了啊B卡cons consumer,然后接下来,那么这里呢,我在指定我们的topic的时候啊,我在整topic的时候,我指定谁呢?我指定的是OS_order in inform。
12:27
你能理解什么意思吗?然后这没有这个主题啊,啊,但是咱们不是做分流的嘛,对不对,因为你想你现在啊,你把它把这个数据发送到咱们当前这主题里边,这里是不是只是一个其中这个中间的一个过渡阶段呀,对吧?那么咱们现在拿这主题之后,那我现在程序运行的目的干什么,你当前这个程序你在运行的时候,他不就是要从咱们这个主题里边把数据给拿到吗?拿到之后咱们干什么,是不是马上就写不到,卡不卡的呀,对吧?那么如果说不出意般情况的话,我应该能看到当前我们的order e它的一个我们的新增数据的记录,对吧?那么同时在我当前这个例子下边啊,那我在列出来的时候,应该多了好多ods开始的,对吧,是不是应该多了好多odx的主题啊,来那来试一试,同学们在这呢,我去监控的我们当前这个对吧,这个主题现在没有给给警告,然后接下来那我现在呢,去执行一下模拟生成数据的这个程序来运行看效果。
13:32
那么这个数据呢,模拟生成了对吧?那么这个数据生成之后呢,那么接下来咱们来看看这里同学们对吧,这个东西咱们数据是不是过来了,那么数据过来,那咱们现在呢,拿过来,这里边还有没有什么,咱们那个这里边还有没有什么咱们table啊,什么date呀,什么type这些东西了,这里边是放的只剩什么了,是不是只剩date的东西了,对吧,就相当于咱们现在这里啊,原来这些东西没了,只剩下什么,是不是只剩下咱们当前date里的东西,然后它以一个字串的形式,是不是给你拿回来了呀,对吧?哎,如果说能跟这个思路话,那咱们这会呢就过了啊,这回就过了,注意啊,同学们,那么现在我们这个,呃,通过我们的Kindle版本啊,它的一个ODS处理,对吧,那这就完事了啊,这块呢,其实是对吧,这K的版本,它的一个ODS处理对吧,这个ODS什么呢?其实就主要是从我们这个主题里边夸主题里读数据,读完数据之后呢,做一个分流啊做一个分流。
14:33
好了,我把这个稍微停一停啊。
我来说两句