00:02
OK啊,来吧,那我们这个唇呢,就写好了,写好以后呢,接下来我们要去。写这个读啊呃,这个读的话呢,就比较简单了,因为我都知道怎么去把它存进去了,那我就把它读出来不就完了吗。对吧,其实读的话呢,我们也要考虑这么几个问题,什么问题呢?首先第一个,呃,我现在要读什么东西,我很我我已经很明确了哈。对吧,因为你都已经把它存到中了,那我肯定是要去读你的,关键是我读出来以后我要干什么事儿。对吧,那你看看我们读出来以后,是不是要把它传给我的卡夫卡,然后呢,告诉卡夫卡来,你按照我传给你的奥进行消费,对不对。这个能理解吧,我你按照我传给你的这个offet进行消费,好,那么对于卡夫卡来讲,那我就得知道他从卡卡去拿数据,谁从卡夫卡拿数据呢?是我的这个这个Spark streaming,那么Spark streaming从你的卡布卡去消费数据的时候,我怎么给人家去传那个offset,这个事情你得知道吧。
01:08
对吧,所以说这里面问题啊,就是你读出来以后呢。你要去考虑啊,就是如何这个传递给这个Spark streaming,呃,如何指定Spark这个streaming。STEM啊,然后呢,通过指定的啊,如何指定Spark swimming命通过,呃,这个这个如何让吧啊,如何让Spark通过指定的opposite进行这个消费。对吧,你现在想把这个事想明白。好,那我们看一下这个怎么做啊,这个其实很简单,我们这这个时候就又要回到你的这个卡不卡的这个工具类里面了啊,因为在这里面我们之前在写的时候,你看了哈,这个是基于Spark stream消费,然后呢,我们是使用的默认set,当时其实我就提到过,我们在做这个创建你的这个direct stream,就直连的这个stream的时候,它里面其实这个位置我们是可以。
02:09
再传一个参数了,就是opposite。能不能看明白。就现在我们只传了两个。对吧,那我其实可以再传一个这个就是你所指定的,它就会按照你的这个opposite进行这个消费。对吧,所以这个事就很好解决了啊,就你把这个OB给他传过来就完事了,好,关键是你分析到了这个问题以后呢,那你就要分析下一个问题,什么问题呢?这个Spark streaming。需要这个需要。这个Spark streaming这个要求的,Opposite的。格式是什么?对吧,就你要给人家传了,那你得满足人家的格式吧。对不对啊,格式是什么?来,那你看一下它的格式,不就是这个格式吗?刚刚你已经看到了啊。这个图我先放到放到放到这吧,来,它的格式你看了哈,是不是这个格式啊。
03:05
是不是你的他需要一个叫做这个map。对吧?啊,我们就不用这个map了,这个比较麻烦,它需要用到一个map,这个map里面的话放的什么,放的是一个topic partition,放的是一个浪类型的,这就是他需要的格式。能听懂来,那我就把它拿过来啊,嗯,哎呦这个。放在哪的?先先藏起来吧,先放到这藏起来好吧,哼,来,那我们把它这个拿过来你看了啊,他要的格式就是一个map格式,Map中呢,放的是一个topic partition,你从字面上也能理解,这不就是一个什么呀,叫做主题分区对不对?Topic partition,然后后面是一个long long是什么呀?主题分区后面这个一定是你的oppositeset。对不对,同学们一定是你的opposite好,OK,行,那你把这个想明白以后,那你不就知道这个方法我要干什么事了吗?是不是就是从你的ready中把opposite读出来,但是读出来以后呢,我们存储的格式跟这个格式肯定不一样,那你要怎么再去把它处理成这个格式?
04:07
能记得吧,同学们来,那我就写方法了啊,DEF,我们叫做read oppositeet,那你怎么去读啊?啊,怎么去读啊,你读的时候你要用到什么东西呀,你就想想你读的时候你要用到什么东西呀,你纯的时候你的key用了什么呀,你的key是不是用了你的group加topic呀。对吧,那你说我读的时候,我用不用这个group加topic呢。同学们。你不用的话,你哪知道你的key是什么呀,你得用吧,所以说你要把这个topic给我传过来,给我把ID给我传过来。能听懂,最后我给你返回的是这个东西。打个包。理解吧,哎,这样就可以了,好,那接下来就可以读了呀。怎么读啊,同学们啊,是不是一上来以后就呃获取你的red链接来my ready有条件什么get je好,这是我的je,你先不管三七二十一啊,先把这个记得关掉理解吧,然后下面就是拼接你的key来V叫red的key啊,这个key是不是还是一样的呀,就是我们叫opposite,这个一定要跟你存的时候保持一样啊,我就不写了,好吧,一定要跟你存的时候保持是一样的,要不你就读不出来了。
05:28
对吧,下面就往出读啊,怎么读呢,就是摘点叫h get all,对吧,然后把这个red key给它传进去,传进来以后。我们叫做这个opposite吧。能看到。对吧,这个叫opposite好行,那这里面放的是什么?放的就是你的分区和什么呀,和你的这是你的分区,呃,这是你的什么opposite,那最终我们要把它处理成这个结构对不对,所以说我们还是什么定义出来这么一个结构啊,那怎么定义啊。呃,那我就什么来一个什么MU。
06:04
的一个什么map。对吧,这里面放什么呀?它放的是一个topic partition topic part,然后呢,后面是一个浪类型的,好把它定义出来,然后这是我们的叫做结果吧,好吧,这是这是我的结果,OK,那有了以后下面不就是去迭代它了吗?对吧,但是你迭代这个玩意儿的话比较麻烦啊,为什么比较麻烦了,你看啊,它是一个什么东西啊,它是一个。Java的一个map,你Java的map你叠加起来就很麻烦,现在。能不能听明白啊,就很麻烦啊,所以这地话我可以这么去做啊,可以这么去做,我们什么呀?就是将Java的map转换成skyla的map进行迭代,对吧?这个怎么转换呢?我们需要用到一个,这个需要导一个东西啊,SKY的collection里面有一个叫做Java convert,好把它倒过来,倒过来以后这里面就会有一个方法,你看啊,就是opposite.as sc,就先把它转成一个SC,然后再去做迭代。
07:18
这就能迭代了,明白吧,那这个迭代出来以后呢,就是一个,呃,它迭代出来应该就是一个partition。Offset。能看到吧,好,那接下来这里面不就是相当于什么呀,把你的partition和opposite封装成你的什么呀,Part什么来,那我们封装一下啊,就上一个topic partition,好,这里面需要你传什么东西啊,传一个topic,传一个part来,Topic呢就是我们传过来的topic part就是你刚刚的这个part。对吧,但是它要求的是一个呃,Int是吧,它实际上是一个字符串行,那我就怎么to int一下。对吧,这是我们的TP那姐,呃,哎,这不能叫TP啊,这是那个塔迪声。
08:05
对吧,那我把这个封装好以后呢,那你看一下接下来我不就可以这么去做了吗?就是直接你的results叫put put啥东西啊,是把你的TP放进去,然后呢,后面是放什么,放你的value value的话就不就是我们的这个东西吗?是不是你真正的这个oppositeet呀,对吧?Oppositeset oppositeet讲什么呀,图浪,因为我要的是一个浪类型的嘛。对吧,那你看这不就把它封装好了吗。能不能看明白。可以吧,行,那我把这个封装好以后,接下来我就什么给他直接返回呗。对不对,来把这个red关掉,关掉以后呢,最后我就什么把这个result给他什么返回回去这就可以了,呃,我看看啊呃。Map啊MU啊,这现在是mutual map,那我就什么to to to to map一下对吧,他要的是一个不可变的,但是我现在是一个可变的啊,因为我要往里面加东西啊,那我再去么转,转换成一个什么,转换成一个这个这个不可变的,然后给他返回回去,这不就搞定了吗。
09:10
能看到吧,啊,这就是我们这个最终写的这个代码啊,行呃,现在我们就把这个opposite的一个管理方案,就算是把它写好了,那写好以后,接下来我们就可以用到我们这个代码中了。明白了吧,来听一下啊。
我来说两句