00:00
我们已经了解了怎么样将flink处理的结果数据写入到文件系统当中,那接下来呢,我们还要去展开去讲解一下怎么样写入到其他的大数据组件中。首先最为经典的当然就是flink的好伙伴,那就是我们说刘氏处理里边的双子星之一的卡夫卡。我们知道它作为一个分布式的消息队列,本身卡夫卡处理的也是流失数据,所以说跟flink的连接可以说是自然而然,非常默契的。前面我们提到从卡夫卡去读取数据源的时候,哎,那就是可以直接引入弗林克官方给我们提供的卡夫卡连接器啊,那在这里呢,是有一个link卡夫卡consumer,它帮我们实现了。Reach parallel source function啊,那么放在这里我们就可以直接连接到卡夫卡去读取数据源了,接下来呢,Link进行转换处理,那最后得到的结果能不能再写入到卡夫卡当中呢?当然是没有问题的。之前我们也看到了,对于。
01:05
目前弗link支持的外部系统卡夫卡S和think两端都是完全支持的,而且卡夫卡跟弗link之间的连接还不仅仅是支持了读写操作啊,它还支持了所谓的精确一次的状态一致性保证,那就是exactly one,这是最高级别的状态一致性保证啊。这一部分呢,我们会放在第十章讲到容错机制状态一致性的时候再展开去讲解啊,反正我们现在有一个认识,就是弗林克跟卡夫卡他们特别的搭,特别的配,他们就适合连接在一起去构建一套流失处理系统啊。所以接下来呢,我们自然是要说一说怎么样输出数据到卡夫卡里面去。那如果说想要写入数据到卡夫卡的话,我们自然知道了,肯定还是需要引入卡夫卡相关的依赖的啊,那卡夫卡的依赖呢,就是flink卡夫卡的连接器那个connector,那之前我们在读取数据源的时候已经引入过了,现在就不需要再重复引入了,那然后接下来呢,我们就可以去写一段代码引入一个,这里注意之前我们是。
02:15
那现在要引入的当然就是一个producer,因为我们是要写入到卡夫卡当中,就应该是创建一个生产者把数据写入到卡夫卡当中去啊,那所以接下来的这个操作可以说跟之前的SS那一部分是完全对应的,基本上也是完全一致的啊,所以接下来我们可以在代码里边去做一个测试。那就是新创建一个object,现在是think to卡夫卡test。同样的,我们这里还是没方法先写出来,首先引入流式的执行环境,Stream execution environment,我们把它引进来,Get。啊,当前这个我们还是叫做烟V,上面记得把这个替换成下划线啊,同样还是全局的并行度,我们先设置成一,然后接下来呢,我们干脆就读取文件数据吧,之前我们不是有一个clicks.TXT这个文本文件吗?我们直接读取这个文本文件里面的数据,然后直接把它写入到卡夫卡里面去啊,那这样的话,文本文件读出来都是string,这个转换也比较简单啊,这个就省去了很多中间步骤,我们可以。
03:25
读取文件数据。这里就是env,我们知道read file。直接传入当前的路径,那是input。CS.txt。得到的这个我们可以直接叫做stream,然后接下来stream,那就是要将数据写入到。啊,那这里我们stream直接调用一个,哎,那是通用的ADD think方法里边,那就要new一个对应的think function,那这里的s function就是连接器给我们提供的flink卡夫卡,注意现在是producer。
04:08
Producer stream当前所要写入的数据是字符串类型,然后我们得看一下这个producer里边啊,需要哪些对应的参数,我们可以看一下。首先我们可以看到当前的flink卡夫卡producer,它本身实现的是一个to face commit s function,好,那我们看到它首先是一个s function啊,这是实现了s function对应这个接口的。另外呢,它还是一个。两阶段提交的think function。诶,这就是我们所说的这个to PC啊,两阶段提交,它是对状态一致性达到精确一次保证的一个有效的方法,诶,所以啊,我们可以看到啊,弗尼克跟卡夫卡之间的连接,它通过这种方式就实现了真正意义上啊,最高等级最高级别的状态一致性保证啊,这个我们到后边还会去展开去讲解,那这里的弗Li卡卡producer呢,我们得找一下它的构造方法,里边到底传什么参数,我们可以看到啊呃,这里做了很多种情况的重载,最为简单的最上面我们看到就是传一个。
05:15
Broke list啊,当前一个STEM类型,当前的这个集群的broke list,然后一个string类型的topic ID,我们要写入的主题,另外还有就是cization STEM定义了我们做序列化时候啊数据的基本结构,所以接下来我们在代码当中,如果说想要去传递的话,诶,那可以前面啊什么样的配置都不做,直接在这里。写啊,那如果还是哈杜102的话啊,那我们依然是哈杜1029092,然后第二个参数是当前的topic啊,那我们写入的话,这个我们就叫做clicks吧。那最后还有一个对应的STEM码的配置啊,那我们直接因为是字符串类型嘛,这个直接写就可以了啊,没有对应的这个数据类型解析啊,所以我们直接使用simple STEM STEM放在这里就可以了。
06:10
那最后这个已经做完写入了,最后不要忘记env execute执行起来,这就是我们完整的一个流程,所以可以说是非常的简单啊。啊,那接下来我们可以去测试一下,这个测试呢,当然是要求我们在哈杜102那边需要把卡夫卡要起起来啊,之前我们这里是已经起着了,而且我们这里边还有一个已经创建好的producer,当然了,现在这个没什么用啊,我们又不从卡夫卡去读取数据,比方说我们现在不想停这个producer的话啊,我们可以另外去新建一个terminal的页面。放大一点,然后接下来我们可以去在这里创建一个consumer,因为接下来我们的测试操作是在flink那边去产生数据,然后利用生产者把它写入到卡夫卡里边来,那我们要想看到数据的话,还应该有一个消费者啊,所以这里边我们主要是为了看到测试数据啊,啊,那这里边我们就是直接调用并下边的。
07:08
卡夫卡。Console consumer,然后杠杠。Bootstrap server。哈杜OP102,当然这里写local也可以9092,然后接下来杠杠topic。现在我们定义的主题是clicks,好,直接把它创建出来啊,当然了,其实我们不先创建这个消费者啊,直接运行这里程序也是可以的,因为那边卡夫卡都已经起了嘛,我们这里直接去写数据。看一下这边的运行情况。我们看到这边已经运行完毕哦,那这边所有的数据就都已经写进来了,所以这就完成了写入到卡夫卡的一个测试。当然了,如果说我们还想进一步测试的话,我们还可以啊,做一个比较好玩的事情,就是flink去读取卡夫卡里边的数据源,然后呢,经过flink的处理转换之后,得到的结果再写入到卡夫卡里边来。
08:08
诶,这有什么意义呢?诶,那其实我们想到啊,这里我们可能不做任何的转换,但事实上我们完全可以在弗link里边经过处理转换,经过数据清洗提取,得到自己想要数据的时候,然后再把它写入到卡卡里边的另外一个topic里边主题里边来,所以这其实就起到了一个。EL的效果啊,就是我们所说的啊,对数据进行提取清洗啊,这样的一系列操作,转换操作啊,这里我们也可以简单的去测试一下,那这个测试的话,我们的代码就需要去进行一个更改了啊,这里我们可以直接去抄之前我们这里的这个,这里我们用了proper,直接把上面这一部分全部copy过来。上面的这个stream我们就可以住掉了。当然了,这里我们可以发现,读取卡夫卡数据的时候,我们读取的主题是clicks,而后边写入的主题呢,也是同一个clicks,这个显然是不对的啊,所以我们当前应该把这两个主题要区分开啊,那这里我们还可以做更多的操作,比如说啊,当前我们从卡夫卡这里读进来的clicks,可能是之前文本文件里边类似于这样的一些数据逗号分割的用户点击的日志数据,而现在呢,我们可能希望对它进行一个包装转换。
09:29
比如说哎,那我们现在就可以直接给它做一个map转换,每来一条数据之后,我们可以将它包装成一个event事件啊,那当然了,如果想要做这个操作的话,那还应该先要把当前的数据做一个拆分,我们知道所有的数据都是以逗号进行分割的,所以接下来呢,我们就可以按照逗号去进行分词,把得到的每一个字段提取出来,包装成一个event,然后返回就可以了啊,所以接下来我们应该先要把当前的字段都切出来,我们可以把它叫做FS。
10:07
Data调用SP方法去做一个切分,分割符是逗号,然后接下来呢,呃,最后返回的就是包装好的一个event对象,这里边首先第一个user字段,那应该选择fields里的第一个字段,那当然就是FIELD0啊,那有空格的话,我们应该做一个trip,它本身就是一个string,那所以我们不需要做任何的转换啊,直接放在这里就可以了。然后同样第二个字段URL,那是F1。我们做一个tri,最后一个是时间戳F2,这是一个长整型的字段,所以我们tri之后还需要对它做一个too long的转换啊,那这样的话,得到的数据类型就变成了event类型啊,那当然了,如果说我们这样去定义的话,那得到的stream就不再是一个data stream stream了,而是一个data stream event啊,那假如说我们不想让后面这个解析过于麻烦,我们还是想把它转回stream做一个打印的话,那就后边再去map。
11:10
调一个to string方法就可以了啊,甚至我们还可以直接不要那么麻烦啊,直接在这里调一个string方法,返回一个字符串也是一样的啊,那所以这就是我们整个处理的过程,现在的话,我们得到的这个就不应该叫做clicks了。得到这一个结果呢,那就是一个一个包装好的event,所以我们可以把它叫做events,对应的主题我们也叫做events啊,所以接下来我们做测试的时候,哎,我们当前创建一个主题叫做clicks,那么创建一个producer,在这里产生数据。弗link从卡夫卡clicks这个主题里边读取数据,然后进行map转换处理,最终把得到的结果数据写入到ES这个。主题里边啊,那所以这个过程我们看到这就是标准的一个ETL的过程。
12:02
所以整个这个构建一个数据管道啊,进行这样的一个处理还是非常有意义的,接下来我们可以来做一个测试,卡夫卡那边已经启动了,所以接下来呢,我们可以直接把当前的flink代码先运行起来。运行起来之后,这里不会有任何的输出,因为我们当前是一个是一个无限流嘛,我们当前还没有数据进来,所以应该是不停的等待啊,等来了数据之后来一个转换一个输出一个,然后接下来呢,我们自然就可以到。好,102,这里来把当前的producer和consumer都提起来啊,那我们首先当前的producer跟之前还是一样啊,Topic就是clicks,我们可以直接把它先启动,然后当前的consumer呢?那我们需要更改一下当前的topic events。把它创建出来。然后接下来我们就可以在这里边输入一些数据了,比如说我们现在要输入的是某个用户marry的一个点击数据啊,Marry。
13:01
点击了后。这是在第一秒的时候点击的。然后我们可以过来看一下,这里的输出就不是原样输出了,而是已经包装好的event啊,所以我们看到这就是一个数据ETL的过程啊,那同样这里如果我们输入一个Bob的一个点击事件,他点击了cart页面,然后这是第二秒的输入,那同样我们可以看到这里提取出来一个event,这是Bob的一次点击事件。这就是读取卡夫卡数据又写入到卡夫卡当中的完整的测试过程。这是一个数据管道。
我来说两句