00:00
我们已经了解了在flink当中怎么样去从外部系统读取数据,得到一个,这是由source算子去处理的。得到streamam之后,我们就可以调用AMAPI进行各种各样的转换计算,那这一部分就是所谓的转换算的操作了。经过转换计算之后,最终可以得到一个计算处理的结果,那终于就来到了最后一步,我们应该把计算的结果向外部系统去做一个输出写入。所以接下来这一节,也是最后一节,我们就来介绍一下flink当中的输出算子,也叫做sink算子。如果想要把数据输出写入到外部系统的话,那很显然我们关键点就是要跟外部系统做一个连接。你会发现,在fli当中,如果我们想要连接外部系统写入数据的话也并不难,因为所有的算子其实都可以通过自定义一个函数类来实现对应的处理逻辑。所以只要我们当前的外部系统有一个可以连接的客户端,那么我们就把这个客户端引入。在做完最后一步转换操作的时候,我们把这个客户端添加进来,然后调用它里边对应的写入数据的方法把数据写入就可以。
01:29
当然了,这里面可能涉及到一个创建连接的过程,我们不需要每来一条数据就在转换完成之后去建立连接,然后写入外部系统,那如果说我们希望一开始只建立一次连接,后面就持续用这个连接写入外部系统的话,诶,那也可以考虑使用之前我们介绍过的。函数类,诶,那就是在前面我们提到的udf里边最后一部分函数类,有一个非常常见的做法,就是我们可以在open生命周期里边做一些初始化工作,比如说直接建立一个到MYSQL的连接。
02:06
然后接下来每一个数据经过处理的时候,我们这里边自定义了一个flat map function,那么每一条数据都会调用这里的flat map方法,我们得到的最终转换的结果就可以直接调用数据库的连接写入到MYSQL里面去。啊,然后接下来如果所有的数据都处理完了,最后当前的作业要关闭了,那么我们就可以调用close方法,在这里就可以关闭到MYSQL数据库的连接,这个过程其实完全没有问题。而且非常的方便,我们在任何一个自定义的方式里边都能实现这样的一些操作。但是呢,在flink里边。想要做的是实时的分布式流处理,如果要做到实时,那会有另外一个问题,就是假如现在发生故障挂了,那怎么办呢?
03:01
如果说我们任何恢复机制都没有,也不保存当前的任何处理的状态的话,那很显然,如果我们处理连续不断到来的流失数据中间挂了的话。那就只能从头开始重新处理了,如果从头开始处理,那这个延迟肯定就会非常非常大了,所以这并不是我们希望看到,那就应该能够提供一个随时存盘的机制,随时去保存,然后一旦出现故障呢,就把当前已经保存好的,上一个正常的那个状态。直接恢复出来,回滚到之前正确的一个状态。哎,那如果要是我们希望做到这一步的话,那很明显我们应该对当前每一个任务处理的过程。处理到什么程度了?里边的任何一个状态都应该有所记录,有所保存。如果我们在每一个瑞士方式里边去做这种自定义的读写外部系统的话,那发生故障就很难能够直接回退到之前的状态。
04:11
那在flink内部呢?它其实给我们提供了一个叫做一致性检查点,也就是checkpoint这样一种机制,来保证内部的状态一致性,发生故障之后可以回退到之前正确的状态。但是对于像外部系统的写入怎么样去保证呢?所以flink专门。Data最美API当中。定义了这么一套写入,向外部系统写入数据的方法。为了将数据向外部系统的写入也纳入到内部状态一致性的管理当中,Flink专门把单独向外部系统写入这个操作抽象提炼成了最后的这一步think任务啊,所以他的data stream API专门提供了一个向外部系统写入数据的方法,就叫做addd think。所以整体来讲的话,这就跟前面的S任务可以说是完全对应,而且是对称的。前边我们可以基于env去ADD source,那最后呢,我们就可以再基于当前的dataream去ADD。
05:22
啊,那同样类似的,之前的source里边要传入的是一个S方式,而这里边我们要传入的就是一个think方式。Think这个含义呢,它本身在英文里边有下沉的意思,所以有时候我们可以想到啊,就是当前的这个数据流,如果把它想象成一个水流的话,那到这儿的话,相当于就要下沉,就要落入到最终的存储空间里面去了,那所以有一些资料里边可能会相对于S,如果翻译成数据源的话,源头的话,就把think翻译成数据汇,最后汇合汇入的一个地方,我们这里的话,不管怎么样吧。
06:03
它其实代表的都是我们最终把数据结果收集起来输出到外部系统这样一个含义,所以我们这里就统一直接把这个think算子叫做输出了,那接下来我们就是要看看到底怎么样在flink当中输出到不同的外部系统。需要说明的是,之前我们在代码里边一直在调用的print方法,最后一步打印到控制台,它本质上其实也是一个输出,因为我们之前看到了完整的一个。Flink代码的定义应该包含这四部分,当前的执行环境,数据源转换,然后输出。诶,那我们当前可能想到中间转换有可能可以缺省,但是这个数据源和输出一定是得有的呀。那之前的输出到底在哪里呢?就在print,如果我们点到源码里边就会发现它本质上就是创建了一个print think方式。
07:02
然后做了一个ADDS这样的一个操作,把当前的这一个think function添加进来了,作为参数调用了ADD think方法。那那在这个过程当中,这个print function到底又是个什么东西呢?它本质上。其实扩展自己就是继承字rich think方式。顾名思义,Rich think function就是think function对应的函数类啊,那它本质上跟rich s function是类似的,它是继承自abstract rich function,它是一种函数类,然后另外呢,又实现了think function接口,所以本质上它还是一个think function,我们现在要想实现的这个think function里边关键的抽象方法就是一个。Invoke,当然我们最终要实现的就是这样一个invoke方法,它里边传入的参数呢,一个是value,当前think任务接收到的数据,最终我们想要写入到外部系统的数据,另外还应该有一个当前的上下文,所以他能做的事情还是会更多一点的。
08:11
那当然了,对于think function而言,很显然函数类嘛,所以它又可以调用对应的生命周期方法,Open close是可以有的,另外还可以获取当前的运行时上下文,运行时环境,可以去定义状态,做各种各样更多的操作。那正常情况下,我们在代码当中如果想去手动的添加一个S任务的话,那就是要调用,这里我们可以看到。调用data stream对应的at s方法里边传入一个自己实现的think就可以了。那如果要是自己手动实现,要去向外部系统去进行连接,然后还得考虑如果发生故障的话,怎么样回滚,怎么样保存当前的一些状态,这个很明显就会比较复杂。
09:02
为了广大的程序员考虑,Flink当然就要官方给我们提供很多常用的外部系统的连接器,就像S那边一样,SK这一端也要提供对应的连接工具。所以我们可以看一下当前flink对于哪些外部系统提供了连接。这是当前版本的官网上支持的第三方系统连接器啊,那这里边目前支持的第三方系统连接器主要就有这些,这里边整体来看的话,可能有两大类啊,那一大类就是像卡夫卡这样的流式系统。消息队列,那这里边后面括弧SS就是表示。当前卡夫卡的连接器提供了s function,可以用来读取卡夫卡的数据,也提供了think function,可以用来向卡夫卡去写入数据啊。那另外一大类呢,可能主要的就是think了,就是很多系统,比如说像elastic search,我们非常著名的ES,那或者说像文件系统file system,那主要就是一个写入或者GDPC啊,要往这个myq Oracle之类的数据库里边做写入是可以的,如果要读取数据的话,那我们知道这里边明显存放的不是流式数据,所以是没有办法作为数据源去读取的,而只能作为最终的结果的写入目的地。
10:29
由think任务去向外部入,那当然了,有一些比较特殊的,像这个推的stream API,它明显就是一个的接口,那只能作为数据源来调用,这个比较少见。除了这些官方给出连接器的这些外部系统之外,还有另外一个项目,阿帕奇的巴here项目,它主要就是用来为Spark或者flink提供一些扩展的支持的。那么在这个项目当中呢,也已经提供了一些fli跟第三方系统进行连接的连接器工具,那比如说这边有flu red、阿卡na等等等等。
11:08
所以如果说我们在flink官方已经提供了相关的连接器,那我们这里边ADD。或者ADD think的时候,就可以直接把对应连接器给我们提供的s function think function传入就可以了,那如果说官方没有提供,把黑尔提供的话也是一样的。如果说都没有提供的话,那就没有什么好说的,只好自己手动去自定义实现一个think连接器了啊,里边关键点就是实现对应的think方式。这就是关于flink连接到外部系统的一些基本知识。
我来说两句