00:00
啊,现在我们已经知道呃,动态表和持续查询整个的这个转换的过程了,然后接下来我们就具体来看一看,举一个事例给大家看一看,到底这个流和表的转换是怎么样去做的,哎,那首先啊,我们说为了处理这个带有CQ查询对吧?关系型查询的这种流,我们首先要把它转换成表,你才能去写CQ,才能去调table API嘛,那这个第一步转换是怎么转换的呢?其实从概念上来讲的话,流里边的每个数据记录第一步我们做的都是一步插入操作,音色操作对吧?大家看这张图,诶,这个数据是什么呢?这就相当于是非常经典的啊,就是某一个用户带着一个时间戳在几点几分几秒,诶,这里面一个URURL点击了哪个页面,对吧?收集到的日志,这是非常经典的一个点击信息,那接下来来的这些所有的这个日志信息数据一条一条的入进来。
01:00
开了,哎,输入进来之后,我们把它要转换成表,是一张什么表呢?诶大家看到三个字段对吧?User,然后当前的这个时间,哎,比方说click time啊c time,然后URL3个字段,然后就是每来一条数据插入一条数据,每来一条数据插入一条数据,就是直接往后面追加,这个非常简单对吧?然后接下来我们可能要经过一个查询转换,那接下来就是持续查询的过程了,持续查询的这个过程当中,比方说我们写了一句select,比方说我们统计什么呢?统计,呃,就是当前每一个URL被点击的次数,对吧?呃,就类似于我们要统计那个点击量访问量了,那这里边怎么统计选取的就是啊就是啊,这这里不是不是统计这个每一个URL被点击的次数啊,是统计大家看group by user对吧,统计每一个用户点击了多少次,有多少次点击行为啊,统计的是这个用户行为的。
02:00
次数,那所以接下来我们写了一条CQ,这个CQ非常的熟悉啊,Select user啊,User的这个name,然后count URL点击了多少次当前的这个URL as CT啊,大家知道这个count是本身CQL里边的内置的函数嘛,然后from这张表,这张表叫clicks啊,注册了之后叫做clicks啊,Group by user对吧,按照user分组,然后统计它到底点击了多少次,那我们这里边得到的这个结果应该是什么样的呢?诶大家又看到了,我们接下来就是持续查询,所以是每来一条数据之后,我们得到的这个结果表就应该做一次更新,做一次输出对吧?那这里边第一条数据,呃,第一个这个点击来了之后呢,得到了表只有一条一条数据嘛,一这这张表里边就一行数据,对吧?然后接下来来了一个Bob,来了一个点击数据,那接下来我们统计怎么样呢?哎,发现Bob这条数。
03:00
对,统计的时候在我们表里边没有对吧?所以当前就是单一的插入一条数据就好了,接下来这张表就是MARY1BOB1,所以大家看,其实在做这个具体查询,我们执行这个Co的过程当中,我需要去把这个就是之前Mary的那个数据再统计一遍吗?其实不需要,对吧,底层执行的过程当中,显然你是按照这个user分组的嘛,显然我现在改变的只有Bob这个分组,所以说我其实就是只把这个Bob相应的那一个分组的那个count再重新统计一遍,重新做一遍计算就完了,这个Mary显然是不去改的,所以从这个意义上讲,大家认为这是一个增量化的查询,增量化的改变也是对的,对吧?啊,它这里边是增量改变的,然后大家看到就是MARY1报一了,如果接下来Mary又来了一条数据怎么办呢?那那我们就知道了,接下来Mary这条数据在里边已经有了,那你不要直接在后面追加,对吧?嗯,难道是MARY1。
04:00
一再一条一嘛,这不是我们想要的结果,我们想要的结果是直接把的这个统计数量改成二,对吧?这张表里边改成二,那如果说我直接在后面追加一个MARY2吗?这个也不合适啊,你有一条MARY1,有一条MARY2,那到时候我到底用哪个呢?哎,所以说标准的关系型数据库,其实就是直接把Mary对应的那个字段改了就完了,对吧?所以说我们想要的表是什么呢?其实就是改成MARY2,直接update的这条记录,然后报一,这张表改成这个样子,后面如果再来了一个啊例子对吧,他点击了一条数据,那我们就变成了三条数据记录啊,这个表里边有三行了,MARRY2 bob1,例子一,这就是我们最后查询得到的这个动态表的结果,对吧?它是在动态改变的,而且改变的过程当中呢,有插入也有更新,所以接下来呃,我们最后一步考虑的就是怎么样把这一个得到的这张。
05:00
动态表再去转换成底层的流呢,哎,这里边就有三种模式了,哎这里边大家就就讲到这里边的三种模式就对应着我们前面的更新模式对吧?更新模式主要针对的是写入到外部系统的时候,我们说它支持哪种更新模式,而我们这里边指的是什么呢?是动态表转换成在底层啊,转换成data stream的时候到底有哪几种转换模式,所以大家看到其实是一样的吧,因为你写入到外部系统的时候,大家想是不是底层也得先把它转换成这个data,因为我们都是其实都是data stream API,底层都是data data stream API嘛,所以最后其实都是流的这个操作对吧?啊你你转换到这个转换成流之后,然后再写入外部系统,那这这个过程我们指定更新模式是外部更新,外部系统要去支持这个模式,我们才能够,呃,就是指定这个模式对吧?啊,那接下来。
06:00
还有我们是给大家说一说这个转换成流的三三种情形,其实就是我们之前说的对应的三种更新模式啊,有颈椎加流,Only流,那大家知道这里面就只有音色的操作对吧?只有插入,没有别的删除和和更改啊,那另外还有就是撤回流,撤回流我们说的就是包含两种信息的流对吧?添加ADD和撤回retra,如果说要更新的话,那就是用两条信息表示一次更新操作啊,那另外还有一个就是up流了,Up流就是只包含up消息和delete消息,如果是插入和更新操作,通通都只发一个up消息,那所以说这里边我们底层是可以有这样的三种流的转换模式的,哎,那家就看到,诶,那不对呀,我们前面讲到这个在这个,呃,直接影视转换去to,呃,这个转换成这个pad流,呃。
07:00
STEM对吧,To STEM就没有to UPS啊,没有这种转换成这种这种方式的这个呃,就是我们当前的这种流转换的模式啊,哎,那这里边需要注意啊,是我们直接去调用显示的,把它转换成data stream的时候,这个不行,为什么呢?因为我们转换成data stream之后,你接下接下来是不是要调data stream API啊,对吧,你接下来要定义它针对当前流的一些流的执行操作,或者说你直接要把它做print输出,对吧?做这个其他输出的一些行为,那对于流逝处理而言,后续的任务是没有,就是我们说的啊,没有办法直接更新的对不对,没有办法直接去做这个更新的,所以说你去直接调这个方法去to upsur streamam,这个是没有任何意义的,因为你后面不能做任何样,呃,任何的这个流失处理的转换操作了,对吧,转换操作并不认识这种模式啊,所以说。
08:00
呃,它是什么呢?它是我们底层可以转换成这样的流弹这个信息,那用在什么地方呢?哎,你后面如果要接着支持这种模式的输出,外部系统是不是就可以了,对吧?哎,这这我们说的是底层转换的过程啊,并不是说我们代码里边显示调用to什么样的data stream这个过程,所以大家要把这个要区分开啊,那最后我们再来给大家画图,重点说明一下retract stream和up up upsur streamam的区别啊,判的话就很简单了,就在后面直接呃,直接追加对吧,就是转换成一个流,一条一条数据输出不就完了吗?啊那这里面如果要是retra stream的话,大家看到啊,这里边我们得到的这张这张表,做了CQL查询之后的这张表,我们说它是一个有聚合操作的表,对吧,直接end输出是不行的啊,那怎么办呢?这里边我们就说了,来了第一条marry的时候,这是一个添加操作对吧?哎,大家。
09:00
那这里边用这个加号表示insert信息,减号表示delete信息,或者像我们直接to check string打印输出的时候,大家看到它用一个戳false,用一个布尔类型的字段来表示是添加还是删除,这都是一样的啊,那你看第一条信这个maryry数据来了之后,我们这里边得到的这个转换出来的流是什么呢?接收到的信息就是一个加号对吧?添加一个MARY1,统计出来不是MARY1吗?添加,然后如果要是得到Bob,第二条数据输入的时候呢,添加一个BOB1,对吧,大家看转换过来的流就是这样的一个一个效果,我们本来之前的这个表应该是本来是这样一个一个表,做这样的一个动态转换,对吧?你再把它转换成流,Re流,那就是这样的一个一个信息了,然后接下来第三条数据,Mary这个第二个点,第二次的点击来了之后呢,它就会带来两个信息,或者说re流里边的两条数据输出,对吧,首先是。
10:00
MARY1的一个减撤回,然后MARY2的一个插入加号啊,那同样后边呃,例子一的话,那这里边这个继续去追加,对吧?后面如果Bob又来了一条数据的话,那同样是BOB1要检撤回,BOB2增加,这就是转换成的这个retract流,那后续这时候我们说底层它可以转换成这样的流,对吧?转换成这样的流去做处理,那后续你如果要是接这个,不管是你呃转换成这个data stream也好,还是说呃去做这个外部系统的输出也好,对吧,连接到外部系统去做输也好,都要求接下来的操作,你得支持这个retra的操作,对吧?它能读的懂你当前转换之后的这个信息,这样的话连接才是正确的啊,就像我们直接to check stream,诶,因为我们data stream里边他能读得懂这个信息啊,对吧,你这不就是一个一个数据吗?我知道你这个就是表示这要撤回这个了,对吧,我就。
11:00
对照这个直接给你打印输出,你看的一目了然吧,这个是没有问题的啊,所以这是完全可以做到的一个过程,那up呢,对比来讲,就是每一个数据来了,引发的这个消息流里边的输出都只有一个了,对吧,大家看这个箭头只有一个了,就是都只是每次的这个更新和这个插入操作都是当前的一个的操作而已,所以大家看这就比较麻烦了,你看MARY1B1MARY2对吧,例子一例子二啊MARY3这样的一组数,我收过来之后,你转换成这样一个流,当然是没有问题,这就是我们说的对吧,你你你生成这样的一一组信息,这个流这是没问题的,但是这要看到底我后续的这个操作对吧,外部的系统能不能读的懂,你比方说我如果要直接你说我涂data stream的话,转换成data stream,你来这么一组date data stream的话,Data stream会怎么理解这个数据,他就认为这是文。
12:00
一样的数啊,对吧,这就相当于我在呃这个一个流里边先来了一个一来又来了一个爆一,又来了一个二,我就认为是后边一条数据一条数据来的呀,我并不会认为这个MARY2是要代替之前的MARY1的,对吧?大家想想是不是这样一个结果,哎,所以这个是跟我们要的那个结果是不一样的,我要的是要更新之前那条数据,而这里边我们直接你做不到,所以它就不支持这个操作啊,那如果说我们是ES对吧,或者说MYCQ这样的数据库的话,它本身支持这个操作,我能查得到吗?那这样的话你就可以直接更新那个数据了,这就是关于这个呃,我们这个动态表和呃持持续查询底层操作啊,它具体用一个例子给大家做一个说明。
我来说两句