00:00
好,下这块我们是把Spark生产者跟这个卡卡集群已经集成完毕了,那接下来下一个案例呢,我们是集成Spark消费者来消费卡卡的某一个主题,哎,这样一个过程啊,那下面呢,我们来看一下消费者这块,那稍微注意一下跟我们生产者啊,哎,我们的时候呢,是添加一个依赖,就是SPA随名,嗯,卡不卡,那下面呢,我们额外还需要加一个Spark号,以及SPA随名对应的一个依赖哈。那把这俩一站呢拿过来。添加到我们对应的文件当中。这两个依赖倒完之后,那下面呢,我们就开始来写一下对应的这个消费者啊。好,那下面呢,把它拿过来这个名称。你有一个。那这里面new的是scholar object啊,他。搞定,搞定之后没方法。那这里面写代码呢,是有套路的哈,第一个呢,是要初始化对应的Spark contest。
01:03
初始化上下文环境。哎,这是第一步,然后呢,最后一步啊,谢谢,最后一步,最后一步呢,你可以认为是嗯三吧。最后一步呢,是执行代码。然后呢,主色。并处射吧。那这个完毕,那中间这块干嘛,哎,中间这块呢,负责消费数据。行,那首先我们先准备一下Spark对应的一个环境,那这个呢,如果大家不是特别了解的话,可以去看一下咱们之前讲Spark的呃,一些内容啊,那Spark呢,你可以关注上五五交易公众号啊,回复对应的大数据里面就有Spark的教程哈。行,那下面呢,我们这里面怎么写啊,哎,第一问呢,是你一个。第一步呢,我们是另一个。Spark。诶,你发现,哎嗯,这个倒包了啊,如果没有倒包的话呢,你可以提前倒一下。
02:02
SPA,然后接下来点set这个。Master。Master,那master里面我们怎么填呢?我们这里面填的是这个local星。Local。比如说本地模式运行。然后再来一个site,这个APP name,那这个名称啊,就是我们当前应用程序的一个名称,那我们就给它起一个叫呃,Spark。嗯,杠。卡不卡吧?点这样呢,就得到了一个配置信息,Com,那下面呢,我们又一个,哎,Streaming contest又一个。S。Swimming。那这里面没有导包是吧,哎,你刷新一下啊,没文刷新一下。如果还没有的话,把这个拿过来在里面,你提前把这个包倒过来。
03:01
它这个自动导包有点小问题。拿过来之后,然后你再写一下啊,那这呢,就是这个又一个streaming。诶,你看现在就有了吧,哎,又一个stream contest,然后里面需要什么呢?需要一个配置信息,Com,对吧,那这个com呢,我们说刚才已经给了就是它。那后面还有一个对应的叫Du reason,哎,其实呢,这是Spark swimming名对应的一个啊窗口大小多长时间一个窗口,那这里面我们来一个三秒啊三秒那就是SEC。啊,再看。呃,三秒钟一个窗口,OK点。那这里面就得到了一个上下文环境啊SC啊,简写OK,那接下来我们写最后一个啊,先把这个呃,最基础代码写完啊SSC,如果你代码写完之后。那就是第二。START1执行,然后呢,你要想让这个程序一直执行的话,还得调用它的一个主设的对应的一个代码啊叫。
04:05
这样就行了,那接下来呢,我们开始写它对应的核心代码,那核心代码这块呢,就是主要的就是消费者呗,对吧,哎,卡法消费者,那这个卡帕卡消费者怎么得到呢?那这里面有对应的叫卡普卡优。第二哎,可以创建一个direct流,那这个流里面就是关于卡不卡相关的一些处理哈,好拿过来,那拿过来之后呢,第一个信息就是对应的这个上下文环境看么SSSC给他一个。那下一个呢,是它对应的一个数据的存储位置。那这个存储位置呢,它需要一个你看啊啊location存储策略,那存储策略有几种。进去看一下。这它对应的源码,它给出了你对应的四种存储数据的方式方法,那第一种啊,就说大家这个英文水平啊,啊非常非常好是吧?啊叫prefer breakers啥意思呢啊,Use this only if,这个equator啊,On the same啊,Same nose as your,卡卡相当于啊你的这个卡卡消费者和卡巴基群哎,放到同一个节点上啊进行一个消费,那这个呢,一般是本地模式的时候啊会选择它啊。
05:17
那下面这个呢。下面这个呢,是use in most case,也就说在大多数情况下,我们是用这个的啊,It will啊,Contains啊,分布式的一个分区啊,然后执行,其实呢,它是优先啊位这个执行它底层呢啊,建议大家选择这种方式啊,它是一个优化的方法啊。嗯,你要是那啥的话,你可以把它复制过来嘛。哎,往里面一放。你看在大多数情况下,它会在所有的执行者之间一致的分配分区啊,是一个最优解啊,最优解它会根据这个呃,你数据的一个存储位置啊进行一个判断啊,那下面呢,这些呢,这两个啊,这两个呢,是你在有数据倾斜的场景下啊,会进行一个使用。
06:04
啊说如果啊负载不均匀,可以使用此项,在特定的主机上放置特定的这个topic position啊在地图中未指定的任何的这个top牌参都将啊使用一致的位置,也说这块呢,呃,咱们目前还没有发生这个舒心鞋啊不建议呢去使用这种存储测量。那我们就使用它就行了,优先位置也是他推荐的。那再往下同加P,后面呢,还有一个对应的叫啊consumer啊stay。那我们就给他一个consumer stay对吧?嗯,那这里面就是CS consumer点,哎,开始呢,就是订阅对应的主题,于是你到底要消费哪一个主题的数据呢?是这样吗?好。那么这个是萨啊对吧,那他这个泛型啊,你要给一下啊。它的外形呢,其实就是对应的,它这个呃,KV都是这个对应的叫这类型。
07:00
好,那往后看。看里面对应的参数,那第一个参数呢,就是你要订阅哪些主题,但是你看它这个主题,哎,它是迭代器,比如说TOPIC4,你可以消费多个主题。那来呗,来一个集合吧。假如说我们现在就消费一个first OK吧,好。那么下一个对应的参数。好,LP下一个对应的参数呢,就是这个卡普卡的对应的一个参数,类似于我们之前写的那个properties,只不过呢,现在呢,不是proper了,它什么?哎,它是一个集合,是一个map集合,那么map集合里面传入的K呢,是10G,哎,这个值呢是object,比如说在这个map集合里面,你得把它对应的bootru server k的反序的话,以及Y6的反序的话,还有消费者主的ID是不是都得在这个集合里面进行一个配置,就是这样的啊。那这样我们给他一个啊。好不好?好不好啊,这个P。
08:00
行吧,就这么一个参数,那把这个参数拿过来。定义这么一个集合。玩。它等于MB集合。那这个map集合它对应的KV类型,你要给他指定了,K呢是这个实际类型,这个value呢,是这个object类型啊。那给过来第一个参数,我们用这个consumer config哈,英文。肯兴can点第一个参数BOO server,好给它赋值,那我们复制什么呢?哎,对应的叫hioop。102对应的。9092。HIOP1039092OK。那接下来下一个参数对应的是consumer config。点,哎,那这个呢,是key的一个反序列话,那反序列话呢,我们用这个class of。ST string反序的话,OK,放进去。
09:02
那下一个呢,是对应的这个Y流的反系统化。开门,开点value。Class。好,那下面呢,是这个。哎,反序的话。OK,那还需要一个这个呢,就是消费者主这个必须要给定啊。开猫开这第二消费者主的话呢,就是group ID OK。那格住牌ID,我给个名字吧,就叫这个test。哎,这样呢,这就OK了,嗯,行,那这个OK之后呢,你通过这个kava us创建了一个流,那你得返回一个流啊,哎,点。这样呢就得到了一个硫啊,那这个呢,就叫这个卡不卡。Swim。拿到这个流之后,那下面呢,我们对这个流哎进行一个处理,我希望得到这个消费的数据之后打印到控制台,那怎么打印到控台呢。
10:07
第二这里面啊,哎,我用了一个map,为什么用map呢?还记得你卡布卡传过来的数据,它是有这个key和VALUE6的吧,对吧,那K呢,这里面是对应的A这种,然后Y6呢,你可以一般放的,比如说像爱硅谷啊相关信息,那其实我要打印的时候,我是不是只打印这个Y流就行了,哎,所以这里面我用一个map,只取出它对应的Y6的值。Record。record.value流,哎,这是只打印Y流值的一个做法啊,那这个呢,就是Y6STREAM啊。那Y6D swim完事之后,那下面呢,我们就来对它进行一个打印print啊就OK了,行,那这个代码呢,我们就写完了啊,稍微回忆一下。那下面呢,哎,我们来整体的回顾一下这个代码的编写哈,首先呢,看一下上面这块初始化上下文环境,哎之后呢,我们写的是这个,哎也说启动程序进行一并进行一个主射,那中间的这块呢,就是核心的消费数据的代码,好,那交费代码首先用这个卡把U点创建一个direct stream流,那这个流当中我们需要传送参数,第一个就是上下文环境啊,这有了,那第二呢是它优先的一个存储位置啊,这是一个优化的手段,之后呢,这块呢会订阅对应的你消费的主题,比如说订阅或者主题,然后后面呢,传上对应的卡法参数,那卡把必备的参数,比如说作为消费者必备的数有哪些呢?这一个呢,就是你要连接上卡玛集群。
11:38
之后呢,哎,这里面你要有对应它的key和Y流的反序列化,以及呢,对应的这个消费者主ID啊,那拿到这个数据之后,哎,你需要对它进行一个打印,那打印的话呢,哎,我只打印它对应的Y流值,哎,那我就取出Y流,哎然后呢,进行一个打印就完事啊这个代码呢,非常简单啊,那下面我们来进行一个测试。直行。
12:06
好,那这边有数据了,那下面我怎么办?哎,我往里面去打印一些相关的内容啊,那我创建一个生产者。结束掉。对吧,哎,福尔生产者。那下面呢,我往里面打一个哈。爱德硅谷。观察一下,看这里面有没有啊,我再打一个爱的硅谷。你看就出现了吧,哎,Hello,艾特硅谷,然后艾特硅谷,哎,这样呢,就实现了Spark消费卡不卡的一个功能啊。
我来说两句