00:00
呃,上节课给大家把这个基本的热门商品统计的逻辑都已经实现完了,我们也测试看到了最后的输出结果,那在实际项目应用的过程当中,大家知道肯定不可能直接去读取现成的文件,对吧?诶,那这里边,而且我们直接写死的这个路径,这一般肯定不会这么去干的啊,那具体来讲。是不是就应该要有一个流逝的真正流逝的数据源输入啊,哎,所以接下来呢,我们就给大家讲怎么样连接卡夫卡,从卡夫卡里面读取数据去做一个测试,真实场景下应该都是读取卡夫卡数据源的啊啊,那所以这里面我们的data data stream就应该是还是stream对吧,这个还是string啊,得到的是一个。Input STEM现在掉的方法就应该是直接ADD source里边是不是要new一个对blink卡夫卡consumer对吧?啊,然后大家稍微注意一下,之前我们那个弗Li卡夫卡consumer连接器里边提供的那个生产者消费者都带着011对吧?诶,那现在为什么没有了呢?大家看一下那个po文件就知道,我们当前这个卡夫卡引入连接器的时候,后边是不是就没有版本啊,哎,所以大家注意这是一个当前卡夫卡的通用连接器,就是说它会自动,呃,就是追着我们当前卡夫卡最新的版本啊,去给我们做对应的那个升级,所以这里边我们项目里边可能一般用这个通用连接器会更多一点。
01:30
然后里边具体的配置项大家还记得,最简单的这个实现是不是可以给一个topic名称,然后后边给对应的那个反区域列化的那个STEM,然后给property就可以了啊,所以这里边我们当前的这个topic当前热门商品啊,我就叫hot items。呃,然后当前的那个序列化的工反序列化的工具啊,那就是simple,我都是stream嘛,直接给一个simple string STEM就可以了,对吧,然后最后再来一个properties。
02:04
Proper啊,那么我们前面要把这个proper再做一个具体的定义,对吧。前面这里面你有一个property。Proper把它创建出来,然后接下来就是proper需要去set各种各样的这个我们定义的这个配置项了,对吧?啊boot最主要的啊,Boottrap service。然后后边给一个对应的啊,Local host 9092,好,这里写错了,少了一个R对吧?BOO STEM service local host9092,呃,然后比较重要的,既然是消费者嘛,还有其他的一些参数,比方说配这个消费者组对吧?Group ID啊,这里随便给啊,比方说我就叫consumer对吧。
03:01
哎,那下边可能还有其他的一些选项,这个我就不详细写了,大家可以参考文档里边的这些实现哦,大家可以看到这里边我们可以再去单独的定义当前的这个izer对吧?Key和valued,其实是下边我们这里边已经定义过这个simple stringchema了,就不需要这里再定义了,对吧?啊,你要想定义也可以啊,另外还有就是有这个呃,自动呃,重新重置这个偏移量这种方式啊,Latest这个也是可以去做设置的,其实也没有特别重要的意义,如果不设也是可以的啊,我这里边就直接把这个都copy过来了。直接把这个全放到同一行里面去,好,那么对应的配置都已经写好了,那接下来我们这里还报错啊,看一下这里少了一个分号,好,这个代码就没有问题了,接下来我们就只要启动卡夫卡去做一个啊,做一个测试就可以了,对吧?现在卡夫卡是启着的啊,所以我直接可以把这个代码提起来。
04:03
呃,然后呢,接下来我们在卡夫卡这边应该是要生产数据,对不对啊,所以还是得到对应的那个目录下边去啊。卡夫卡里边啊,接下来我们要创建一个卡夫卡console producer对吧,那这边去生产数据,然后那边去消费,看一下当前的这个结果啊,那这里边是broke list local host 9092,然后杠杠topic,就是刚才我们定义的那个叫刚才叫什么来着,Hot it啊。后台it好创建这样的一个producer。然后我这里还是给大家做一个这个分屏显示吧,这边已经提起来了,呃,然后接下来呢,我找一些这个测试数据,给大家做一个试试验啊,这里面我有一个test data,这边有这个user behavior的测试数据。
05:04
好,那接下来我们把这个放在上边。做一个分屏显示,看一眼啊。呃,首先第一条数据输入进来啊,这个没有任何的反应对吧,这肯定是啊,那大家会想到这个一定是要等到我们现在应该是五分钟一个窗口,对不对?诶,肯定是要有这样的一个过程的啊,所以你看接下来我是060又一条数据,它是不是应该是一分钟之后啊,对吧?啊,前面这个这个数到底是什么时候,这个我不太确定啊,所以说我就直接一分钟一分钟往后走。零六零幺二零两分钟之后。1803分钟之后,2404分钟之后好像一直没有输出是吧,然后300,大家注意这是五分钟之后对吧?诶大家看现在还没有输出,这是为什么呢?对,大家记得我们当时尽管现在是升序,升序数据我们没有奥rock延迟对吧?但是后边是不是我们等它的时候相当于加了一毫秒啊,那是不是至少你要等到诶这个300这个秒300之后再过一毫秒我马到这个程度才可以啊,那我们这里因为没有毫秒的数据,那是不是至少要给一个,至少给一个301啊,应该对吧,我们看一下给一个301。
06:27
大家看现在是不是真的输出了,输出的这个结果是09:05的这个数据,然后统计出来是当前1715热门度是二,大家看一下,那所以大家看到前面是不是1715有两条数据啊,确实是对吧,当前这个五分钟之内啊,然后2244这个数也有两条,2244是不是也是两条啊,然后另外3611281只有一,为什么只有一呢?其实大家想到当前关的这个09:05:00,应该是哪个时间出啊。
07:03
前闭后开对吧?它是哪个时间戳啊,这个时间点是不是就是300这个时间啊,你看我刚刚过了这个时间吗?不是加一毫秒要出发吗?啊,那我现在是刚过这个时间,然后它就关了,当然这个是不是就是300毫秒的这个窗口就是09:05啊,哎,所以是不是不包含这个数据啊,当然也不包含后面301的数据,对吧?当然就只有一个啊,所以大家看这个还是没有任何问题的啊,就是这样的一个,呃,结果的一个输出。当然了,我们还可以做进一步的测试,就是如果后边我们还有这个330,又过了半分钟之后,诶,这个没有复制上。把这个当前这个数据啊。再输入一下330,这当然没有任何反应对吧?哎,这个540也不会有任何的反应,这是九分钟之后,那600会有反应吗。600。
08:00
600也没反应对吧?呃,那什么样的数据就会有反应了,这这个大家很很容易能想到,是不是601的话,这里就真的会有关系了,对吧?这就是下一个五分钟的窗口,就应该到时间了,大家看是不是又输出了一个呀,当前是九分,09:10的这个窗口是不是关掉了,当前统计最高的3611281是三,哎,那当然了,现在我们是不是300301的这两个数据就包含进去了,当然是三对吧?然后1715后面因为又来了一条,是不是也是三啊,然后下边2244这个还是二,然后1256它是一,而这个2244最后的这两条是不是6006601没包含进去啊啊,这就很容易能够想到这样的一个结果啊呃,当然了,最后一条数据大家看到,如果说我一下子做一个跳变,跳到很远之后的话,你看这个数据,这是。661600之前,我这个是658600对吧,那这个661600,这已经是多少秒之后了。
09:03
这已经是跟这个比的话,是不是刚好是3000秒之后啊,3000秒之后要跟最初的这个58000比的话,刚好就是3600秒,是不是一一个小时之后啊,所以大家会想到,假如我一下子时间跳到一个小时之后,会出现一个什么效果。是不是中间会有很多窗口关闭啊,而且前面呃,大家想中间的那些要关闭的窗口里面也有数据吗?呃,当然是有的,对不对?因为你想我们前面要关的这些窗口,所有这些数据是不是它同时要属于12个窗口啊,所以后边这里的窗口当然也是都生效的啊,所以大家看一下接下来是不是就是一个一个都会关啊,09:20 09:33十三十五,四十四十五对吧,一直到哪个时间段呢?是不是到09:55啊,然后大家想第一个数据本来是9.05,那为什么最后一个窗口它不是它不是10.05呢?首先为什么不是10.05。
10:04
大家想第一个窗口关的这个时间其实是是不是这里边的8300啊,啊,并不是我们这里的这个5800对吧,58000,所以说从5800开始算的话,那其实这个应该是从九点钟开始算的,对吧?大家想这个58000,按照这个倒推五分钟的话,应该是九点整对吧?诶,那大家就想到,那这里面既然是一个小时之后,这里面的61661600难道不是十点整吗?为什么?哎,对,大家想这里边这个延迟一毫秒是不是相当于。十点整的数据就不应该输出啊,对吧?哎,那必须要是不是等到我这里边再再做一个复制啊。大家知道什么呀,是不是601的时候现在。对应的是不是就会有一个十连整的数据输出了,对吧?啊,所以当然了,统计出来的数据大家看还都是一样的,因为我数据就那么些嘛啊,所以他们分别都属于12个窗口啊,每一个输出都可以得到这个对应的统计,这就是把它切换成了一个卡夫卡数据源。
我来说两句