00:00
之前我们所实现的写入数据到外部系统的这个操作,其实都是基于弗link给我们提供的官方连接器的啊,那即使是像瑞啊官方没有给我们提供连接器,那把here这样的项目也给我们提供了对应的连接工具,所以说如果有现成的连接器的话,我们直接把它引入,就帮我们实现了对应的C方式,那假如说我们现在这个存储设备比较特殊,官方并没有给我们提供连接器,那这个时候怎么办呢?哎,当然我们知道啊,既然它有通用的写入到外部存储系统的接口啊,S function,那么我们只要实现这个接口,其实就可以实现一个自定义的输出了,跟我们之前讲到的这个source读取数据源一样啊,S那边我们可以直接at s,然后自定义实现一个S方式,那同样这里我们也是可以直接ADD think,然后去实现一个自定义的think方式就可以了。啊,当然了,如果说我们想要扩展功能的话,也可以实现think function接口所对应的函数类,那就是reach think function,它的特点就是还增加了open close生命周期方法,另外还有可以获取运行时上下文里面的很多信息。啊,那其实在实际应用的过程当中,可能这个rich s function会使用的更多,因为我们知道在这里可以直接在open生命周期里边去创建一个到外部存储设备的连接,然后接下来呢?啊,那就是我们必须要实现一个关键的invoke抽象方法,这个抽象方法其实就是来一个数据就会调用一次,那我们在这里边呢,就实现对应的逻辑了,就把这个数据包装包装,然后写入外部系统就可以了。
01:39
所以这个过程呢,其实跟我们前面所说的负函数类的典型应用其实就是完全符合的啊,那接下来呢,我们就以h base作为例子,因为当前的版本flink并没有提供h base的官方连接器,哎,那所以如果我们想要把数据写入到h base当中呢,就只能自己去实现一个方式接口了啊那呃,其实我们知道啊,对于弗link而言,它还是在快速的发展过程当中,很多常用的工具,其实慢慢的flink都会扩展出对应的官方连接器,那如果有官方的,当然我们直接用就好了,不要自己去重复造轮子啊,而且自己去实现的话,有很多东西可能我们考虑不到,呃,就像前面我们所提到的,怎么样去保证它这个发生故障之后能够正常的恢复呢?怎样保证它恢复之后的状态一致性呢?呃,这些官方都会给我们考虑到,所以我们最好还是去使用官方给我们实现的东西啊啊,所以这里我们就不去完整的写这段代码了。我们就。
02:40
来大概的看一下这个流程是什么样就可以了,其实整体来讲也非常简单啊,那首先我们如果要写入h base的话,肯定要引入相关的HP客户端的依赖啊,H base client把这个要引入,然后接下来呢,我们在代码当中的逻辑,诶,那首先还是先去创建执行环境,然后读取数据,读取之后直接点ADD think里边传入的就是自定义实现的一个reach think方式。
03:08
然后我们这里的最佳实践当然就是在open生命周期里边要去创建一个到h base数据库的连接,我们看啊,前面定义这个对应的属性啊,我们这里边有HP con,另外还有一个connection,然后在这个open生命周期里边,主要就是创建了这样的一个connection。然后呢啊,Inro方法是最重要的处理逻辑,我们看到这里边它有两个参数,一个value string类型的value,这其实就是我们当前的每一条数据嘛,啊,因为我们是from elements,直接读了两个字符串进来,然后接下来另外还有一个就是contact,是当前的上下文,所以接下来我们要做的操作其实也很简单,首先获取到当前表的信息,然后接下来呢,指定rookie,然后指定当前的列名,然后把所有的数据都放进去,At column,然后接下来执行一个put操作,把它写入,那完成之后呢,我们可以把这个table close掉,最后在close生命周期里边,当然就可以关闭整个到数据库的连接了啊,所以这个过程其实还是非常简单的,感兴趣的同学可以自己去做一个测试,我们这里边只是了解这个过程就可以了,相信在之后的版本当中啊,像HP这样的大数据组件,Flink肯定会直接给我们提供相应的。
04:28
官方连接器的,我们其实没有必要自己去实现这一部分,所以这就是关于flink输出数据写入到外部系统的过程。
我来说两句