00:00
我们从外部系统已经把这个数据读进来,然后在内部已经注册了表,创建了表之后,那接下来就可以调用table API或者是直接写CQ去做表的查询了啊这里边首先给大家讲一讲这个table API的这个查询,那它主要其实就是这是我们集成在语言里边,Java和skyla都有,对吧,集成在语言里边内嵌的查询API。那这里边,呃,所以就是说它它的主体啊,调用这一套API的主体就是所谓的table类,那么这个类呢,就代表了我们前面所定义的那个表,这里大家要注意一下,就是说这个table对象啊,我们处理过程当中必须是得拿到这个table对象,然后才能调,哎,点select.filter对吧,用这个链式调用的结构去写,那如果说我们之前并没有这样的一个table。只有前边,诶,我们做connect之后,大家看到这个创建是直接在这里边点create temporary table,这是只在环境里边注册了这个table,并没有获取到一个就是这个table对象对吧?啊,那这个怎么办呢?呃,就调用这个方法from。
01:13
我们定义的那张表,注册的那张表,然后把它拿出来就可以了,然后就可以用这个table了啊,所以呃,这一点是大家需要需要搞清楚啊,先把这个先呃,就是确定它在具体使用的过程当中得怎么用啊,然后接下来啊,我们这个select filter这个就不用不用多说了啊,我们之前那个示例代码里边不就是这么写的吗?这个select表示提取的字段对吧,然后filter表示我们筛选的那个where条件啊,那所以这一部分其实对大家来讲是没什么难度的啊,然后这里边我们可以。就是把把这一部分还是在这个代码里边给大家做一个转换,做一个这个前面我们这个第二部不是已经把数据读出来了吗?接下来第三步啊。
02:02
呃,这个table我就删掉了啊。第三步,接下来就是表的查询转换。好啊,那首先我们这里边比方说啊,这个定义一个sensor table对吧,就像我们这个代码里边这么写的一样,这当前的这个S数据啊,它从哪里去读呢?我们已经定义了这个env啊,所以这里边从这个表环境里边调它的from方法,之前不是已经在这个表环境里边注册过吗?啊,现在我们就用这个名称把它拿出来就可以。这是当前的这个table先获取到对吧?啊,然后接下来比方说我这里边可以哦,其实这里面我们测试的时候已经转换过了啊,我把这个就先就先注掉吧。呃,我们把这一行就注掉啊,然后下边那个测试输出的时候,我们可能就再用别的这个转换之后的这个table再去输出就完事了啊,或者我直接把这个可以删掉了啊,既然有这个转换啥查询,接下来就可以删掉了。这里边呃,首先我们可以想到就是刚才的这个做法,其实就是一个简单的提取字段嘛,做一个筛选嘛,对吧?呃,这个result table这个是不用给大家做太多的这个详细解释啊呃,Sensor基于sensor table去做一个select,这里边给大家说的是我们之前用的是里边直接传这个,呃,就是对对应的这个一个字符串对吧来表示,呃,这里边逗号分割,然后来表示我们要提取的字段,而后边我们在table API里边更常见的写法是什么呢?给大家写一下啊,是这种写法。
03:49
就是直接给一个单引号,然后后边加上当前的这个字段名来表示当前表里边的一个字段。啊,这种大家要稍微的呃,熟悉一下,适应一下啊,啊就是我可以传多个参数,每一个参数都是这样定义的一个字段啊,那这到底是什么呢?这个大家一定要注意,要引入那个上面那个影视转换啊,如果不引入这影视转换下面这个这个是报错的啊,这本身是在skyla里边给我们提供的这个一个一个类叫做symbol,就是一个一个符号啊,符号表示它主要就是前面一个单引号,然后呢,后边加上一个字段名就可以,只是我们一张表里边对应的那个字段。
04:34
啊,那table API就支持了这个symbol这个类型啊,所以大家要把那个影视转换引入就可以这么用了啊,那后边这个filter啊,如果说这里边我又想写filter,这里面怎么写呢?啊,这个大家看就可以ID对吧?啊然后要等于341啊那有同学说我这里边直接等于可以吗?直接等于后边给一个341,这当然不行,因为大家知道我现在是Java代码啊,你这个等于,那相当于是什么,相当于是负值啊,这肯定不行对吧?那所以有同学可能想到了,我可以双等对吧。
05:08
但是双等,注意双等号,这说的是Java里边的呃,就是呃等于的那个那个表达式对吧?逻辑判断表达式,所以这里边也并不是我们要做的这个字段相等的这个表达式,那对于这个symbol而言,字段相等指的是什么呢?三等号啊,这个给大家稍微说一下啊,你这个点进去的话,就会看到这是在这个就是我们影视的这个expression啊,这里边它不是要传一个这个表达式嘛,所谓的对吧,这里边重载定义好的一个方法,就三等号对吧,某一个字段它如果要是等于什么值的话,用的是这种方式。啊,这个大家就是大概了解它这个用法啊给大家。每一个都用一遍之后,大家看到那个别人写的代码,呃,不要不知所以就可以了,对吧?这个其实就是很简单的一种用法,这样的话后面我们就不需要所有的都包在那个一个字符串里边,对吧?你还得涉及到这个单引号双引号,那个还挺麻烦,这里就是我们都按照Java的这个表达式去写就可以了。
06:14
好,那那当然后面我就可以直接把这个result。Table做一个,呃,大英输出对吧?呃,那那这个我们就不去运行了,大家知道这个肯定不会有什么问题,好然后后边呃,大家会想到还可以有这个CQ查询的一个一个写入嘛,啊那那之前这个我们也都写过,这个就不用再详细给大家做了啊,大家可以参考在这里边我们写的那个CQ,写CQ的时候大家需要注意就是你只有table是不够的啊,你还得有一个注册在环境里边的那张表,对吧?啊,尽管我们知道这他这个直接这么一注册,这不就是一回事吗?但是你必须有这么一步转换操作。那有了注册之后的表,就可以直接在里边写一个CQL语句,包在一个字符串里边,执行这个table env.cql query就够了。
07:06
这就是这个这部分内容啊呃,然后这里边给大家说一个就是。别的一个一个实现,大家可能发现,呃,就是我们这里边啊sensor table,这是做了一个转换,我们这里边这个3.1啊,这是一个简单,呃,这个查询转换对吧。哎,那这里边我们的转换操作,呃,可不仅仅是这样简单的一个select where什么对吧?哎,那更多的情况下我们是要做统计,要做聚合的呀,哎,所以如果说在这个table API里边要做聚合的话,我们这里边怎么去体现呢?诶接下来我们给大家说说这个啊,那这里边写一个这个聚合操作啊。聚合,聚合转换。呃,所以这里面我可以定义一个AJ,对吧,A j result table。
08:05
同样它是一个table类型,然后。接下来我们还是基于前面那个sensor table啊,然后做什么呢?既然要做聚合,那大家想是不是,我得大家想想那个CQ应该是怎么写的啊,那得select对吧,比方说我要统计,统计个什么呢?统计这个,呃,每一个sensor,就是它的那个ID出现的次数对吧,就是它它的那个温度值统计到底,呃,出现了多少次,我们的那个数据来了多少个,很简单的一个count,那家想这个在CQL里面非常简单啊,因为有count函数嘛,就是select select一个ID,然后再count ID不就完了吗?啊,但是大家注意,后边我们得有一个。Group by对吧,得有一个分组这样的一一个语义的定义,我们得group by ID这样才可以,所以在这里边如果直接调table API的话,先做group by,那就是先做分组。
09:02
这里边传一个ID对吧,传一个这样的一个字段,那这里边它为什么报错呢?大家看,哎,这就是因为group by之后得到的是一个group的table,哎,这个类型又又转换了对吧?那么对于这个group table,这这又是一个这个Java interface啊,它里边可以调什么方法呢?可以调一个select的方法,直接得到一个table对吧?啊,当然这个select方法里边的这个表达式肯定要包含着一个聚合的一个,呃,一个一个字段对吧?啊,类似于这样的一个聚合方法啊,一个聚合函数,所以在这里边我们就来直接写去做一个select select啊。Select啊,里边我们可以给表达式,也可以给那个字符串,这里边我们直接给表达式啊,ID,然后把ID count ID,那怎么写呢?这里边我们是Java的这个,呃,调用方式嘛,所以大家看它可以直接点点后面就有各种各种方法,诶这个直接我们这个symbol啊,它有count方法对吧,直接点count,然后可以给它做重重命名,比方说我们as count,对吧?啊,这样输出的话,这就是一个ID一个count,一个ID一个count输出。
10:14
啊,这就是这个过程。啊,那那这里我们可以在这里也做一个测试输出,对吧,比方说AJ。呃,Result table,哎,这里大家想到,那我这里边怎么样输出呢?那还是to转换成流嘛,哎,我们先试一下啊,假如说这里边我要去to a stream的话,可不可以呢?诶,先先做一个尝试啊,大家可以。其实我我我可以直接告诉大家,这里边你做尝试的话,最终肯定是会报错的,我们先把这个写出来,这里边print的话分一个,呃,区分一下啊,这是a result,这里边是我们一般的那个result对吧?好,我们先运行一下试试。到啊,不出意外果然报错了,我们关键要看看他的这个报错信息到底说什么。
11:06
呃,大家看一下它的报错信息是什么呢。诶,这里边我们得到的这个是。哦,这个类型不匹配是吧?这里边这个类型不匹配,那我们得看一眼,这个最后得,哦,这里边我们这个to a stream的时候啊,这里边这个转换的时候,其实已经不是之前我们那个原封不动输出了,现在变成了两个字段对吧?哎,所以前面有一个ID是string,后边呢,只剩下了一个double,哎,那这样的话,这应该是类型匹配的,对吧,我们重新再运行一下。大家看现在更改之后还是在报错,那我们继续看它到底报什么错呢?哎,大家注意这个错啊,Table is not ANA pen only table。让你用什么呢?Useto retract stream in order to handle and ADD,呃,Handle ADD andtract messages,这是什么含义呢?
12:03
他意思就是说你现在这个转换不对,对吧,不能直接做这么转换,为什么呢?因为当前的这个表,它不是仅仅追加的这种表,啥意思呢?我们这里边转换是做了一个追加流的转换,对吧,它是说现在这张表。不是一个仅仅追加的表,所以你就不能转换成一个仅仅追加的流,对不对啊,它是这样的一个意思,哎,那为什么这里边我们这张表不是一个仅仅追加的表呢?什么叫紧紧追加的表呢?呃,之前我们其实能想到啊,直接输入那个数据的时候,大家知道我流式处理吗?那当然就是来一个数据处理一个,来一个数据处理一个,那我后边如果是一张表的话,那不就是在这张表里边来一个数据写一行,来一个数据写一行,对吧?就是这样一行一行往往里写就完了嘛。所以对于这样的数据而言,我们追加到这张表里边,这就叫颈追加,对吧,就是来一个添加上去,来一个添加上去,这就是append only。
13:08
那现在我们前面做的这个转换,大家看这个你是筛选这个3EN1,然后取这个ID temperature啊,那大家想我一开始得到的那个数据啊,就是一行一行的数据对吧?啊,然后现在做了这个转换啊,做了这个select where ID等于341的时候,那得到的这个新的表啊,Result table应该是什么样的呢?那其实还是就是如果有满足条件的一个341这样的一个数据,诶我就追加一条对吧,后面如果是三六不追加,不写347不追加,如果又来一个341在后边再追加,那其实是这样的,所以我现在得到的这张表是不是还是一个仅仅在后边追加的一张表啊。对吧,所有数据都在后面追加,哎,这就叫end on the table,那我们这里边在做转换的时候,当然你可以可以把它转换成一个流啊,对吧,因为它一条一条追加,这跟那个流的数据那个一条一条来有什么区别吗?没区别。
14:07
而现在大家看我做了一个聚合,这个聚合操作是ID和它的count数量,所以我们这里边大家想啊。我还是看之前那个Sen table啊,一条数据来了341对吧,我这里边写一下啊3EN1。然后3637后边又来了一个三色一,那按这种方式的话,它后边我们做这个聚合的时候会怎么样呢。哎,那当然是这张表聚合的结果表啊啊,这个一个ID一个count嘛,那当然是SENOR1来了一个一对吧,这是第一条数据,然后是什么呢?346来了一个一,这是追加对吧,在后面追加37又来了3471。然后341又来了之后,注意我是仅仅仅是在后面追加3412吗。
15:02
不是,我们这张表其实是一个更改的操作,对吧,其实是341这条数据要改成二。哎,所以你如果要是说这个表我们只有插入操作的话,它就是它就是颈椎加,你要是还要改之前的数据的话。那就没办法,颈椎加了对吧?啊,所以这里边就是说,呃,那那怎么办呢?这里边已经把这个解决方案给我们提出来了,你要用to retract string啊,这就是retract,就是撤回对吧,回收就是撤回流,用这个转换成撤回流的方式来做这个,呃,你经过更改的这张表,那所以这里边我们要改一下这个方式啊,Totract string。里边类型还不还是一样的,我们来看一看现在的这个输出结果是不是可以正常运行。好,大家看现在已经运行成功了啊,已经跑出结果了,我们看到这里边是什么样的结果呢?啊,首先这两条这两条流啊,这个呃,输出的结果这个没准对吧,所以我们就单独看就可以了啊result这里边一啊第一条数据来了之后,我们输出了一个一个341对吧,然后这里边AJ这里边输出的。
16:17
大家看前面它还有一个处。诶,这个处又是什么意思呢。啊,这个处其实我我们往后看啊,往后看啊,就看这个AJ啊,第一条数据3ENER11给了一个处,对吧,因为我们第一条341来了,然后第二条呃,S61这COUNT1对吧,也来了一个处,然后S71又来了一个处,31又来了一个处,注意接下来我们后来不是又来了两个341的数据吗。接下来来了第二个三一的数据的时候,它输出的是什么呢?它输出了两条,大家看啊,输出了两条信息,第一条信息是3411FALSE,然后后边是342处。
17:02
哎,所以这表示什么呢。这就为什么叫撤回流,哎,为什么叫这个回,或者叫回收流,对吧,它的这个意思就是用前面第一个这个字段,这是一个布尔类型的字段,表示当前这个数据是否是新增还是要撤回,要回收。哎,所以说就是我这里边如果来了一个这个FALSE3411,就是说之前我给过你的那个3411,现在要更新了,对吧,你不要以那个为准了,我们现在要把它回收,现在不是3411了,改成什么呢?改成3412。所以同样道理,后边如果要再来一条341数据的时候,这里就是3412又变成了false,然后变成3413,对吧,这就是这个retract这种模式,这个输出的一个结果。啊,这就是这个,呃,做这个table API的调用啊,这样的一个查询,然后这里面简单的再给大家说一下,除了这个。
18:02
就是呃,简单的这个用这个table API做这个转换,当然这里边我们也可以直接写CQ对吧?啊大家知道这里我如果要直接写CQ的话,那就我定义一个a j result CQ table啊,那同样它也是一个table,这个怎么写呢?Table env CQ query对吧?哎,里边我直接写的这个就是select ID对吧,Count ID,这个大家就很熟了啊。呃,当然这里边我可以再as一下,注意as你就不能还是叫count了,因为count是CQ里边的这个关键字嘛,定义好的方法嘛,所以这里边我比方说叫CT啊,From,这里注意啊,我们之前的那张表,哎,我们那张表是叫input table,对吧,你不能用这个sensor table,我们之前注册的那个表名叫input table,所以用input,然后group by ID。这样写的话,你后面直接去打印,也要把它to retras这个变成这个回撤流对吧,回收流,然后把它打印出来就可以了啊,这个过程都是一样的啊。
19:10
好,这部分就给大家先讲到这里。
我来说两句