00:00
接下来我们来扣你啊,第一步呢,我就。嗯,不写了,我直接把这个拿过来啊。因为获取执行环境对吧,啊把这个拿过来。好,倒一下这个包啊。这一块是干什么事儿呢?呃,首先病因度我四个测的啊,但是呢,我其实主题都是一个病因度,所以呢,我还是设置为一啊,我还是设置为一对吧,这个大家都清楚诶,获取,然后呢并成为一,这一块呢,是你生产环境当中一定要去添加的,而且呢,我们其实在弗林CPC那个代码当中也去打包运行,而且重启了,对吧?在。重启了,那我们平时做测试呢,并不需要,因为它还需要多开一个什么HDFS比较麻烦,所以呢,我们只是说我每一个类启动类里边,我都会把这个给你添加上,但是我不用。
01:11
不用就告诉你,生产环境当中你一定要写,不要漏了。好吧,生产环境当中,因为这些东西呢,我们课堂上之前都已经给他测过了,所以没必要一每一次都去把这个打开,你你测什么东西,你还要开一个HDFS,没必要对吧?啊,但是呢,代码我给你放这儿了,注射掉不用OK吧,懂这个意思啊,其实我们所以我这个重新再粘一下,我要写的话,写这个也很快,对吧,但这个没必要了,或者执行环境到现在还写,有什么写的必要吗?对吧?好,那再来第二个读取这个。卡巴卡这个主题的一个数据,对吧,卡卡主题数据好,那么接下来呢,我们。正常而言呢,因为点a source对吧?啊,然后这边呢,你有一个叫link卡夫卡的consumer。对吧,但是呢,很明显我们后面有很多的。
02:05
这个需求都要读卡法数据,因为咱们整个ods还有这个DWD都在卡夫卡,那你DWS来消费数据,你肯定要从DWD消费啊消费过来,那那都是卡夫卡,所以呢,我们写一个工具类吧,写个工具类啊好,那在这边诶迎来了我们第一个工具类叫MY卡夫卡YouTube啊。啊,那第一个方法呢,Public那肯定是静态方法了,工具类嘛,对吧,返回类型呢,刚才我们看到叫Li,卡夫卡叫consumer,诶那我们消费数据呢,我们都用啊,比如说我们就直接叫get。你叫要get KA consumer getli KA这个都可以啊,参数要什么等会再说啊。好,这边呢,我们就先直接return new一个。Li开发了一个consumer,看他要什么参数对吧,看他要什么参数啊好,那CTRLP。
03:05
第一个位置要一个什么topic?他是不是要一个这个。Topic呀,对吧,好那么简单,那topic这个东西呢,毋庸置疑肯定从外面传进来。因为不同的需求,消费的主题肯定是不一样的,对吧,所以呢,我们要。Topic。对吧,好,那。这个有了。诶往这一放好吧,往这放,呃,第二个位置用的一个叫什么。第一,Civilization的一个stemal,大家之前用的叫什么叫simple STEM对吧?好,那我们先用它啊,其实它里面有问题等会我们再说啊,我们再改啊,先写在这,我们把这个先创建完啊,呃,该改的地方再改啊,第三个要一个proper配置信息,对吧,比如说消费者组这样的东西,因为你是个消费者嘛,你要给消费者组啊,对吧?嗯,我们给的最多的其实就是消费者组是不是好,那这边呢,你有一个party。
04:15
然后呢,在这里边添加参数啊呃叫consumer config.group ID,哎,那group ID呢,那消费者组最好也是从外面传进来吧,对吧。叫group ID。好,那么。这个地方。就有了对吧,你要想有其他的参数,你可以在这里边去找嘛,对吧,这这太多了,这参数这么多。对吧,有很多你要从头消费啊,从尾消费啊,就是ear还是latest对吧,你都可以自己去设置嘛,我们就不设置那么多了,就告诉大家可以这样去玩对吧?好但是呢,我刚才提到了这里边呢,它有一个问题。他有问题,他有什么问题呢?点开啊。点开之后呢,那这里边无非就是序列化反序列化是不是对吧?好,那我们看一下这个你有一个string。
05:06
啊,创建STEM的时候啊,嗯,诶阿,不如他啊。序列化反序列化A,嗯,在这儿。大家看一下吧。继续调用对吧,还没结束啊,调用好,呃,那么这个里边呢,它有一个点啊,这。这是最核心的那个参数,对吧。就是它啊,它要求这个东西呢,怎么样不等于呢。要求不等于呢,也就是说,如果说我们使用他这个消费者用这种解析方式啊,这是反修的话器,对吧,反消发器啊好,那如果用它消费了一个为nu的数据。那么它直接控制帧就挂掉了。这个很不爽吧,对吧,那种生长环境当中,那有可能有那的数据啊。
06:02
对吧,那有同学说A那的数据你不是可以把它过滤掉吗?注意这个是消费者,他是什么最前面。你没办法把这个。在这之前把那的数据过滤掉,那就是上游了,那不就卡帕里面没有none的数据了,对吧,我们现在说的里主题里边有那的数据,你把它消费掉了,对吧,所以这种方式呢,它不行。如果你用浪值数据它是不行的,对吧?好,那除了这个之外还有什么呢?刚才我们用的是disalization的一个STEM,用的simple string,对吧,简单的啊,确实过于简单了啊,什么都不处理对吧,那这边呢。还有一个他对吧,叫卡夫卡d civilization的一个STEM,但是呢,它是一个接口啊,那没关系,我们自己实现里边内容就好了。实现方法对吧,啊,1233个方法啊,那其实比较简单啊,首先第一个叫is end of stream。哎,问你是不是最后一个流里边最后一个数据,那必然不是啊对吧,False就好了。
07:02
对吧,有没有下一条数据,那我们是卡从卡布读数据,它是一个什么?无介流,无限流必然是false对吧?它不是最后一条,永远都不是最后一条对吧?好,呃,中间一个这是反斜的话啊,等会儿呢,我们再写这个看这是最后一个方法啊,叫type information。类型呗,对吧,那我们是不是string类型啊,你simple string是不是也是string类型,那我想必你这个simple string里边也怎么样,也要写这个方法对不对,你看它is end of stream false对吧,肯定给false,因为留啊好,那还有一个是类型,这个类型类型定义呢,我直接抄他就行。直接抄。因为很简单啊,因为你呢是SW,我也是S,我当然可以抄了。对吧,啊,它string的表达方式这样写的叫face type info,然后string type info看见没,就这样写的,就这样呗,对吧,直接照抄就好了,对吧,那中间这个东西。
08:00
就是它有可能为none是不是。有可能没闹。对吧,所以呢,而且我们解析反修的话,其实就是要一个value吧,一个value,所以呢,你不要着急,这边做一个判断。If这个recorder他等等呢。或者record.value等等呢?对吧,诶如果你没弄了,你不能解析了呀。对吧,啊,那你怎么样返回一个。空可以吧,返回一个空。能明白吗?我们可以返回一个空啊好,那空的话你肯定有问题吧,或者说你返回个浪值行不行。这个地方再返回一个,那值可不可以,这个是可以的,反而可以的对吧,因为刚才那个东西,那它是什么,它是在反斜的话,也就是在这个类里边报的错,对吧,那我呢,直接返回出去了,它是不是被它处理到那到下游呢,做过滤,把这个钠过滤掉了,对吧,你在这返回一个钠,返回一个空串都可以对吧?好,那接下来继续啊,如果。
09:05
他不是呢。诶,这边少写了一个什么else。呃,如果他不是呢,那不是就正常解析呗,对不对,那就怎么写record。Return啊,返回那就用一个string啊,那record.value吧,我们只需要这个value就够了,其他的什么key啊,我们就不要了,对吧?啊,就把这个value拿到。是不是?对吧,咱们可以这样去写一下啊,那这块就不会有这个控制帧异常了,比如说你这个数据为空,或者说你这个Y6为空,它都不会控制帧了,要不然你在这边解析的时候再控制帧了,这是simple string里边的一个问题。啊,就这个问题我们得处理一下OK吧,好,那到这个为止呢,咱们这个工具类就写好了,那我们这边可以去用了吧,好那两个参数呗。一个叫topic。Topic对吧,还有一个stream叫group。
10:06
好,呃,主题名呢,叫topic DB啊,这个消费者组名称我喜欢用这个啊。我喜欢用当前的类名。啊,再加一个班级号啊,因为我每个班都用班级名的话,那都一样了,对吧?啊都不是每个班都用这个,呃类名的话都一样了,所以我们加一个班级号呃,做一个区分啊a source,然后这边。就是麦卡YouTube店get卡康。盖着卡普卡的一个consumer对吧?呃,Topic传进去,然后group ID传进去,搞定啊阿加得到一个卡普卡DS。啊,得到了卡夫卡的一个数据流,对吧?啊,这个很简单,然后接下来这一步我们一并做了。
11:00
呃,我们要转化为监测对量,要做过滤对吧,那肯定是先过滤再写监测对量,但实际上呢,这两个操作我们可以一步搞定。对吧,没必要这样写啊,没必要先写再写。Map对吧,一步啊一步用谁呢。Fla map就好了啊,Fla map它既可以转换也可以做过滤啊都可以对吧?好,那么另一个。啊,那这边呢,是杰森object。对吧?呃,当然了,如果你想把这个脏数据保存下来,你可以写到测出之流,那这个时候你就只能用process了。啊,只能用process了。懂吧,如果你想把它保存下来对吧?好,那这边我们写一下吧。带大家写一个啊,或者写两个头两个呢,我们这样写,后面呢,可能不这样写了,另一个process function啊,那这边呢是杰森。
12:02
好,下个。打错包了。Fast杰森。好吧,好,那接下来呢,我们可以直接这样处理,怎么处理呢?来看啊。jason.pass object value啊,那也就说你这里面有long值,你不是会解易出错吗?就是让它出错怎么办呢?对吧?好,那正常的呢,out.connect阶层object,但这里边很明显,如果你是脏数据,非阶层格式,或者输入long值,对吧?那么它就会报错,那很简单。Control out加t control out加T做一个包裹对吧?做个什么呢?Try catch做一个包裹,诶这个一条呢,我就不要了,我干什么事呢?我将这个数据写了。测输出流。啊。Out put好,那测数据流的话,这边肯定有一个out,对。这边只能写string了。
13:02
取名叫dirty。那这个。标签拿过来,然后接下来呢,VALUE6本身不动对吧,诶给它写到测输出流里边,这样的话,你从测出这个数据拿到你可以做。保存对吧?诶你看一下有多少条,在公司当中看一下有多少条脏数据啊等等这些东西都可以看到,对吧?啊,那你这边呢,我就不保存出去,我直接做个打印可以吧?呃,怎么打印呢?二加V啊得到一个T。Ogs杰森对象的一个数据流对吧,那就是杰森点。Get,然后呢叫dirty t啊做一个打印点。Print。这边来一个这个dirty做一个打印啊,比如说有脏数据就可以这样去做啊,对吧?啊这个意思好,那这块呢,咱们就。
14:03
搞定主流就搞好了,对吧,主流就搞好了。
我来说两句