00:00
啊,现在我们前面的这个S任务和后面的转换任务就都已经介绍完了,在flink代码里边,我们现在就剩下了最后一步,那剩下就是think,我们是不是要对于这个数据最终一定要输出到外部系统里边去啊,这才是一个完整的处理流程对吧?流处理里边就是读取数据做转换计算,最终要输出到外部系统啊,那所以接下来首先我们来看一下就是这个think任务啊,到底要怎么样才能完整的做这样的一个thinkink操作啊,那自然大家就想到了之前我们那个source那边的时候,可以直接读取文件read text file,也可以直接创建一个socket文本流啊,这个socket,呃,Text,呃。就是直直接我们可以有一个这个socket的这样一个数据源,对吧,文本数据源,呃,那另外呢,我们还可以有更一般化的定义一个source任务的方法,就是ADDS,那对应的在S这边怎么做呢。
01:03
啊,简单来讲的话,Think也有各种各样的方式,那最通用的,最基本的,那其实就是一个ADD think对吧,你既然那边有ADDADDS嘛,那这里就有ADD think,那里边对应的要传的就是一个,之前那个S叫s function,现在这个就叫think function,对其实整体来讲是一样的啊,啊所以接下来我们还是在代码里边先把这个做一个实现啊。还是在这个Java下边。我们现在要带上这个包名,com.at at,硅谷点API test,现在是think。然后首先哎,我们第一个要给大家测试的啊,S test1,哎,那就是跟卡夫卡的一个连接了啊,我先把这个先创建出来。然后同样还是先把这个主方法先写出来啊,这里边throws。
02:04
Exception,然后里边还是该创建环境,先创建环境,我还是把它叫做env,好,接下来我们这个不是正确性,还是把这个并行路线设成一对吧,啊然后接下来。是不是还是对从我们之前的那个数据里边要去读取数据啊,啊对吧,所以这里边我可以把那个之前的这个直接先copy过来啊。我可以直接读镜来做一个转换。好,这是我们当前能做的一些操作,最后不要忘记有那个env execute对吧?哎,这是整体的程序架构,先写在这儿,然后接下来大家就想到了,那那这个data stream,如果要是想做这个think的话,大家看最标准的是不是就是一个at think方法呀,对吧?那另外其实它还有一些其他的方法,当然这个大部分要被弃用了,对吧?你像之前我们不是有read text file吗?那这里就有re write as csv或者write as text,对吧,就可以把它直接输出保存成一个文本文件或者CSV文件对吧?输出到文件啊,这个比较简单,但是就是一般不是特别常常见,对吧?我们这个得到这个统计结果啊,计算结果要直接写到一个这个CSV文件里面,这个确实比较少用,所以大家看这个已经被弃用了,然后另外之前我们不是有那个read,就是之前我们那个socket是有这个。
03:39
啊,前面那个env对吧?Env是有这个socket text stream吗?有这个直接从socket文本流里面读取数据吗?那现在与之对应的就有一个对right to。Right to socket对吧,就写到对应的一个这个socket文本流里边啊,这也是有这样的操作的,这个一般都是做测试啊,所以我们这里边就不给大家详细讲这些了,那现在要讲的就是。
04:10
真正输出到一个啊,外部的,比方说像卡夫卡这样的消息队列,比方说像一些啊,像一些red啊,ES啊这样的一些数据库,对吧?这这可能是我们在实际生产环境里边最常见的一些需求啊,那所以这里边就要有一个通用的ADD think方法来实现这样一个东西了,它里边要传的是一个think function。啊,所以呃,那那这个性方程大家就想到,那那怎么样去实现这么一个东西呢。啊,所以接下来如果我们要跟外部的一些数据存储工具去做连接的话,那就要引入一些,哎,一些依赖第三方的支持,哎,或者说这个官方给我们提供的一些连接器,它可以给我们实现这样的一个think方式,那官方给我们实现了哪些呢?
05:00
好,大家可以看一下啊,这是呃,当前一点十版本里边给我们提供的,官方给我们提供的一些连接器connector,那它提供的连接器有大家看,首先就是卡卡,呃,因为卡夫卡大家知道本身它是消息队列嘛,消息队列里边对于数据的这个处理和传输,是不是就相当于跟我们的流失处理是一样的呀,来一个处理一个对不对,那所以卡夫卡跟flink它的这种连接就会特别的顺畅,特就是天生一对嘛,本来就是你那边也是来一个处理一个,我这边也是一个一个消费,一个一个处理,那这不就正好能对接上吗?所以大家看他的这个连接器给我们提供的。有S的连接,也有think的连接,也就是说给我们实现了有S方式也有think方式,对不对?可以从卡普卡去读取数据,也可以写入到卡夫卡里面去。那另外还提供了其他的一些啊,那大家看还有这个亚马逊的这个knay streams,这也是流逝的,所以是不是也有source think啊,对吧?啊,另外还有这个rabbit MQ啊,这个也是有S和S,还有这个呃,Ni Fi啊,这个也是有这个S和S,那其他的大部分呢,都是只有什么。
06:15
都是只有think,对,就是你比方说像这个cassandra,像ES对吧,HDFS这些大家都比较熟悉啊,他们的特点,共同的一个特点是什么?对,大家想它是一个存储对不对,你说它是一个存储的话,我能直接从里边去读取流式的数据吗?很少对吧?呃,一般是不能的,你如果要从里边去读取流失数据的话,是不是相当于你还得有单独的一个程序,相当于从里边,呃,就相当于轮询的去访问数据,然后把它灌到卡夫卡里边啊,对吧,这样的话这才是一个流失率,那那你相当于我们弗林可连的是不是还是卡夫卡,哎,所以说这个就是一般情况跟其他的一些这种数据存储是没有SS数据源的这种连接的,而很多都有这个写入的连接,所以接下来我们这个think操作就可以写入其他的了,对吧?那这里面大家其实会发现,就是我们常用的啊,这卡三顿可能用的少,稍微少1.ehdff,呃,HDFS对吧?呃,这个卡夫卡这几个是有了,那比方说还有一些你像我们比较常见的,你像对吧,MYSQL这些,呃,我们想往里边写入的话,那没有怎么办呢。
07:27
啊,除了这个,这是官方连接器啊,就是flink本身官方给我们提供的connector,除了这些之外,另外还有一个啊第三方的框架啊,呃,这个叫做呃巴克尔对吧?呃这个巴克尔这一个,呃这个项目它其实也是阿帕奇下属的,它主要就是给这个flink Spark提供一些连接工具的支持。所以说他呢,给我们其他的一些工具提供了对应的连接器支持,比方说啊,这里边看有这个阿帕奇的active MQ,另外还有这个flu,对吧,啊,这个red,阿卡natti,这些他们的连接是由这个bar here给我们提供的。
08:09
啊,那有同学就想到,那假如说这里边也没有提供的那些连接器怎么办呢?比方说大家看这个,呃,这个MYSQL对吧,HP啊,至少这个一点十版本里面,我们现在好像都没有看到,那怎么办呢。但是在后面那个01:11的版本里面,其实有对应的连接器的啊啊,那所以如果当前这个没有对应连接器的话,我们也可以自己搞定。对,因为大家想那个接口不是都已经通用放在那了吗?Think function嘛,自己去实现一个s function不就搞定了吗?啊,所以接下来我们就分别按照不同的类型给大家讲一讲写入到不同的外部系统到底是怎么样的一个操作,那首先我们还是做这个卡夫卡的这个写入啊,卡夫卡的话整体来讲是比较简单的,首先我们是不是应该引入这个卡夫卡连接器啊啊,现在我们不用引入了,因为之前呃,在做那个读取数据的时候啊,是不是已经引过一次了,那现在不用了啊,那接下来我们就直接往卡夫卡写入就可以了。
09:11
好,那现在的问题就是要写入的话,是不是要new一个这个think function啊,卡普卡那边应该已经给我们连接器已经搞定了,对吧?那我们看一下啊,跟卡夫卡相关的,然后大家看现在是一个,诶,大家还记得之前我们读取卡夫卡数据的时候用的是什么吗?对,Flink卡夫卡consumer,而现在我们用的就是一个producer对吧?哎,那这个自然知道,既然你要往里边写嘛,那现在是不是应该是一个生产者啊,要发出数据,写入数据对吧?哎,所以这是自然能想到的啊,然后里边要传参数啊,那我们看一下它的这个构造方法有哪些啊,这里边我们就找一个最简单的吧,啊,比方说大家看三个参数字比较简单啊,我找一个这个。诶,大家看第一个string,那是不是同样还是应该有一个topic啊,当前你要写入哪一个主题对吧?然后另外诶后边接下来是不是还可以有这个对应的一个这个civilization STEM码对吧?呃,就是还是做一个这个序列化的这样一个呃,STEM的定义啊,那最后还应该有对应的一些property con配置啊,所以这个其实整体来讲来讲是非常简单的啊,那或者说你看第一个这个更简单,第一个这个写写起来就是。
10:29
啊,第一个这是连配置都没有了,他直接大家想我这个,呃,Producer最关键的配置是不是就是那个broker list呀?啊,所以我直接把这broke list直接写在这儿完事,对吧,直接就一个broke list,然后topic,另外还有一个theirization scheme啊,所以这就是最简单的一种实现了。那所以接下来我这就写一个比方说,我当前这个叫local host9092对吧,然后这是当前的broker list,另外当前的主题,主题我叫S吧。
11:05
输出对吧,然后后边啊,后边我再去,呃,就是要拗一个创建一个当前的那个序列化的那个STEM,那这个稍微有点麻烦,因为我要单独去实现的话,我还我还得去把它做一个转换对吧?呃,做定义那个对应的那个stemma到底怎么去写,那干脆我简单一点,我当时是不是定义过它to string方法呀,哎,所以大家想我是不是直接把它来一个to string,那接下来是不是得到的就是一个对string类型的一个data stream,那现现在我是不是直接就可以对相当于我在这儿是不是就已经做过这个序列化了,哎,所以接下来我是不是直接还是来一个对simple string scheme放这儿是不是就完事了,对吧?呃,这样的话就可以实现这样的一个功能,那这里报错主要是因为我们这里面的类型不对,对吧,应该是string类型啊,这样的话就完成了。就这么简单,这就是跟这个卡夫卡的连接对吧?好,所以我们接下来给大家做一下测试,看看这个效果怎么样啊。
12:10
好看一眼哦,当前这个卡夫卡是启动着的,所以这里面我们可以直接运行一下看看效果。执行。呃,然后如果说我们这里面想要看到对应的这个结果的话,那那其实大家想到我接下来是不是应该得有一个对,应该得有一个消费者对吧?呃,我先到这个对应的目录下边去,呃。兔S卡不卡,然后接下来我要先去有一个这个消费者啊呃,Console consumer,然后杠杠boostrap serve,对吧,Local host 9092,杠杠topic,现在的那个主题是s test,好,我们来消费一下当前的这个数据。
13:09
大家看这边其实都已经写完了,对吧,都已经对应的那个都已经有了啊。我们看一下。现在好像消费不到数据是吧。我们看这里边I think是执行了对吧。啊,我们这边要不就再来再来生产一下这个数据对吧。好,再运行一下。我们这个应该没有写错吧,Think test,大家注意一下这个主题啊,Think test对吧。是这样的。诶,大家看现在就把这个数据都已经写进来了,对吧?啊,刚才可能这个代码直接就就就这个,呃,写完了之后,这边没有没有真正的写到这个主题里边来啊,这边消费不到,那所以大家看到现在是正常的消费到了所有的数据。
14:05
啊,但是大家会觉得这个这个有点奇怪是吧,看着这个结果就是一下子H一下子全过来了,这个明显不像我们那个流逝处理流逝输出的那种结果,对吧?所以接下来我们可以给大家测一下,就是前面我们不是讲过卡夫卡也可以消费卡夫卡里边的数据吗?那自然我们就想到是不是可以卡夫卡进卡夫卡出,做一个这样的类似于数据管道啊啊对吧?诶,所以接下来我们就把之前的那个卡夫卡S的那一部分啊,直接都copy过来给大家看一眼。啊,这边我就直接直接这个copy这个数据了啊。然后property,我也直接就把这个都copy过来,那这就不是从文件里边读取数据了,而是从卡夫卡里边去读取数据,把当前的这个数据得到之后,呃,大家看到这个读出来之后,本身就是一个string类型的这样的一个一个数据,对吧?我把这个定义叫做input strip,然后我再做一个这样的转换,大家看本来得到的应该是一个。
15:11
逗号分割的啊,这样一个呃,一个一个字段对吧,一行一行的那个文本数据,然后接下来尽管我输出的时候又是一个字符串,但大家想这是不是已经包装成3READING,然后做了序列化之后的那个状态啊,所以它俩应该是不一样的对吧?啊,所以接下来我们直接把这个代码运行一下啊。好,我把这个运行一下。好,我把这个放在这边,呃,然后这里我可以把这个停掉啊,给大家看的这个清楚一点,这里边是一个消费者,然后另外我们现在的数据是不是应该得有一个生产者啊,哎,直接我们直接用之前的这个生产者就可以,Topic是sensor对吧?啊,我直接用这个啊。好,现在这个已经执行起来,我们把这个数据一条一条做一个输入。
16:03
我们可以把这个做一个做一个分屏显示啊。好。当前是这个341第一条数据。诶,大家看这里边是不是就输出了一个sensor reading啊。对吧,341,然后35.8这条数据,我们这里边每一每输入一条数据这边对应,诶对对应着就会有一个一个输出,对不对,三四十六啊,大家看到这个一条一条数据的输出了吧。所以这个过程,这就是我们所说的,类似于做了一个数据管道,卡夫卡进,卡夫卡出,而且大家会发现这跟我们直接用卡夫卡去消费一个topic不一样,这两个是不是不同的topic啊?哎,所以这就相当于是我们用这个flink程序做了一个什么事情。类似于是不是做了一个ETL啊,大家看是不是这样啊,对吧,我就是直接把这个数据啊先呃,就是原始的那个数据灌进来,先读取出来,然后我做一些基本的处理转换,然后是不是呃,你可以做一些拆分,可以做一些呃转换,做一些提取,对吧?最后再灌到另外一个主题里边去,这是不是就是一个ETL的过程啊,所以你看这个,你用卡夫卡做一些基本的数据处理清洗啊,也是可以的。
17:24
啊,只不过一般情况我们可能这个flink代码里边,这只是作为一个简单的初始的第一步,对吧,不会只做这个操作,你如果要想用它做这个ETL也是没有问题的啊,这就是跟卡夫卡的一个连接的状态。
我来说两句