00:00
那么好的,那我们整个吉他诺的这个框架,其实基本上已经讲完了,那么后面呢,我们会结合两个应用案例来弄一下啊,跑一下我们这个啊,一个是卡卡进卡夫卡出的一个简单ETL,另外一个就是我们呃使用这个诺来完成一个卡卡夫卡到这个do瑞S的一个输出,因为Doris呢,它是一个也是新兴的一个o lap啊把它演示一下。啊,那么需要强调的是,就是如果你要跑我们接下来后面这两个应用案例的话,呃,一定要使用我们编译好的包,嗯,因为当时呢,我们做这个课的时候呢,基于这个V2.1.0的版本,呃,实际上呢,还是嗯太早的一个版本。嗯,当时做这个东西的时候是卡发卡进,卡发卡出,还有Doris输出,它三个插件各有一个bug,嗯,那么如果说你用V2.1.0的包呢,你是。
01:01
100%不可能跑通我们后面这两个案例的啊,所以说一定要用我们修改编辑好的包,可以在我们的评论区里面去获取。啊,那么我们先来看第一个需求。嗯,就是对这个test csv啊,就是我们自己在卡卡里面弄一个主题,然后对这个主题里面的数据呢,消费它完了之后对这个主题里数据进行一个ETL啊,只要这个年龄18岁以上的记录,那么可以跟我们之前的那个。呃,刚开始的那个需求呢,基本上是一样的,只不过呢,这次我们换一下数据源,还有这个输出,我们都是从卡夫卡输入,然后再把这个数据呢,清洗完之后再输出到卡夫卡里去。啊,首先呢,就很简单了,我们去创建两个主题,然后接着就是去编辑应用配置,完了这个再去编辑一个这个什么,把这些内容给弄进去啊,提交任务啊就完了,那么这样的话就是我们照着操作一遍,其中呢,我们还会讲解一下,就是啊卡夫卡这个配置里面需要注意的一些点。
02:05
那么在开始之前呢,就是我们第一步是创建卡卡的主题啊,在这一步之前呢,请注意自己的这个keepper和卡夫卡一定要启动。啊,那么我们这边直接就开始。呃,做这个练习,呃,首先呢,是创建一个主题,这个地方呢,其实我们已经创建完了,只不过我再把这个命令呢,呃,象征性的敲一下。然后。啊,指定这个主题名称,我们叫test think。这个分区呢,我们只弄一个,然后副本呢,也只弄一个。因为只是演示。啊,那么这是一个创建主题啊,后面呢,我们后面还会再创建一个什么主题呢?就是test。
03:02
呃,就是这个里面是放我们就是ETL清洗完的数据的,呃,同样的这个partitions和这个。这个副本数呢,都是一。啊这样啊,然后其实这个我们已经就是我个人的话,就是提前已经创建好了,所以说这一步呢,我就先不做了。然后我们去这个。他那底下。还是习惯性的去这个这个目录里面。哦,这个地方已经。建好了,我删了之后重新弄一下。啊,我们还是重新去编辑一下这个零三的这个。配置文件。然后呢,把这个文档里的这个配置。复制粘贴一下,我们讲解一下它是怎么用的。
04:05
好,那么现在粘贴。啊,我们还是写零三这个文件。好了,粘进来了。呃,那么我们现在呢,对这里面的参数进行一个解读啊,首先呢,这个even呢,这个我们就不说了啊,它是对这个环境的一个配置,和之前的基本是一模一样的,并行度一样一样,还是稍微一。那么关键是呢,我们这次用了一个新的source和一个新的S。然后在这个S里面呢,啊,我们要说什么呢?啊,首先是我们可以看到,你可以通过这两个参数去指定一个你要消费的。呃,一个是集群,然后另外一个就是呃,你的这个消费者的ID。然后你可以通过呢,这个topics去指定你要消费的主题,实际上呢,这里是支持啊,同时消费多个主题的,主题之间呢,你可以用逗号分隔。
05:01
然后另外一个呢,是我们这个source插件,它的这个结果的一个表明啊,这里我们写的是test。然后form这个type呢,啊,实际上可以是这个啊。CV啊Jason,呃,但这个地方呢,实际上我们配了不同的这个format type之后呢,你还需要去给他配一个这个啊数据的格式。也就是说你实际上这个方法和type实际上是就在给他啊设定一个用什么样的方式来配这个啊模式的一个方,呃,一个一个一个语法啊,假如说呢,你是CSV的话,那么我们这个模式里面就必须得是一个Jason的数组啊,你像我们这个方括号,也就说明是这个是一个Jason数组,然后Jason数组里面呢,其实放的是一个个Jason对象,那么Jason对象呢,我们主要是对它的这个啊CSV里面的一个字段名,还有一个字段的类型啊做一个归应。后面呢,我们用这个format点它的这个参数,指定了它的一个分隔符啊,之所以这个地方写分号呢,就是想让大家啊知道就它能写分号啊,其实我们有这么一个参数,因为默认的其实是逗号。
06:13
然后关键是后面这两个啊,你可能没有见过,一个是allow comment,也就是说啊,我的这个啊,配置文件里面啊,我的这个CSV数据里面是否允许注释的存在啊,因为CSV其实有一些这个关于CSV的规范,我们知道CSV其实也可以写注释,就是CSV文件的话,你可以用井号啊来表示这一行被注释掉了,那么format in这个地方意思是说啊,如果说我解析的这个数据,嗯,它不合规范,然后我是忽略还是不忽略,我们这里呢,是说忽略,也就是说一旦这个你消费到这个非法的数据,不符合CSV格式的数据啊,整个程序不会崩溃,终止运行。啊,那么circle这里呢,我们用到这个直接就是select from,然后where where什么的啊,就这个啊在这个,呃,Think这个中呢,就说明说到说到哪,实际上这个think呢还可以,呃去指定这个是精准一次还是至少一次啊,我们这里没有写。
07:09
那么这个地方呢,是一个啊,我们要消费去,呃,这个输出到哪个这个啊主题里去,完了这个是我们要消这个生产到了哪个集群啊,当然了这个地方呢,我们还是用V尔来进行对A进行一个过滤,待会真正运行的时候呢,我们给这个变量呢,传一个18啊,这样的话,我们对这个数据里面只要18岁以上的值。好,那么现在呢,我们保存退出。然后我们先起一个。我们先把那个卡巴卡起来吧,我们先起一个这个。生产者,然后再起一个,这个消费者先起生产者。啊,Producer,然后。制定我们这个要生产的一个目的地。
08:02
完了之后呢,我们的话题叫test csv下划线。啊,回撤。那么这边呢,我就已经连上卡布卡了,准备生产数据啊,那么这边呢,我就开始。去消费它用一个控制台的一个consumer。然后呢,这个话题呢,还是主题,还是我们的这个不是了哈,是test think。那么这边呢,就消费上了,我们这边呢,把这个任务提交一下。然后还是通过config这个参数来指定我们的配置文件。是零,三,然后呢,传一个变量A等于18岁。
09:05
啊,我们可以看到呢,这边我们的这个任务已经成功提交了,然后在学会部U上看一眼。啊,他已经跑起来了。那么我们这边的话呢,就去生产数据,首先呢,我们说是呃,分号风格,我们先打个张三,然后呢,他是19岁。我们可以看到这边的数据已经过来了。啊,不行,我还是给他拿回来吧,然后这边的数据已经过来了,完了之后李四呢,我给他一个15岁。我们看数据没有过来,它被过滤掉了,那么我们现在要说一个就是刚才我们在这个配置文件里面呢,啊,它其实有两个选项,非常有意思哈。一个是我们说的这个,如果说这个报错。如果说这个模式有错误,格式有错误,他报不报错,是不是跳过忽略,那么另外一个呢,就是是否允许这个。
10:05
呃,备注我们看一下这是备注什么样子。我刚才说这个默认的情况下呢,就是CSV里面的备注是井号分格,如果说我这打一个井号,然后我说王五,然后给他21岁。你看我的数据已经发出去了。但是呢?呃,消费者这边数据没有过来啊,数据没有过来,但是我把这个井号去掉,你看一下。哎,我们的数据过来了,这就是说我们可以让CSV,就是说通过这个啊模式的就是相当于这个我们设定这个STEM的时候,实际上相当于呃,给他设定了一个模式匹配的问题,实际上我们在做这个ETL的时候,我们说除了写这种if else分支,实际上还有一种非常好用的,就是通过设定这种Jason。或者说是CSV这种数据模式,我通过这种模式匹配之后呢,基本上来说的话,就是可以保证只有符合我这个模式的数据才能留下来啊,通过这种模式匹配,实际上我们可以对这种Jason,我们说Jason这种呃,半结构化的数据呢,做一个非常明确的限定啊,比如通过这种模式匹配的方式呢,啊,我们甚至可以说啊,你一个Jason字符串底下,然后你有几个,你有几个对象,你有几个Jason对象啊,你这个键底下有几个Jason对象,完了之后,你这个键底象啊,必须得是一个什么样的类型,或者说日期必须得符合一个什么样的格式,这个都是可以做一个非常明确的限定的啊,这个不一定是非要写FL。
11:31
啊,而且STEM这个东西设定之后呢,基本上就是说这个整个流里面啊,只会有符合你这个模式,就能匹配到你这个模式的这个数据啊,是这样一个东西。啊,那么我们刚才说这个东西的话,就是既然说啊,只有能匹配到我们这个模式的数据,那么我们是不是。呃,有一些这个两个分号的数据,打一个两个分号数据会怎么样呢?我们可以试一下,你比如说这个地方我打两个分号。你发现什么也没有发生,因为它不符合我们的模式,就说这个数据呢,就直接被过滤掉了,我们为了防止它啊,你觉得他可能是啊,是不是挂了呀,我们再打一个21。
12:09
那数据还是可以过来,因为只有这个网五二十一啊,它是符合我们刚才设定的模式的啊,这像像这种乱码的字符串呢,我们就是可以让通过这种模式的限定呢,啊,可以让我们这个呃程序变得非常的健壮。比如说我们随便打点。啊,现在大家都没有问题,因为它匹配不上。啊,这就是我们说这个模式先进的一个好处啊,那么刚才通过这一个操作呢,我们也演示了这个从这个卡夫卡啊,进到卡夫卡出的一个塔诺的操作。那么我们的这个案例一就算是讲解完了。
我来说两句