00:00
接下来我们就给大家再来讲flink处理data swim API里边的另外一大类啊,前面我们讲过了,已经有了,知道了这个source怎么去读取数据源,然后呢,Transform怎么样去做数据的转换处理计算,那接下来还有一个就是怎么样写入到外部系统对吧?啊,做输出,那这就是所谓的thinkk了,这里大家需要注意一下,就是因为flink的底层它并不是像Spark那样,我们是RDD对吧,就是本身就是一个一个的这个数据集,所以说呢,它没有像Spark里边我们fosh方法,然后直接去迭代去做这个输出啊,那这里边flink要去做对外输出的时候呢,我们必须要去调用它的一个sink方法,Think任务,你构建这样的一个S任务,然后才能实现向外部的写助啊,那有同学这里边可能会觉得有点奇怪是吧,那你前面不是讲我们在这里边可以自己去建立这个,呃,跟数据库。
01:00
的连接操作嘛,对吧,我在这里边连,连接上数据库,然后我就直接我我可以读外部数据库的数,然后去做一些处理计算,那我也也可以直接就往外部数据库去写数啊,啊当然这个是完全可以的,这里边主要是说什么呢?你flink在做操作的过程当中,大家会想到你往外部数据库去写数,简单写的话,你怎么写都行,对吧,你想在哪写在哪写,但是flink其实这个分布式系统,它是要考虑我们结果的正确性。哎,大家想想想是不是这样对吧,如果说我们在某一步做操作的时候,你万一要是挂了的话怎么办呢?你如果要是挂了之后,在恢复的时候,你之前已经写过的那个数据是到底要再写一遍还是不写了呢?那这个其实你需要考虑的问题很多,对吧?你直接简单粗暴的直接建立这个数据库连接,把它写写进去,这种方式其实是不是很合理的一种方式,而且flink也不会把你的这个就是写入里的,这个过程当中你涉及到的一些状态也不会帮我们保存存盘成checkpoint啊,所以为了方便这个flink的统一的状态管理和这个容错机制的保障,保证我们结果的正确性,那比较建议大家一旦要涉及到对外输出的时候,还都是要用这个标准的think方法来做啊,那所以它这个整体的调用方式是什么呢?整体也非常简单,那就直接就是一个ADD think之前我们不是那个。
02:30
自定义输入源的时候可以ADD source嘛,啊,或者卡夫卡源的时候也是ADD source嘛,这里边一样与之对应的ADD think,然后里边呢,哎,当然就是你要去实现一个think方式了,当时我们ADD source里边是一个S方式,现在就是一个think方式,所以整体的这个方式啊,这个代码风格跟之前是非常非常类似。呃,那这个就很麻烦,大家可能想到这又得去自定义实现了吗?啊,但是没有那么复杂啊,因为我们处理完成之后数据的输出,这显然是很多场景下,我们是要跟外部的这个存储系统进行连接的嘛,啊,所以弗link本身是给我们提供了很多外部系统的连接器的,比如说啊,大家看这个官网上给我们列出来的啊,已经官官方给我们提供的啊,就是flink本身给我们已经提供的一些框架的支持的think有哪些呢?啊卡夫卡哎,当时给大家说说这个卡夫卡跟弗link他们几乎这个做流式处理就是匹,就是天生一对嘛,对吧,就明显就都是按照流氏这种思路去做的操作,所以说它的这个支持是最好的啊,大家看后面就是s think都可以对吧,就是你可以从卡夫卡去读数,然后再写入到卡夫卡,没问题,然后呢,还有这个阿法奇的卡Sandra卡Sandra,呃,这也是一个分布式的存储,它是只有think没有S,没有用它。
03:55
来做这个数据源的啊,就没有数据源的连接器,那另外还有这个亚马逊的这个knais streams,这是流对吧,大家看这个支持也就很好,也有source和think,那另外大家可能比较关心的是我们常用的那些数据库对吧?像什么这个MYSQL啊,ES啊,对吧,Red啊,呃,什么highway space这些有没有呢?呃,这里面大家看官方支持的是有这个ES,还有h efs对吧,这些是有的,另外还有这个Twitter的streaming这个呀,还有这个阿帕奇的呃,奈法呀,对吧,这些是是支持了的啊,那大家可能就想到了,那如果说那不支持的怎么办呢?哎,不支持的也不要着急,那还有一些呢,有一个flink这个下面的一个项目叫做巴赫对吧?啊,但是巴赫这个项目,它主要它自己不是做特别的一些工作的,它主要就是为像Spark呀,Flink呀,这些大数据框架去提供一些连接工具,提供一些支持的包。
04:55
那所以像有一些东西,比方说flu阿卡na这些的支持,大家看这些flu的think对吧?朝这里边写入的这个支持,在把her里边都是做了支持的,所以我们可以引入第三方的包给我们实现这样一个连接器啊那有的同学就想到,那如果这里边还不支持的东西,那怎么办呢?还不支支持的,那就没办法了,那就只能是自己去手写了,对吧?哎,自己去new一个think方式,然后你去建立数据库的连接,然后去写入啊,那有同学说,那你这样写入它就能保证我们我们说的那个容缩性了吗?啊那后面我们会讲到,如果你自己实现这个think function式function,还想实现它的这个容错性保证的话,状态一致性保证的话,哎,那可能你需要考虑的问题就更多啊,这个现在我们先暂时不考虑这些,所以后边呢,会给大家举一个例子,就是用这个写入到MYSQL,大家看到没有支持对吧?啊,那我们就给大家写一个到MYSQL里面写入。
05:55
Com白CQL里边自定义一个think function就这件事情,好,那首先我们先来给大家写一个这个最简单的写入吧,大家可能会想到前面列出来的那些工具啊,外部数据库工具其实都是一个,呃,就是一个数据库对吧?或者说像我们这个消息队列啊什么的这样的一些东西,那有没有最简单的啊?当然我们看到有这个h efs对吧?啊,那有没有更简单的,我直接往一个普通的文件里面写的这种方式呢?哎,当然是有的,对吧?这里边我们先来做一个呃,简单的操作啊,我们就在API test下边,因为thinkink这一部分可能相对来讲内容会多一点,所以我们新建一个呃,Sink test这样的一个包。首先我们先来做一个file s。
06:44
Main函数啊,呃,然后在里边,其实前面这个都一样,我就不详细写了吧,啊,我们直接把这个transform上面这一块直接抄过来,对吧,我还是呃,这个先去获取环境,然后并行度设成一,这无所谓啊呃,这个就是看的更明确一点,我们看的更明白一些,然后后边从文件里边读取数据,呃,然后我们先把它转换成样例类类型到这儿为止,先把它copy过来。
07:13
好,然后这里边该引入的这个依赖统统引入,注意不要忘记这里边我们要引入对应的影视转换对吧?所以这个下划线改过来,呃,这里边这个就就不用说步骤了啊,大家都知道这是干什么了,好,前面已经完成,然后接下来我们就直接think吧,对吧?现在该做的已经做完了啊,然后我们就直接think,呃,然后这里边这个可以做一个,呃,大大大家会看到,就是这里边可以我定义一个,其实这里边就不需要定再去定义这个变量了,对吧?大家想你要做think的话,是不是就相当于我们之前这个data stream,你直接一个print,我们说这是不是就是一个think呀,对吧,这本来就是一个think,那现在呢?呃,大家不要忘记,最后还有这个execute,对吧,我们把这个先写出来啊,这是fair think test。
08:04
那现在呢,其实就是基于这个data stream不要做直接think了,然后去做一个ADD think对吧?呃,一般画的是这个直接ADD think,当然如果要是你往这个一个简单的文本文件里边写的话,大家会发现有一个简单的方式可以怎么样write as txt对吧?啊,或者write as csv,大家知道这个CSV,那就是相当于就是直接保存成逗号分割的那样的文本文件嘛,对吧,这个其实都是可以去做的啊,然后里边直接就传一个传一个这个目录的名称就完事了啊,那比方说我直接copy一下之前我这个就当前S下边的这个这个目录对吧,然后我改一个名,我这个比方说就叫做out对吧。这个其实很简单的,然后接下来。大家看啊,我直接把这个输出一下,你看我现在相当于什么呢?前面做了这个转换操作之后,在这里是,哎,有同样有两个输出,那相当于是什么?那就是我前面做这个各种各样转换操作,对吧?Map数据源那边我不写了,然后他出来之后呢,哎,就相当于又分叉了,又分了两个流一样,这种感觉对不对?当然这个不是我们说的那个分流,就把数据分流是什么呢?就是每一份数据都会广播出来给到我们后续的同样的这个操作,这里边一个是做了这个print控制台打印,另外还有一个就是直接这里边输入到这个CSV文件了,对吧,直接write出去啊,所以这个也是完全可以去这么去定义的啊,大家看一下这个运行的结果啊。
09:39
大家看这边我们已经运行完了啊,大家看这里是直接在控制台做了一个输出,那我们想要那个文件有输出吗?啊,现在还看不到,那我们得重新刷新一下对吧?然后大家就看到这里是不是多了一个alt.TXT啊对吧?然后你如果打开的话,大家看到这里边它的数据是什么呢?就直接是我们原封不动的这里边的这个数据啊,为什么呢?因为这里你看我这里边用的是。
10:10
哦,大家看到这里边我是直接把这个data stream啊,就上面就是已呃,就是已经把它这个读进来的数据,直接把它写入到当前的这个CSV里边来了,对吧?啊,所以在这个写入的过程当中,它其实是给我们已经做了解析,把里边的每一个字段,我们不是说要逗号分割吗?对吧?CSV嘛,所以说它会把当前的这个sensor reading这个样例类数据里边的这个数据啊,直接解析出来,以逗号分割每个字段打印在这里,所以大家看这个就有点像我们一开始输入的那个数据了,对吧,跟输入的那个数据几乎就一样了啊,所以这是这个直接输出的这样的一个状态啊,当然有同学可能发现了这个,你你直接这么去写入的话,这里边好像他这个就是说好像他已经被弃用了,对吧,现在他已经不用这种方式了,哎,那现在推荐的方式是用什么呢。
11:10
还是直接ADD think,然后要实现一个什么呢?大家看要实现一个file system包下边的一个streaming file s啊,我这里边这就是更加一般化一些了,但是调用可能就会复杂一点,所以大家看到这个flink的发展方向,它其实是什么呢?我们觉得这个很好用对吧?但但是越好用的东西越不符合它这个分布式处理的过程当中,它的这个底层架构,哎,所以说它其实会把这些东西都都改的比较看起来好像难用一点啊,大家看看这个现在推荐的方式怎么用啊啊,那我们就是直接ADD think,然后里面要去实现的,诶这里面大家可能会想到,那我去new一个这个所谓的streaming,刚才大家看到了叫fair think嘛,对吧,我去拗一个这玩意儿不就完了吗?但是大家注意啊,在这个streaming fair think里边,大家看它就是一个rich think function式,对吧?啊,就实现了这样的一个think function,然后我们看一眼里边它的构造器啊,构造方法,大家看它是一个protected。
12:09
这样的一个类型,所以其实你在外边不能直接掉,对吧。哎,那这里面这里边你不能直接调,那我怎么样能实现这个东西呢?啊,这里边大家就会看到它有这样的两个方法,一个叫做for roll format,一个叫做for BK format,这两个方法它是直接可以返回一个streaming file system,呃呃,Streaming file s里边你看还有一个什么default role,呃,Format builder对吧,返回它的一个builder,那这个builder又又返回什么东西呢?大家看它真正返回它其实本身就是一个roll form ma builder对吧,实现了这么一个,继承了这么一个类,那么这么一个类,它其实本身这是bucket builder对吧,这这一层一层稍微有点多啊,Buck builder。然后本身它其实这里边就会帮我们实现,大家看这里边就会creates buckets对吧,就会把当前我们想要做这个文件输入输就是做这个输出时候,想要用到这个桶,这个bucket给我们创建出来,其实就是这样的一个整体,就是这样的一个过程啊啊那所以这个其实呃,本身的这个调用方式,大家也不用那么的在意它的一些细节,那主要就是说你再去实现的这个过程当中呢,里边要去怎么写呢?哎,那说了这么多,主要就是说我要写的这个方式啊。
13:40
大家看就是这里边我还得去引入对吧?啊,这个后后面我再再引这个这个类啊,这里边稍微有点问题,我们把它这个think大家来看这个啊,然后直接调它的for row format这个方法啊,那当然就是还可以B对吧,B大家知道,就是文件块,文件块了嘛,它可以支持这个呃,8K对吧,像have支持的这种这种列存储的格式都是可以的,那我们这里边就是普通的这个文件的话,那就是roll format对吧,一行一行的去做这样的一个格式化,好,然后里边要传的呢,一个pass,一个encoder对吧,那pass这里边大家知道,你既然它是一个pass类型嘛,那我我得把它引入,就是flink call FS的这个pass里边要传的当然就是上面的这个路径了。
14:27
我把这个传进来好,然后后边诶不是啊,在外面对吧,Pass后边还有一个,我把这个放到下边来吧,后边还有第二个参数。我们看一眼啊哦,这逗逗号是已经有了对吧?然后第二个参数,这里边还有一个encoder,哎,那这个我干脆把这个new pass直接也也也这个回车回下来对吧,大家看这样看的两个参数看的清楚一点啊,然后另外还有一个这个encoder,这个incoder非常简单,那其实就是个大家知道编码器其实就是做一个序列化嘛,你要写入,写入到文件里,当然要有这样一个序列化工具了啊,那比方说这里边我直接就要一个sensor reading的一个序列化工具,直接调它这个类的to string方法就完了,对吧?啊然后这里边大家看还需要传一个什么呢?那当然就是要传一个,嗯,你看到这里边这个这个里边啊是不是。
15:27
平常如果我们要去指定的时候,你最好是要给一个当前它的那个呃,插set的当前那个字符集的那个类型啊,对吧?啊给一个UTF8,或者说你这里边不不给也可以默认它就是用utf utf8对吧,那这里边呢,你还不能直接大家看还还在报错,为什么报错呢?之前我们看到它一层一层追到底下的时候,这还是一个builder对吧,一直还是一个builder,这个这个builder你你它并不是一个我们想要的那个东西啊,并不是我呃这里边你看它extends,这个8BUILDER,这个8BUILDER你到最后它已经就是直接可以序列化了,它并不会继承我们当时的那个要的那个类string,呃,就是streaming fell s那个s function啊,那这个时候你怎么样去做这样的一个,呃,返回呢,那其实是在这个RO ma builder下边,大家看到啊,它其实是有一个方法,我直接给大家看有一个方法。
16:27
叫做build。这个build就是这个builder.build方法真正的能返回一个,大家看这是一个public方法对吧,这是真正的create实际的那个think的一个方法,好,所以这里边返回了一个streaming fair think,对吧?所以我们最后定义了这么多,你要再来一个点build,这样就可以真的把它写进去了。好,那前面这个路径稍微有点问题,因为我复制过来,它默认要把这个,呃,这个杠要去做一个转移,对吧?所以这里边我们再稍微的调整一下,把这个不必要的东西删掉。
17:03
呃,然后这里我们给大家定义一个不同的文件名叫AL1,给大家看一眼啊,大家看看这个它输出的会变成一个什么样子。好,我们看一下运行的结果啊,这边当然还是正常print输出对吧?我们关键是看一下这里边到底输出了什么,大家看这个输出的情况就有点诡异对吧?啊,这里边我们本来想的是哎,他应该输出一个这个alt,一点TXT一个文件就完了,但事实上不是,它是把当前的这个当成了一个文件夹的名,而且后边呢,还给我们加了一个日期对吧,加了一个日期作为这个后缀,然后里边你看,诶,它就是分了不同的part,大家看这种方式,这里边是我们具体的这个数据对吧?而且这里边的数据大家看没有像right s csv那样直接逗号分割,把它每个字段提出来,而是什么?哎,直接我们这里边调用的是它的那个,就是simple string in codeder,我们直接把它调它那个呃,To string方法嘛啊,所以当前这个样样例类直接to string的话,原样打印对吧,就像我们print出来的那个样子一样,直接把它放在这儿了啊,所以大家看这样的一个组织形式part。
18:18
什么零零对吧,啊,这个其实这不就是我们分布式架构里边的这种方式嘛,对吧?哎,所以你看这个你要写这个HTFS啊,或者是写别的一些文件系统的时候,你会想到它的底层跟这个是一模一样的,这就统一起来了,这也是为什么呃,Flink弃用了这么看起来这么简单,这么好用的方式,而是统一成了这种看起来还有点儿绕的方式。希望大家能熟悉这种用法啊,就是后面大家会看到很多think方式,你在实现的时候都是这种风格啊,就是关于我们往文件写入的一个实现。
我来说两句