00:00
除了把数据写入到文件之外,Flink当然还可以把数据写入到各种各样的外部存储系统,最常见最经典的当然还是卡夫卡。前面我们已经提到了,卡夫卡本身其实就是一个用来存储和处理流逝数据的消息队列,它是基于发布、订阅这样的一个流逝的消息系统,所以它跟flink天生就是一对,所以我们会发现在大数据的流逝处理过程当中,经常会把fli和卡夫卡作为整个架构当中不可缺少的一部分组件。那flink官方为我们提供了卡夫卡的连接器,这个连接器不光可以提供S端的支持,也就是不光可以从卡夫卡消费数据,还提供了S端的支持,也就是可以将我们flink处理出来的结果数据写入到卡夫卡对应的topic当中。这里我们会发现就是不光是仅仅支持了读写操作。
01:05
而且flink跟卡夫卡的连接器还提供了真正意义上端到端的精确一次状态一致性予以保证,所谓的AG ones状态一致性在这里得到了保证,在实际项目应用当中,这应该就是最高级别的状态一致性保证。关于这一部分内容呢,我们会在后边的章节,在第十章做更详细的讲解,那现在呢,我们主要就是要做一个输出到卡夫卡的完整测试。而且我们会发现,现在整个数据处理的闭环已经形成了,比如我们可以测试一个从卡夫卡去读取数据,然后再经过转换处理flink进行转换处理之后得到的结果再写入到卡夫卡的另外一个topic当中。啊,那。这样的一个操作相当于构建的是一个数据处理的管道,从卡夫卡进来,经过flink进行处理。
02:06
得到的结果又写入到了卡夫卡当中。那整体来看的话,好像数据在卡夫卡当中没有变化,但事实上内部已经经过转换操作,所以有时候也可以直接拿flink用来做一个简单的ETL操作,也是完全可以的。接下来我们就在代码当中做一个具体的测试,同样还是在当前包下边去新建一个Java类。当前是写入到卡夫卡think卡夫卡。前面的处理流程整体还是一样的,我们首先需要exception,然后将流式的执行环境引入。因为。不是正确性,我们还是把全局的并行度先设成一,接下来要读取数据源,那当前的数据源呢,前面我们已经实现过了,就是直接从卡夫卡里读取数据就可以了啊,这里我们直接可以做一个copy。
03:12
还是需要把对应的property定义出来。当然了,这里我们其实没有必要定义下面的这些东西,只要有前边的BOO STEM service,其实就可以把对应的数据从clicks这样一个topic里边读取出来了,所以这里是第一步。从卡夫卡中读取数据。接下来的第二步。那就是用flink来进行处理转换,因为我们发现当前从读取数据读出来之后是一个string啊,那我们其实是就是相当于把clicks这个文件里边的数据直接传进来了,那我们接下来呢,可以把它在进行提取包装转换,转换成一个我们想要处理的event类型啊,那所以在这个过程当中,这就是要用到flink的map转换操作了,所以我们这里要做一个。
04:09
用flink。进行。转换。处理。我们前面得到的是卡夫卡。接下来当然主要就是要做一个map了,我们可以直接一个map。为什么这里直接你有一个慢方式呢?因为考虑到我们需要对当前数据string数据要做一个切分,切分完了之后还要再包装成一个event,那这个至少要处理两步操作,所以我们还是写成一个卖方式看的更明显一点。那这里。里边关键点在于需要去实现一个map方法,我们先定义好,最后转换之后得到的数据类型是什么啊,我们想要的当然就应该是一个event的数据类型啊。
05:02
所以里面的map方法也是传入一个value,最后要得到一个event。里边的话,我们首先应该要针对当前的数据逗号分割得到每一个字段,那我们就用一个string类型的数组做一个保存吧,我们把它叫做当前的字段。基于value做一个split,按照逗号做一个拆分,那得到的结果呢,就可以直接new一个event了啊,那我们现在提取出来FIELDS0就是第一个字段user,当然了。为了去除前后的空格、空白字符,我们可以加一个方法。然后接下来第二个字段F1,这是URL,同样也是string类型,我们直接一下就可以了。最后一个字段第三个字段F2。本身得到了之后tri之后,我们知道它应该是一个字符串类型,但是我们想要的呢,是一个长整型的时间戳啊,所以我们还需要做一个转换。
06:08
long.value。把对应的F2TRI之后的结果传给浪点values做参数转换成长整型类型。这样的话,我们就得到了想要的结果。当然了,如果我们之后想要把得到的结果直接think到卡夫卡当中的话。结果数据。写入卡大。那写入卡夫卡最简单的方式很显然还是string类型啊,所以最好我们还是先把它再做一个to string的转换,然后再输出到输到卡夫卡里面去,所以接下来我们还是可以跟上一个to string,或者我们可以直接在这个map里边得到的就变成一个string也是可以的。啊,那这里最终得到的就应该是。
07:01
调用to string方法得到一个。String类型啊,那最终。我们可以把这个叫做。Result。最终得到的结果我们需要写入到卡夫卡当中,那就直接result.adds,这里面传入一个think function。很显然,卡夫卡的连接器已经帮我们实现了,而夫卡的连接器我们在读取从卡夫卡读取数据的时候。对应的依赖已经引入过了,所以这里面依赖也不用引入,直接用就可以了。之前我们a source的时候,传入的连接器里边提供的s function是flink卡夫卡consumer,一个消费者从卡夫卡消费数据,那现在既然是要写入卡夫卡,很显然就应该用一个。Flink卡夫卡producer啊,那对应的类型当然是string了,我们要把string类型的数据写入到卡夫卡里面去,那里边需要入什么样的参数呢?我们会发现Li producer它可以有各种各样的构造方法,我们可以传入不同的参数,最简单的形式,这里第一个我们可以传入三个参数。
08:13
两个string,第一个string表示当前的broke list,因为我们知道这就是最关键的一个参数嘛,有了这个之后我们连properties都不用去配置了,那另外呢,第二个参数就是topic ID啊,当前写入的主题名称,最后还有一个是civilization STEM,也就是类似于之前我们的编码incoder啊,相当于就是当前的数据类型,要怎么样进行编码处理之后才能进行序列化写入到卡夫卡当中,我们当前既然是string,那就又简单了啊,那接接下来我们要传入的其实非常的简单,如果还是卡杜102。的话,那么就是韩杜1029092,这是list。
09:00
然后接下来当前的。Topic,我们经过转换之后,其实得到的已经是event了,那原先是CS日志数据,那现在呢,是另外一条流,我们可以把它叫做events。最后还有一个当前的sIgMa啊,那其实就是你一个。Simple string scheme传进来就可以了,因为我们当前只是string类型字符串嘛,直接传入就可以。最后不要忘记还有一个Env.execute要执行起来,这就是我们进行从卡夫卡读取数据,经过处理转换之后再写入到卡夫卡的一个完整的数据管道构建的方过程。接下来我们可以做一个测试,那我们还是。首先看一下当前的,当前的卡夫卡是已经起着了啊,那所以我们可以直接运行这个代码,先运行起来。
10:00
然后接下来我们就可以在这里去把卡夫卡对应的producer和consumer都创建出来啊,那首先我们去创建一个。卡夫卡conso producer。那这里接下来需要传入的是杠杠list。哦,当前就是local host9092。接下来杠杠topic。Lips。这是当前我们定义的用户点击的日志事件对应的主题,然后从这里要取消费数据。然后除此之外呢,我们还应该有另外的一个,我可以用一个window。然后另外的一个窗口里边,我们打开一个同样的控制台可以。创建一个consumer。主要是经过flink处理之后,写入了另外一个topic,叫做events,然后我们想要消费看到events里边的数据的话,那应该另外创建一个消费者来考察它里面的数据,那所以接下来cons consumer。
11:17
接下来要传入的是杠杠bootstrap。Server。同样是localhost 9092杠杠topic现在叫做。Events。我们先把生产者和消费者都已经创建出来了,然后接下来我们可以输入数据看一看当前的效果了。那当前的数据的话,我们还是直接从这个CS文件里边去做一个输入吧。我们可以看到当前输入了一条Mary对于home页面的点击第一秒的这样一个日志,点击日志事件,那这里就直接输出了对应的event。
12:05
经过to string转换之后的信息啊,那同样接下来如果我们再输入一条Alice点击的。事件,那么接下来同样可以得到它转换成的,然后to string之后的内容。呃,所以所有的数据我们通过中间flink的转换处理,那么相当于就做了一个ETL。如果说我们还想做其他更加复杂的处理计算的话,当然也完全是可以的。所以在这个过程当中,Flink其实只是中间的一个处理器,我们从卡夫卡里边读取数据,经过flink处理之后,又写入到了卡卡里边,那整个我们的输入和输出全是卡夫卡,这就是弗林克跟卡夫卡连接起来的完整的测试过程。
我来说两句