00:00
要把数据写入到外部系统,那最简单的写入当然就是直接输出到文件了。之前我们在讲到function的时候,其实已经知道怎么样去从文件里读取数据了。在当前的执行环境stream execution environment里边有一对应的方法,就是直接read file或者是read text就可以直接读取文本文件或者是一般化的文件。啊,那对于这个写入呢,其实也是完全类似的,在flink data stream API当中,我们可以看到基于data stream可以去调用。Right as csv或者as text,也就是直接把当前的数据流保存成CSV文件或者TXT文件。这种方法可以把数据直接写入到文本文件当中,但是这种方法我们看到已经要被弃用了,主要就是因为它是不支持同时写一份文件的,那所以相当于在最后一步think任务并行度就已经失效了。我们只能把最后一个think操作并且都设成一,这样才能保证所有数据串行写入到同一份文件当中。
01:20
这就相当于大大的把我们整个系统运行的效率拖慢了。另外,对于前面我们提到的如果发生故障之后要做回滚,要保存状态恢复出来,保证状态的一致性这一部分,Right as csv和as text都是无法做到,所以这些简单的方法调用都已经要被弃用了。那目前版本当中要想写入数据到文件系统应该调用什么方法呢?那就只能调用最一般化的。ADDS方法里边传入一个think function,当然了,这样的一个写入文件的s function flink底层已经帮我们实现了,相当于flink帮我们提供了一个连接到文件的连接器,这个连接器叫做。
02:09
Stream fair think。那这样一个stream file think,字面上含义就是这是一个流式的文件输出的类,那么本质上来讲它是一个rich think function,这就是我们说的它是一个think function,另外它还实现了checkpoint function checkpoint listener2个接口,这就集成了flink当中的检查点。checkpoint机制能够。保证发生故障之后可以恢复到之前的状态,而且保证状态的一致性。那stream file thinkk这个类其实为批处理和流处理都提供了统一的一个SK的接口,它是可以将分区的文件写入到flink支持的外部系统当中啊,所以它的这种实现就会比简单的right as csv right s txt这样的方式要复杂的多。它的底层原理其实是将数据要首先写入不同的桶bucket。
03:16
每个桶里的数据就可以被分割成一个个大小有限的分区文件,那这样一来的话,就实现了真正意义上的分布式文件存储。所以我们知道,对于HHT这样的经典的分布式文件系统而言,显然用这种方式就可以进行对应的输出写入了。那默认的分筒方式呢?本身是基于时间去做分筒的,比如说我们每小时就写入一个新的桶,那就是当前,每个桶里边保存所有的文件都是过去一小时之内的输出数据。在这个类里边,如果我们仔细看的话会发现啊,它本身是一个think方,那如果我们调用ADD think方法的时候,是不是直接new一个传进去就可以了呢?诶,那要看它的构造方法了,结果我们会发现当前的构造方法是protective。
04:12
哎,那当然了,不能直接调用,怎么样去创建一个当前的实例呢?哎,那我们可以看到它里边提供了不同的build。所以很明显你使用的是一个builder设计模式,也就是所谓的建造者或者说构建器这样一个模式啊,那这里边主要的builder有两类,一类叫做bunk。Format builder,另外一类叫做roll format builder,我们对应呢,应该先创建出相关的builder,然后调用里边的build方法,就可以得到对应的一个streaming s的实例了。那这个过程我们知道主要是在builder里边能够去配置各种各样的参数,去指定当前对象里边的每一部分的分别构建的过程啊,所以使用起来会更加的灵活,更加的方便。那当前我们这两种不同的builder又有什么区别呢?这主要指的是文件写入时候的编码方式有所不同。
05:20
那首先我们可以看到。Roll form builder,它就是行编码,诶,那行编码的话,就是我们最简单最常见的这种编码方式,所有数据按照一行一行的形式直接写入到文件当中,那另外还有一种呢,叫做b form builder啊,其实我们知道对于杜op生态里边的很多组件而言,所使用的。数据编码格式是party啊,就是这样的一种列存储编码格式,它其实就是属于这里的批量编码格式。我们在希望写入分布式文件系统的时候,主要就是要根据需求创建一个对应的build,然后给出相关的一些配置参数,再调用一个的方法就可以实现对应的功能了。
06:10
接下来我们就在代码里面去做一个具体的测试。首先我们有一个job class,现在我们主要是要测试写入到文件的过程,所以是to test。那整个的测试框架还是类似里边我们首先要创建。执行环境。V。还是先把全局的并行度设成一,那下面的话我们需要读取数据,接下来我们还是直接借用之前定义好的测试数据from elements,把所有用户点击的事件event读取进来。接下来我们就可以写入到文件里去了,那这里面涉及到一个问题。我们想要写入文件的话,那应该要调用ADD think方法,那这里边我们需要去实现一个think function,这个function呢,那需要构建。
07:10
Streaming fair s,它的一个对象就可以填入到这里,作为think方式输入啊,那这里面关键就是怎么样去构建一个streaming fair s。首先可以调用它的for RO format或者for bank format方法得到一个default builder啊,那接下来当然调用这个build build方法就可以创建它的实例了啊,我们现在的话,简单的数据当然是使用行编码就可以了,就for format,而且我们注意这个方法呢,前面还有类型参数,所以这里边。指定的就是当前think任务要处理的数据类型,而里边要传的参数呢,主要有两个,一个就是写入的。文件的目录名称,另外还有一个code编码器啊,那编码器的话,当然就是说我们要写入文件,肯定要做一个序列化嘛,那当前的数据类型如果比较特殊的话,至少我们得定义对应的序列化方法,把它进行编码转换,呃那呃,当前的最简单的定义方式当然就是。
08:19
当成string直接写入到文件,当然最最容易了,所以这里边我们可以把当前处理的数据类型直接定义成string类型参数,然后里边的话先给一个目录,我们现在就直接在项目根目录下边加一个output文件夹。当然了,这里需要的是一个类型。Pass类型,而不能直接传一个string类型的目录,所以我们需要new pass。把对应的字符串作为参数传入。我们这里需要引入的是flink Co FS下的pass。然后第二个参数,那就是需要去new一个对应的code了,现在既然是string类型字符串,这个就比较简单了,Simple string code直接把它写出来,里边呢可以传对应的一个字符集啊,那最常见的当然就是UTF8。
09:15
这样就把当前的。Stream think,它相关的RO builder定义好了,那最后如果说我们想要得到。真正的。Stream的对象的话,还要调用view的方法。这个得到的是一个。在中间可能可以调用各种各样的方法去指定一些配置的参数,我们可以看到有各种各样的with方法,最常见的就是with rolling policy。指定一个滚动策略,因为我们知道写入文件嘛,而且是流逝的写入文件啊,这样一个概念,其实在呃日志里边,日志文件里边是经常可以见到的啊,因为对于日志而言,可以看作就是一个流逝数据的不停的写入,如果要不停写入的话,文件的大小是会不停增大的。
10:13
那如果无休止的增大下去显然是不行的。所以我们应该定义一个策略,达到满足什么条件的时候,当前的文文件就可以结束了,就可以直接归档保存起来了,不再写入新的数据了,而开启一个新的文件,接收接下来到来的新的数据。啊,那这里面主要定义的策略呢,包含的内容就是主要就是。时间,一个就是多长时间范围内我们开一个文件,接下来超过这个时间范围内之后,我们就直接打开一个新的文件,那另外还有一个主要的策略,就是考察当前的文件大小,超过多少字节我们就开启一个新的文件。啊,那所以这里面可以举一个具体的例子,就是在当前这个策略里边要传入的呢,是一个rolling policy啊,那在flink里边给我们已经提供了相关的default。
11:09
默认的rolling policy,那么它里面同样也是使用builder build方法来去进行一个构建的,啊,也是这样一个builder设计模式。那么在它里边同样也是基于with这样的一些方法调用去配置相关参数的,那这里面最关键的with呢,我们可以看到可以指定当前的max最大的大小,按照大小来定义啊,比如说我们当前如果说。文件达到一致就进行归档保存,然后开启下一个文件的话,诶,那这里需要传什么呢?要传的是一个长整型的size,那这长整型数量的单位就是字节,所以如果是以G字节的话,传入的就是1024乘以1024乘以1024。这个其实非常简单。那另外我们看到还可以定义一些时间相关的参数,比如说with,说over in,就是隔多长时间,时间间隔就要滚动一次,那当然就是要开启一个新的文件了啊,这就相当于是我们滚动的整个周期,那这里要传的也是一个长整形的数量,这个数量是时间的毫秒数,时间间隔的毫秒数。
12:26
那所以这个一般我们的时间间隔肯定都是要达到分钟级别啊,比方说我们隔15分钟滚动一次。直接把它转换成毫秒的话,可能会有点儿麻烦,代码的可读性会降低,所以我们可以借用。Time unit。这个类里边的一些方法来进行处理,比如说我们定义minutes。我们可以two millions,把它转换成一个长整形的毫秒数,里边传入对应的数据,比方说15分钟。除了这样一个滚动的周期之外,另外我们看到还有一个位子。
13:05
可以定义一个当前不活跃的间隔时间,也就是说隔了多长时间没有数据到来,我们一直没有做任何的写入,这个时候我就认为当前文件已经结束了,可以直接归档保存,然后接下来的数据写入到下一个文件。啊,那当然了,这个时间间隔呢,一般比上边的整个滚动周期要稍微小一点,比如说我们可以隔五分钟做这样的一个判断,假如五分钟没有数据来的话,那么就直接开启一个新的文件做写入。那下边我们就可以直接把对应的streaming写进来了,但是现在类型是不匹配的,因为本身stream里边的数据类型是类型。而我们现在这个s function呢,它要接收处理的数据是string类型,所以在这中间我们还应该有一步转换操作,哎,那这个我们可以直接做一个map转换了,就把当前的每一个data。
14:08
调用to string方法做一个序列化,然后调用点ADD think传入我们指定的stream file,这样的话就可以了。最后不要忘记加上。En nv.XQ执行起来就是我们整个定义的一个过程。当然了,现在整个并行度是一,可能看不到分区写入,并行写入的这样一个效果,那所以我们可以把它调大一点,并行度调为四,可以运行一下,看看得到的结果是什么样。好,这里控制台没有任何的输出,因为我们这里没有做print,而是把它直接写到文件里了。我们看到当前目录下边多了一个output文件夹,里边呢,又有一个以当前时间作为名称的目录啊,那我们说默认的分筒策略就是按时间来分的嘛,哎,那当前我们看到下边就有四个不同的分区文件里边点开的话就会发现是我们当前的。
15:13
所有数据平均分配到了四个分区文件里面做了写入,这就是flink写入文件系统的过程。
我来说两句