00:00
好,那这个卡夫卡连接器的依赖我们导过了,然后呢,在配置文件这里呢,也指定完卡夫卡connect的一些基本参数啊。那接下来我们写个代码来测试一下。六啊,比如说点at硅谷,点link点。嗯,是最差。Plan。卡不卡?嗯,我叫stream叉卡夫卡。啊,这个插我小写吧啊。好,来一个没方法。以前我们要怎么写啊,我们是不是要利用一个stream execution environment。呃,或者说不是这么写啊,我们通常是怎么写呢。
01:00
诶,删掉啊,我们是不是stream environment To Get对吧。啊,Get错了啊。Cat执行环境啊,我们以前是不是这么写,然后EV去可以去设置一些参数,对吧?啊可以去set各种各样的东西,嗯,那STEM它的API他帮我们做了一个封装。你看我们怎么写就行,它提供了一个那叫卡普卡S,我们直接去get data string就OK了。那我们看Java代码,Java代码呢,你看。也是一个什么卡卡s get data大菌。那你再给他看前面这两个东西。还跟之前的一样了,是不一样了,这个类是是这他提供的啊。
02:01
你可以看到导包,你看这几个类啊,全部都是STEM差帮我们封装好的啊,底层就做了很多事啊。那比如说我先把这两个拿过来。我把它注掉啊,这个是以前啊,这是flink API写法啊。下面这个是什么呢啊。STEM叉API写法,那这个就是可以把我们初始化环境了,来导导包导包。那么看它首先创建了一个流的环境配置。啊,环境配置。然后呢,会创建一个牛的上下文啊,然后就是把这个配置传进来,那这个配置。要传两个参数,第一个呢,就二这个是main方法的二。第二一个呢,就是什么呢,相当于说我们在env里面去。
03:00
设置一些东西啊。额外去指定一些参数,那么可以在这里去再单独去指定。啊,那CTRL加P我们看一下啊,点进来。它封装了一个类型叫什么呢。Stream configuration。A config function。这个类呢是一个接口,我们去实现它就可以去再去set这个EV的一些参数,你可以看到对不对。它参数是不是the streamecu environment对吧,那一般呢,不建议你在代码再去更改一些环境配置,如果你有需要的建议还是在哪里application在这里去指定就好了啊,这是比较建议的方式啊,也是官方推荐的啊,那如果这边不需要再单独指定一些参数啊,那你就一个。那这个就好了,Contest,那在接下来我们看看怎么写。另有一个卡夫卡source,这是它封装好的啊,泛型呢,传了一个数据的类型。
04:04
再往下呢,把contest传进来就OK了,然后get一下data string。我们来写一下吧。另有一个卡夫卡S。那这个。选择哪一个呢?不要选错了啊,要在导包的时候不要导错了,导谁呀。刀是dream叉的,而且这是dream叉,你看这个是A版本,这个呢是Java版本啊。那同样我们前面有个包导错了,我发现了,你看我导了一个SC的版本了啊,把这个去掉,我们重新导。好,先用着吧。呃呢,比如说我这是类型啊。里面呢,把这个contest传进来。接下来干嘛呢?Get string?
05:00
这样呢,这个流应用就进来了,还可以点很多东西,你要指定UID啊,还可以继续调用其他扇子啊都可以了。那比如说我执行一个map,因为卡不卡,我们知道卡不卡的消息是不是有个T,有个value,那一般我们的数据是不是放在value里面,所以这里呢,我是不是要把value的数据取出来就可以了,做一个打印给大家看一下。嗯,那这边就简单了啊,Key value。或者说你用这种写法。点value就可以了。啊,应该,那我们来一个record吧,我写一个number表达式,我直接record.value就可以了,然后呢,做一个print。好,我就简写了啊。那接下来我们来试一下啊。我首先呢,在卡不卡这里呢,是不是要启动一个叫S1的topic对吧,这边我们指定的是S1啊。
06:13
那其实这里也可以点topic去指定啊。这个可以去指定我要消费的topic,但现在我们配置是只有一个,就不用指定,还我执行。啊,还没创建,让它自动创建一下好了。好报错,看什么错啊,这个是要提醒大家的,如果我们用STEM,它API必须指定一个空,也就是这个文件的路径。如果不指定,呃,他找不到啊,找不到。那怎么用呢?很简单,来右键拷贝。Copy pass。
07:00
然后呢,让里面呃,点击编辑配置。那你看我们刚启动这个应用啊,首先观察一下这个地方是不是勾上了,对吧,第二一个。这个控这个配置像在哪传呢,通过may方法传进来啊,通过may方法传进来,那哪个地方是写may方法呀。这个地方。对不对啊,是在这个地方,那就在这里选择杠杠空,然后把这个路径粘上去。那就OK了,点击应用,OK,好,执行。你要现在不报错了吗?对吧。但是我们可以看到程序直接停了,而且没有任何输出,为什么?因为以前我们flink写的时候是不是还要cute一下,对吧,我们以前是这么写的吗?Cut,那这边呢,我们得用这个contest点,哎,Start一下。就换成这样就OK了啊,来启动再观察一下。
08:05
诶,你看现在就是一个流式程序了,你看它不会停止一直在运行,那我们在卡不卡里面往里面发送一点数据试试S1.11逗号啊逗号。啊,回车。看一下这边,诶,你看消费到了吧,而且做出了一个打印啊。S2。你看又有了,那这个呢,就是通过STEM的API来消费卡。那这个地方我们说了是不还可以去指定topic是吧,那其实啊,再给大家介绍一个用法,在我们生产环境当中,比如说我这边。呃,有一个作业他要消费的是topic s1,那比如说我这里在创建的一个作业,他要消费的是,比如说topic叫S2,哎,每个作业消费的不一样,那我这个配置文件怎么写?
09:03
那首先呢,你把所有的topic都列上去,比如说S1S2对吧,好,那在卡不卡这个DEMO里面。我们看一下。拧完这个source之后,是可以指定一个什么topic。那这个时候呢,你可以指定什么呢?啊,这边消费的是A啊。啊,比如说他消费了S1,那在另一个作业里边,你是不是就可以这么写了。消费的是S2这个topic皮卡。这是允许的啊。那像现在我们就相当于说每个流子消费配置文件中部分topic是吧?啊,那比如说做个区分,我这边print叫S1啊,这边叫叫叫叫S2啊。这边看一下没什么问题啊,那这边再启动一个。
10:04
啊,他不卡死。So producer杠杠broke list。S2啊。一个S1,一个S2,好吧,好,那我重新启动。呃,那这个时候,比如说S1再来一条数据啊,叫S333回车。好,我们看到S1的拿到了吧。那我们再看另一条流的能不能拿到啊,比如说S555回车。你看SR的打印的是不是也拿到了,这个就是它的一个用法啊,你可以在配置文件指定多个topic,但是你在消费的时候可以指定我要消费哪个。这当然也可以指定多个了。
11:00
对吧,比如说这条流量同时消费两个topic也可以啊也可以。好,我把它停掉,还有一种用法给大家介绍一下,就是说如果你企业里边有多个卡夫卡集群。多个卡不卡集群,那怎么办呢?那它也支持这么一种配置啊,我要往下走啊,我把它打开。比如说我我我有几台,它组成了一个卡夫卡集群,有其他几台节点组成另一个卡夫卡集群。那这个时候怎么来区分啊,配置文件这么写啊。在上面呢啊,比如说我叫一个卡不卡一是集群一,呃,另一个我叫卡普卡二,这是集群二,这个名字是无所谓的啊,我们前面是默认的叫一个什么呢?直接叫他一个卡普卡S是吧。啊,不是啊,前面我们没指定啊。前面我们不用去指定,那下面这种就是在配置之前加一个。
12:05
集群名称那里面的配置跟之前一模一样啊,那你这边就可以去指定啊,比如说第一个集群是在这儿啊,第二个集群在这。把每个集群的地址把它写上去啊。那这个呢,你看假设这是第二个集群,你的卡夫卡集群,那这样的时候。就有了,那我怎代码怎么写来,我拷贝一个,嗯,代码二啊。把没用的去掉。那比如说我这条流要读取集群一,这条要读取集群二,那怎么办呢?它这里还有一个参数点。Alice Alice是不是别名的意思啊啊,就是把我们卡夫卡指定的集群名称传进来,比如说前面叫K1对吧,那这个叫什么呢?Unless这个叫K2啊,那这样就可以实现呃,消费指定集群了啊,多集群的用法啊。
13:08
啊,不是K1K2这个叫卡不卡一啊。这个叫卡不卡一,这个叫卡不卡二,对吧?啊,每一个对应配置哪些topic可以消费啊,同样可以指定多个啊,比如说这里也可以指定S3嘛,对吧。这个就是卡不卡connect的一个简单用法。
我来说两句