00:00
后我们再来介绍一下自定义think输出,前面我们已经介绍了各种各样的输出到外部系统的方式,那总结起来不外乎就是flink或者把here已经帮我们实现了对应的连接器,连接器里边提供了think function,我们直接把对应的think function对象实例拗出来,填到I think方法的参数里面就可以了。那我们就会想到了,假如说我们当前想要连接的存储系统flink并没有提供相应的连接器,But here里也没有提供,那这个时候应该怎么办呢?那我们就知道了,那就只有自己去实现一个think function,或者我们实现一个对应的函数类think function,那实现之后接下来把它传入到think方法里边作为参数就可以了啊,那这里边我们就会发现啊,比如说常用的一些大数据组件,目前flink还是不支持直接向他们输出数据的,比如说H,我们常用的这样一个。
01:07
列存储的数据库,哎,那这个时候假如说我们最终的结果想要写入到h base该怎么办呢?那就只能自己定义think,所以这里边就是一个自定义s function的应用实例。前面我们也已经提到了,在实现think function的时候,最关键的地方其实就是要重写一个invoke方法,这个方法里边我们就可以实现怎么样把数据包装好,然后发送出去。那对应的如果说我们想要建立到外部系统的连接的话,这个连接不需要每来一个数据就去连接一次,那可以把它设定在open生命周期里边做一次连接,在close生命周期里边做一个关闭,而VO方法是每来一条数据都要调用的。这就是完整的一个定义的过程,那对于自定义function而言,这里需要注意的是,我们一般情况简单实现的话,可能只有连接到外部系统把数据写入的这个过程。
02:12
我们没有办法考虑到发生故障的时候,我们到底要怎么样从原先的状态里边去做恢复,怎么样做状态一致性的保证,这一部分flink官方为我们提供的连接器是全部考虑到的,但是如果我们自己实现的话,这一部分可能会非常非常的复杂。所以在一般情况下,我们不要自己去定义think,因为最终可能会无法保证结果的正确性,发生故障的时候没有办法正常恢复啊。那当然了,如果说当前的需求就一定要让。我们写入到一个没有官方连接器的一个外部系统去,那也就只好退而求其次,在牺牲状态一致性的前提下,我们实现具体的功能,把数据写进去就可以。
03:02
所以写入H的这个例子,我们就不在代码当中再去做详细的实现了,我们只是简单的看一看这里边的处理流程是什么样就可以了啊,那首先还是需要先去导入依赖,这里既然没有连接器,那我们怎么还要导入依赖呢?啊,那很自然,因为我们要连接到外部系统,当然对应的客户端啊,或者说读写操作的一个连接器还是需要有的,比方h base client,把客户端引入。那接下来要做的操作呢?很显然就是直接去new一个function,因为我们可能需要生命周期,在open生命周期里边连接到h base,所以看到这里边我们就用到了一个open生命周期,在里边创建了一个到h base的连接的配置con configuration,然后接下来呢,在里边设置了各种各样的连接配置参数。
04:00
这样的话可以连接到h base。后边的invoke方法里边每来一个数据都会调用这个方法,而最后我们想要把得到的数据进行写入的时候,那就要调用。本身给我们提供的那些读写操作的接口了,比如这里读写h base数据的时候,我们首先先用get table方法把当前的表先拿到,然后接下来呢,去指定rocket,指定列名啊,接下来把对应的数据去put进去执行写入操作,最后再调用一个close方法把点关闭啊,那对应的close生命周期里呢,那就是直接把当前的连接关闭就可以了。所以整个的处理流程还是非常简单的,那在实际应用场景下,我们能用官方提供的连接器,那还是要使用官方的连接器,那这样的话对于状态一致性、容错性的保证会更好一些。
我来说两句