00:00
呃,另外还给大家多介绍一个,就是KBY,这里边大家看到这里边我们给零给二的时候,这里边它的这个类型呢,就是T和Java temple对吧,那这个Java temple大家就会觉得这个就这个就有点儿麻烦啊,我们如果写泛型的时候还得去仔细的看,对吧,是Java里边的temple把它引入才可以啊,还不能写错了,那有没有更简单的方式呢?比方说我就知道,明明知道这就是一个string啊。他的那个KK之后的那个k stream泛型能直接定义成string吗?哎,如果我们这里边啊,大家看这个data stream点这个啊,我如果定义一个,呃,新的一个stream对吧,他如果要是一个kid stream。这是类型是s reading对吧,里边我如果直接给string的话可以吧。给string就报错了,大家看是不是这就有问题了,它不能直接把我们这里边的啊,啊,当然当然这里边是做了sum了,对吧,我先把这个sum删掉,还是报错对吧?这报错是什么呢?它不能把sensor reading tale这样的一个kid stream转换,直接转换成sensor reading stream,对吧?那所以这里边是tempumle就得是tempumble,你不能直接用stream。
01:25
那这个怎么办呢?大家想这个就很不方便啊,这里边我可以用别的方式来对大家做操作,比如说什么呢?大家能想到我是不是可以直接给定字段啊,比方说这里面给定字段,呃,那个叫什么?应该直接给那个ID是吧?对,直接给ID也是可以的,但是很悲剧的是大家一点就发现ID这里边返回的是不是也是t Java参Bo啊,诶,那有没有别的方式了呢?哎,这里边其实刚才已经有同学想到了啊,可以下划线,下划线点ID,这样传进来的时候,大家发现这个ID是不是已经明确它的数据类型就是string啊,对吧?这里边传进来的这个类型其实已经把它转换成明确就是了,所以这个时候你再给string就没问题了。
02:20
啊,大家可以看到这个时候的KY,它的返回是什么。就是TK对吧,就不是Java temp哎,所以这这个里边就是呃scalela啊,强类型语言,所以在这各种转换操作的时候,大家一定要对这个数据类型,呃小心,然后很容易这个这个还是很容易一开始写代码的时候去去遇到一些坑的对吧。Some,呃,当然了,我们之前写的是这个SUM2,那我这里边去写some一个temperature可不可不可以按照字段也是可以的对吧,但是这个返回的啊,当然这个sum的话,这个就没关系了,Sum最后我是不是一定返回的都是data stream啊啊这个就跟我们那个就是K就没关系了啊,所以大家可以看到就是在这个过程当中其实有不同的写法啊,比方说我这里k by ID,然后最后在啊temperature对吧?啊,那当然这里边我就不应该data stream print了,这个print是不是还是原始的呀?啊对吧,所以这里边我就直接。
03:24
写成这样就可以了。好,这是我们前面讲到的这个基本的聚合算子,然后接下来再给大家讲一个更加普遍的聚合算子,就是所谓的reduce reduce其实这个大家也很熟悉了,对吧?啊,这个就非常经典的一个聚合操作,它可以看成是一个更加一般的一个一个聚合方法了,对吧?啊,那它的这个操作是什么呢?其实就是大家知道,就是相当于要把当前的元素跟之前聚合起来的那个结果,是不是要再做一个叠加呀。
04:03
呃,然后产生一个新的值对不对,然后每来一个元素就更新一次,每来一个元素就更新一次啊,所以啊,这个其实最后返就是我们输出的那个结果里边,其实就是当前聚合出来的那个最终结果啊,这是呃,我们这样的一个状态啊,那要用这个的话,我把这个some助掉,大家会想到reduce应该基于什么去做操作呢?哎,讲到这里,这就给大家多说一句啊,大家看这里是data stream这个类对吧?在这里边可以替,那这里边我可以直接去some吗?很尴尬,不可以对吧?啊,直接去做some是不可以的,那可以直接去reduce吗?很尴尬,只有这个注释里边有reduce,也没有reduce的方法对吧?所以如果要想做这个聚合操作的话,怎么办呢?对,还是必须得先KBYKBY之后,大家看到在这个k stream里边,那就是要some有some,对吧?然后要reduce有reduce,对不对?哎,所以这个就是大家比较熟悉的这种操作了啊,所以就是在这个flink flink里边,它一个比较显著的特点就是它很多就是几乎是所有的聚合操作都是针对这个k stream来做的,呃,大家总是要把这个data stream先做一个key back啊,大家能想到这是为什么吗?
05:37
他为什么默认这个架构上定义的时候,This three不能直接按照一个字段直接去做聚合?大家就觉得诶这个我要是直接统计一个总个数,还还更麻烦了,对吧?呃,为为什么他不能直接就做做聚合呢。啊,其实这个想法就是他希望更大可能的要利用我们分布式架构的一个并行,并行处理的能力,对吧?我们K外之后,大家想我基于这个哈希去做分区,那是不是就可以有不同的这个子任务对不对,那就可以并发去执行了,你如果不做这个KPI,不做分区的话,那是不是相当于所有东西都得在一起去执行了啊,当然你如果的需求就是说所有数据来了之后,就在一起去做做统计,那就没办法了,对不对,那就只能是在一起去统计了,所以其实这个这个任务相当于就没有办法去并发执行啊,就没有把我们的这个并行度真正的利用起来啊,所以这个过程其实还是,呃,就是大家会想到它是有一些架构上的需求在这里的,那有些同学可能就想到,那我如果要就是在这个data stream上直接想some,那怎么办啊。
06:54
呃,其实也很简单,你基于他再做一个,再做一个KY吧,对吧,那那大家想,如果说我这个里边就是我来的东西,就是很简单的,就是一个数据,就是一个数,然后我就要统计这些数来了几个,那这个这个需求怎么去实现呢?
07:13
啊,那其实也是按我们之前word count的做法对不对,我就把每一个来的这个元素前边是不是在map的时候多加一个统一的一个相当于占位的一个字符啊,比方说我就是一个某一个字符,或者说我就统一就给一个零数字,然后我之后就按照这个去K,那是不是所有的数据都是一个都在一个分区里啊啊,所以最后然后我们再做一个sum,该该sum sum对吧,该这个count,你去去做这个计数,这就可以了啊,所以这个弗林可是这样去做操作的啊好,那接下来我们要做的是这个reduce操作,大家回忆一下这个呃,Reduce的写法吧,比方说这里边我们给一个需求啊,我们想做什么操作呢?呃。
08:02
呃,比方说reduce就比这个sum就就会就会灵活很多了,比方说我们reduce想要输出什么呢?输出,呃,就是。当前啊,就是这个,我我们肯定K之后是当前那个某一个传感器,对不对,我们就是当前传感器。最新的温度要加十,比方说或者加一也可以,对不对,然后呢,我还要他的那个time step也做一个变化,我是。而那个时间戳是上一次。数据。的时间加一,大家想想这个需求是不是可以用reduce来实现,尽管很奇怪,对吧?啊,就是我两个数据可能用的这个不一样,有可能就是说呃,聚合的时候,每一次聚合的时候都是上一次数据的那个,呃,时间戳加一之后给到下一次的那个聚合结果,然后下一次聚合结果里面的那个温度呢,是最新的这个新来的数据的那个温度要加十对不对?哎,这这样一个规则有点奇怪啊,那我们实现一下吧,咱会想到6S里边我们传的那个function程是不是得是一个一个二元的参数啊,XY对不对啊,然后这个XY大家知道前面这个应该就是我们。
09:40
之前聚合的那个结果对不对,Y就是新来的这个元素对吧,所以我们最后比方说我还是要把它弄成一个sensor reading吧,包装成一个sensor reading啊,大家这个一写就已经没有,这个前面就不报错了,那我们包装成什么呢?呃,ID是不是还是一样的ID啊,他们听外之后X跟Y的ID一样,然后这个时间戳是不是要用X的什么time是不是要加一呀?
10:13
然后最后的温度是不是Y的温温度,最新的这个温度对是不是要加十啊,哎,这是我们的一个需求对吧,然后最后再把它打印输出就可以了,好,我们来看看这个效果是怎么样的吧。这里大家看一下我们最后这个聚合的一个结果是什么样子啊,大家看一开始第一条数据进来的时候,它是不是啊,只有他一条,那reduce没什么用,对不对啊,就就按照这个直接直接输出就好了啊,就就他自己大家看这个应该什么都都没变,六七十这个都不用看了,因为它只有一条数据嘛,对吧,我们就看后面这条数据,第二个数据来的时候,我们给的是什么来着。
11:01
35.1对吧,然后是206,那么大家看它的输出是什么呢。输出是前一个前一次聚合结果的那个时间戳加一对不对?哎,所以大家看输出的不是206,而是200对,然后得到的这个温度值是什么呢?是后一个结果,就是当前最新的那个数据加十对不对?哎,所以这里边是不是35.1加十,是不是45.1啊。然后再来一个数据的时候,这个31,这个数据来的时候,它是不是又是前边聚合的那个结果,时间戳再加一是不是变成了2012,后面的温度值,当前的温度值加十是不是变成41啊,所以大家看这个完全符合我们的预期啊,啊这就是这个reduce更加复杂或者说更加一般化的这样的一个聚合方式,在这个过程当中,当然这个就我们可以就自己玩出花了,对吧,你想做什么样的聚合都可以了啊,这就是我们基本的一些API的一些调用啊,给大家做这个聚合操作。
我来说两句