00:00
了解了flink跟外部系统的连接方式,那接下来呢,我们就可以举一些具体的例子,看一看怎么样把flink的处理结果写入到外部系统当中,最简单的当然就是写入到文件当中去了啊,其实我们知道啊,之前我们说flink可以直接读取文件里的数据作为数据源啊,就像之前我们在做workout的时候曾经举过的例子啊,直接可以读取文件,那就是read text file,读取一个文本文件里的数据,每一行读进来作为数据挨个处理。哎,那同样对应着我们也就可以写入数据到文本文件,那关于入到文本文件呢?我们可以直接点到data streamam这个类的源码里边来,去搜索一下当前写入的方式,比方说我们搜right,就可以看到有right as text,还有right as csv啊,那就是写入到文本文件,但是我们会发现啊,这种方式简单粗暴,就是直接传一个路径进去,呃,就像我们读取这个数据文件的时候一样啊,呃,直接就可以把对应的内容写入到文本文件里面去了。但是这种方式。
01:13
已经要被弃用。主要就是因为。我们当前是大数据处理,往往我们要做的呢是并行计算,那最后的写入也应该是一个并行写入,最好的方式是应该要写入到分布式的文件系统里面去,那所以我们知道这样一个文本文件啊,只给一个路径,你这样去写的话,它是不支持分布式的写入的,诶那如果说我们想要把最后的文件在汇总到同一个文件里边去的话,这个可能就会很麻烦,所以我们在用right as text或者是right as csv的时候,往往会直接把它的并行度设置成一,也就是说哎,规定了我当前写入输出到文件的时候就写到同一个文件里面去啊,但这样的话就会拖慢我们的运行效率嘛,而且对于这种方式呢,它也没有更多的故障恢复之后一致性的保证,所以之后诶这种方式就要被弃用了,那接下来我们到底用什么呢?啊,那就是用。
02:17
通用的方法,直接data swim.add sink里边传入一个think方式,那这个sin方式难道我们要去自己写吗?当然不用,Flink当中给我们提供了这样的一个具体的实现,叫做stream file s,也就是说流逝的文件系统输出的这样一个连接器。它本身就继承自抽象类rich think方式,那就实现了我们要的这个think方式的功能啊,而且它集成了flink的检查点机制,可以保证精确一次的状态一致性语义,那关于这个检查点和精确一次性呢,我们会放在后边,放到第十章再去做讲解,那这里可以简单提一句就是。
03:00
它主要保证的就是故障恢复之后整个系统的正确性,或者说状态的一致性。对于这个stream file think呢,呃,它是为我们当前的批处理和流处理提供了一个统一的接口,那简单来说,它其实就是要把要写入的数据先写到一个一个的桶里边,Buckets里边,然后这个分桶的方式呢,默认是基于时间的,每一个小时写入到所有的数据啊,就写入到一个新的桶里面去,而每个桶里的数据呢,都可以分割成一个一个大小有限的分区文件,所以我们看这就真正意义上的实现了分布式的文件存储,所以这种方式天生就适合我们当前的并行处理,就适合写入到HTFS这样的分布式存储系统上去啊,所以在实际应用当中啊,基本上我们应该是用这种方式去写入到文件。所以接下来我们可以在代码里边来看一看啊,到底是怎么用的,那我们还是在当前的包下边去新建一个SKY的object,现在我们要测试的是think think to five。
04:13
Test。那方法先写出来啊,那同样前面的这一部分内容呢,我们还是可以照搬一下之前我们的这个设置啊,先创建执行环境,然后读取数据啊,现在我们要分布式的写入嘛,所以这个数据多一点啊,我们用之前这个reduce这一部分的数据,先把它拿过来。同样,引入之后,我们需要改成下划线,方便后边做影视转换。好,接下来呢,我们可以直接以文本形式。分布式的。写入到文件中,哎,当然了,我们现在如果要是不是写入到HDFS的话,其实就应该是分布式的很多个文件啊,分区文件,所以接下来我们可以直接来做一个尝试STEM啊,那当然了,最简单的方式是文本形式嘛,我们先做一个map转换,把它转换成string,这个也很简单,因为本身我们这个样例类它就有to string方法嘛,所以我们这里面只要调一下啊。
05:17
只要把它to string转换成string,然后去做一个at think写入就可以了,那这里边呢,我们需要去创建的就是一个streaming。Fair SK,好,那这里的泛型是stream,当前的数据类型是stream嘛,所以这是完全没有问题的,这看起来非常的简单诶,但是如果说我们点进去啊,看一看这个stream fair think的话,就会发现,首先我们看到啊,Stream fair think,它记什么字,Rich think function,所以这里边还可以有生命周期方法,可以获取运行上下文,另外呢,我们看implement,它还实现了checkpointed function接口。Checkpoint listener接口啊,那所以呢,它跟当前的checkpoint这个检查点,这就是跟容错机制有关,它有这方面的保证,然后接下来我们看它的构造方法。
06:08
我们看到stream fell s,它的构造方法是一个protected方法。所以我们在这里如果要去接创建啊,在外部要想去直接创建它的对象new的话,其实是不能成立的。所以我们应该怎么样去创建呢?啊,其实里边我们就看到啊,这里面本身它要传的是对应的builder,好,那自然就想到了,其实我们当前使用的这一个设计模式啊,应该是一个builder模式,建造者模式啊,那下面我们看看它的think builders有哪些,主要有两种。一种哎,叫做row format builder,另外一种叫做B。Format build啊,所以这对应的其实是文件底层的两种编码方式啊,那一种就是行编码roll format啊,另外一种就是批量编码啊,比如说park这样的格式啊,啊,那所以对于这两种编码呢,我们直接调用当前的这个静态方法,For row format或者for bank format就可以了。
07:17
就可以得到这样的一个format build,然后接下来有了这个建造者之后,那怎么样最后得到我们真正的string think对应的这个think function的对象呢?哎,那当然了,最后我们就是可以调用它的。Build的方法,这里有一个build,最后就会创建出真正的给我们创建出相应的对象实例,所以整体来讲,我们在这里使用的这个过程啊,就不是直接去new,而是要调用streaming。Fair s下边的建造者的方法,哎,那比方说现在我们就是行编码吧啊,For row format。然后里边这里需要传参,我们看这里需要的参数就是两个,一个叫base pass啊,那这个很简单,就是文件路径吧,我们总得指定当前的这个文件写到哪里去,注意这个并不是一个字符串啊,这是一个pass类型啊,我们要你有一个pass,然后另外呢,还有一个encoder,一个编码器,这就是我们当前数据的编码逻辑了啊,那现在我们既然已经转换成to string了嘛,转换成文本文字了啊,那显然我们直接使用一个最简单的啊,指定这个字符编码集就可以了,所以接下来我们在这里可以直接传入,这里需要去new一个pass。
08:37
那当然了,Pass里边直接传一个对应的路径,一个string就可以,比方说我们这里啊,就创建一个目录叫做output吧,直接把所有输出的文件写到这个output目录下边,然后另外还有第二个参数,那就是需要去创建一个encoder啊,这个非常简单,我们就直接用一个simple stringcoder就可以了。
09:00
里边需要有泛型,刚才就是string。里边给一个当前的字符编码及UTF8,这样的话我们就创建好了。如果我们觉得这个一长串看起来太不方便的话,我们也可以把它放在外边,这样看的会更加清楚一点。比方说我们定义一个当前的这个就叫做fair think。诶,那么我们把这个定义在这儿,注意接下来这还没完,当前我们只是得到了一个builder,接下来真正的fair think,那还需要再去调一个build的方法才能够得到,然后这里ADD think的时候,只要把fair think添加进去就可以了。好,那最后我们不要忘记env执行起来,当然了,我们不需要再打印了,因为已经有输出了。最后flink的程序的最后就是一步think操作,Print也可以,我们直接写入到文件,当然也是可以的。所以接下来我们就可以直接执行一下。
10:02
我们看一看效果怎么样。当前已经执行完毕了,控制台并没有任何的输出,因为我们没有print操作,那当前的输出在哪里呢?诶,我们可以去找上边多了一个output文件夹,那接下来我们可以看到哦,当前就多了一个。这样一个以part开头的文件,那我们可以看到当前所有的数据都以文本的形式写入到了这个文件里面去啊,而且这个目录是以当前的这个时间来作为名称的啊,那所以当前我们只有一个文件,那是为什么呢?那是因为我们的全局并行度是一啊。所以我们当前就是按照顺序一个一个读出来,然后一个一个写进去,只写入到一个并行分区里面啊,那当然就只有一个文件了,所以我们可以调大这个并行度,我们先把当前的这个output先删掉。然后调大并行度,重新做一个测试,做一个分区写入,看看效果会有什么不同。
11:04
运行好,现在已经结束了,我们看到。这里的output下边就变成了四个分区文件,这就做了一个四分区的并行写入,如果点开的话,其实我们会看到啊,当前一个Bo报B的数据,第二秒的数据和第六秒的数据,那其实我们会发现啊,这就是我之前我们说的这个轮询吧,啊,那假如说从这个开始的话,那就是123412啊啊,那按照这个顺序的话,前面这个1000它应该是在第四个分区,我们看看是不是。果然,第一秒的数据是在第四个分区一和四,它是在这里,所以这就是我们使用streaming file s去进行分布式文件写入的操作,在这里呢,我们还可以多做一点扩展,就是关于这里的stream file SK啊,我们直接调用for RO ma或者for bank format可以创建它的builder,那另外呢,在这个builder里边还可以调用一些方法。
12:03
比如说我们这里可以调用一个with,我们可以看到可以with rolling policy,这个rolling policy就是一个滚动策略,什么叫做滚动策略呢?其实这个概念在日志文件写入的时候经常会遇到啊,因为我们知道这个文件它是连续不断有内容要写进去嘛,所以诶,那这个文件如果要是一直写的话,内容会越来越多,那写到什么程度,我们就应该到此为止去开一个新的文件去写入呢?诶,那这个应该是有一个标准的,所以这就是所谓的文件写入的滚动策略啊,那一般这个里边可以设什么呢?我们可以看一下。文档里边对应代码的这种配置,比方说我们可以设一个with roll over t,哎,也就是说我们可以设置多长时间就滚动一次啊,就是我们当前收集数据啊,后面给了一个时间,我们看这是15分钟,所以说就是隔15分钟我们就开启一个新的文件去写入。
13:04
然后另外还可以配什么呢?With in activity in inter,不活跃的时间间隔是五分钟,也就是说如果有五分钟之内啊,没有新的数据写入了,这个文件一直没动,那即使当前没有到15分钟的间隔时间,我也可以直接开启一个新的文件去写入。啊,因为一段时间没有数据来的话,那可能我们当前的数据收集应该告一段落了啊,所以就可以重新打开,另外还可以设置最大的文件大小,比如说这里我们三个1024相乘,那也就是说达到文件大小达到一个G的时候,诶,那我们当前文件就结束,可以开启一个新的文件去接收数据了啊这是一些常见的滚动策略的配置,我们这里就不再详细说了,大家可以在实际的工作应用当中去进行自定义的设置。这就是关于分布式写入文件的测试。
我来说两句