00:00
前面我们已经做过了批处理的word count啊,大家会发现在这个处理的过程当中呢,其实非常简单啊,而且我们这里边基本上就是创建好了执行环境之后,后边就是按部就班读取数据,然后一步一步做操作,最后打印输出,直接就全输出了,那这个看起来非常非常的直观,非常的直白,呃,那我们会想到这是批处理,那流处理是不是跟他一样呢?接下来我们来做一个流处理的word count。同样还是在当前的包下边新建一个,这个我们加上一个名字叫做stream word count流处理。我还是呃在这儿啊,写一下注释,这个是当前是流处理word count,好,呃,我把这个main方法还是先定义出来,然后接下来我们就想到了,那之前那个批处理的时候,第一步不是创建那个批处理执行环境吗?那现在流处理怎么办呢?啊,其实大家想到了一样啊,创建流处理的执行环境对吧?哎,这这个就是猜也能猜到啊,那之前我们批处理的时候直接是用了execution environment.get execution environment,那现在刘处理怎么做呢?
01:20
这个跟那个稍微有一点不同,我们现在这个这个环境叫做stream execution environment,然后同样也是去get exq environment就可以了啊,那跟刚才的那个区别就在于大家看现在是这个包是在streaming里边对吧?Flink streaming API scalela下边同样也是skyla,大家注意这个导包不要导错好然后之前我们记得上面不是还引入了为了后面做那个影视转换,还做了一个下划线引入吗?啊大家看这个一开始我们当时引入的时候,那个本这一句本来是灰的对吧?啊,那后面我们做了这些转换操作,写完之后这里就变成量的了,那说明真正用到这个影视转换了,所以这里边我们也要引入这个影视转换,那要引入的什么呢?跟那个类似,也是同样包下边的flink streaming API GALA点下划线。
02:14
啊,那其实有同学会会发现,那你何必这么麻烦呢?因为大家之前学加拉就知道啊,导包的时候,这个下划线表明的就像Java里边那个芯儿一样嘛,相当于是把当前这个包下面所有东西导入,那你既然这个execution environment也是在这个包下边的内容,那是不是我其实这个可以省略掉啊,啊对吧?所以这里啊,我直接可以把这个stream execution environment直接换成一个下划线就可以了啊把我们后面要用到的影视转换也就都引引入了,好那接下来已经有了这个流数理的执行环境啊,那接下来我们想到是要读取数据啊,然后这个创建我们要要的这个之前我们要的是data set,那现在我们说批处理是data set API,流处理呢是data stream API,所以接下来我们当然是读取数据,创建data stream了,数据流嘛,但现在我们的这个数据在哪呢?
03:12
还是从这个这个哈,这个文本文件里面去读吗。啊,当然也可以,就是这个流处理的执行环境也是允许我们去读文件的。但我们想到如果要是真正的流处理的话,数据应该是源源不断的来,对吧?啊,那应该是有一个,就我正常来讲的话,应该是比方说开放一个接口,监听一个接口,呃,监监监听一个端口,然后我们这边应该有不停的数据发送过来,这样的话才是一个真正的流处理的环境嘛,啊所以接下来我们这个要做的是。接收一个啊,我们接收一个socket。Socket。文本流,哎,那么这个socket的文本流接收的时候,怎么样去做这个处理接收呢?我们先把这个定义出来,叫做data stream啊,Input data stream,那么同样它还是基于environment env,然后有一个方法,这里边大家可以看到,呃,就是下面它有有各种各样不同的调用的方法,对吧,之前我们是呃,有一个方法叫做read text file。
04:26
然后现在我们还有一个方法叫做啊,其实这里边看的很明显,叫做socket text stream对吧?啊,它就是用我们的这个socket文本流,从相当于我们是去监听一个端口,然后呢,呃,就是监听一个socket开放的这个socket端口啊,然后从这个端口去读取数据,转化成我们这里边将要处理的data。然后这个socket文本流,这个创建的时候你监听端口,那要传什么呢?大家看这里边的参数也非常简单,一个是一个string类型的hostname,另外还有一个是一个int类型的port端口号啊,那就是主机名和端口号嘛,所以这里边我就直接写死吧,比方说当前我就是local host,我就在本机起一个端口号,比方说我起一个叫7777,把它先定定义好放在这里,然后接下来那当然就是。
05:22
进行转换操作了,对吧?进行转换处理统计,我们定义一个result,同样这个也是date stream,基于前面的input date stream,然后接下来做什么操作呢?那还是Fla map吗?啊,一样对吧?跟之前那个完全一样,呃,然后我们下划线split,然后用空格把它分开啊这个大家知道,下划线是那个拉姆达表达式的一个一个简写形式,这这个大家应该是清楚的,对吧?呃,你也可以写这个拉姆达表达式,比方说我这里边的每一个,呃,每一个元素,拿到这个每一个元素去调用一个Switch的方法,然后把它这个做这样的一个处理,对吧?呃,然后我们再再做一个Fla,把它把它打散啊,这个是一样的这个操作啊。呃,这里边我们一般简写就是都用下划线来写了,然后后边哎,同样还是map成一个二元组啊,或者说如果有同学说诶。
06:22
能不能调用其他的方法呢?有没有其他方法呢?诶,当然有,比方说这里边大家看有filter对吧,我想去做一个过滤,同样也是可以的,但这里边这个过滤好像没什么意思,我们就要求它这个能empty吧,要求它这个不为空啊,然后后面我们map成二元组,同样还是当前的元素,Word是什么就是什么,哎,作为K放在第一个位置,第二个位置来一个数字一。那同样后边是不是就是按照第一个位置分组,呃,第一个元素分组,然后把第二个元素散起来就完了,我们就想到哎,那是不是还有group by呢?哎,没有group by。
07:01
哎,那这里面流处理里面没有group by,那现在该怎么样去做分组呢?它尽管没有group by,但是给我们提供了另外一个大家看到叫做KBY啊,也就是说这个分组指的更明确啊,就是这叫做什么分组呢?就是你去指定一个K。对吧,KBY嘛,所以这里边指定的就是当前你要分组的那个K是什么,同样这里面大家看到有一种方式是直接传一个int进来啊,当然还有别的那个传递参数的方式,这个后面我们用到再说,现在大家可以先直观的就先理解一下我当前那就按照这个int的话,传二元组里边元素的位置下标。当然就是KBY0了,对应着前面的group by0啊,那现在然后再做sum,有这个sum对吧,同样还是sum,一去把第二个位置的元素叠加起来就完事了。啊,所以大家看这个处理流程跟前面几乎是完全一样的,好啊,那接下来我们知道要打印输出的话,那我把这个result data stream再去调用一个print方法把它打印输出,对吧?哎,这这样就搞定了啊,那大家可能会想到这样就就结束了吗?这样就看起来的话,除了我们这个源头不太一样,没有从这个文件里边去读数据,这看起来完全一样啊,这跟这个批处理这里面几乎。
08:26
都不用说,几乎就是完全一样是吧?啊,当然这里面的这个数据类型是不同的,那比方说你像前面我们做这个socket text stream的时候,大家知道这个得到的就应该是个什么东西啊。大家把这个呃呃specify type一下啊,得到一下它的这个type,大家会看到这里边得到的不是data set了,因为我们调用了stream嘛,对吧,Stream execution environment流处理执行环境得到的是一个data stream,然后接下来的转换就全部都是data stream在做转换啊,所以我们说管这些接下来的这一些API,我们就叫做data stream API对吧?与之对应的这个前边这里边word count这个批处理这里接下来调的这些我们就都是。
09:12
Data set API对吧?批处理的API这里边我们用到的是data stream,所以我可以把这个写出来,这是data set data stream stream,那同样下边我们这里边应该是什么呢?哎,大家想到了,同样它应该是一个二元组类型的data stream,对吧?整个转换过程当中都是data stream在做转换。啊,所以这个其实整体过程是非常的简单的,但是这个其实没有完啊,这个关于为什么给大家运行一下啊,执行一下大家可能就会比较好理解了,好大家看现在我们这个代码已经运行起来,诶,然后接下来大家看直接就运行结束退出了。对吧,这直接就运行结束了,这是为什么呢?难道我们这里面不应该是他要监听端口等待我们输入数据吗?
10:03
怎么这里边什么都没什,什么都没有,直接就就退出了呢,整个都已经跑完了呢。啊,这个大家要注意啊,这里边上边我们现在是流处理对吧,流处理的过程当中,你如果按照每一步的这个操作啊,代码里边的每一步操作,把这个定义出来,这是真正的在执行操作吗。大家仔细一想就会发现,这并不是在对数据做操作,因为现在还没数据呢,你数据还得还得等,等他读进来才行呢。你现在连数据都没有,那后面这定义的是什么呢?其实相当于只定义了我们要做的操作,要完成的那些任务,对吧?要数据来了之后,要执行的那些任务,我是全部都定义出来,分配好了,但现在没有真正的去执行任务。所以后边你假如说直接print,然后就结束的话,相当于我只是定义好任务,然后就结束了,然后就不等了,这显然是不对的,对不对?哎,所以接下来呢,我们应该得相当于启动一个进程,然后让他要在这里边一直等待数据输入,然后去执行我们定义好的一些操作。
11:15
啊,所以大家看这个特点就体现出来了,之前我们不是说流处理是就是flink啊,流处理它是事件驱动吗。所以说不是说我们按照这个代码的逻辑啊,按照顺序执行完到这儿就结束了,而是在这儿你其实是应该要启动一个进程啊,就像是监听端口一样,对吧,启动放在这里,然后等着数据来,然后用之前定义好的每一步操作去做计算。啊,是这样的一个含义,所以接下来我们必须得有一步,这一步叫做启动,呃,任务执行,具体来讲的话就是要调用环境。Stream execution environment啊,流执行,流处理执行环境的execute这个方法啊,这个就表示我们当前要把它执行起来,要把它运行起来,而不是只把这个任务定义好,哎,跑完了就就结束了。
12:13
所以说这里边要加这么一句。然后大家看到这里边其实我还可以给参数啊,这里边给的参数是就是你你也可以可以没有,可以不给,也可以给一个参数,这里边是一个字符串,这个字符串就是当前执行的这个作业job的名字,它的一个name啊,比方说我们这里边管这个叫做stream count,流逝的world。好,然后接下来我们执行的时候,就是会提交这么一个drop上来啊,现在我们再来执行一下,看看效果怎么样。大家看这里边我们启动报错,为什么报错呢?诶你一看就知道这个connect refuse,因为你要去监听当前local host的7777端口,你现在没东西啊,对吧,我这里边又没有,呃,启动这样的一个服务,然后这个777端口去去启动你这里面监听,当然什么东西都没有嘛,所以我们为了让它正常运行,那我在当前的这个。
13:11
呃,本身的主机里边啊,去用NC命令,呃,Linux环境里面大家知道有一个非常好用的NC命令,就是net cat,对吧?啊,这个命令是可以给我们创建一个,呃,以TCP或者udp方式去监听一个指定的端口,就相当于启动了一个,呃,一个服务器,一个socket的一个一个server一样啊那这里边我们加一个参数叫杠LK。呃,L表示的是启动一个server,然后监听端口杠K的话表示是当前的这个socket要要保持住,对吧?Keep它可以为多个连接打开,因为你如果默认情况下,那就是我只有一个连接去去连上监听当前这个端口的话,那这里边这个如果我连接的这一个程序断掉的话,那这里边的这个服务器也就断开了,这里面我们希望它一直保持住啊,Keep keep住,然后这里边后面跟上端口号7777,这个启动其实非常简单,直接起起来,然后接下来我们再运行一下。
14:14
重新运行一下。啊,大家会看到接下来我们这里边运行起来之后,诶,好像就停在这里了。没有任何反应了,对吧?哎,直接就停在这儿了,这是为什么呢?哎,当然还是我们之前说的流处理,现在是要事件驱动嘛,我现在本身这个流处里的所有任务都分配好了,都放在这儿了,然后也执行提起来了,对吧?哎,那那但是怎么样呢?它具体执行计算是要有数据来我才计算呀,我是来一个算一个对吧?哎,所以接下来你当前得有数据输入,那数据从哪输入呢?你这不是有这个NC,有这个net cat这里边起了这个服务器吗?对吧?在这里边输入数据就完事了,我那边监听着呢,所以接下来我把这个放在这边,然后我们。
15:03
做一个这个分屏显示啊,大家看的会更清楚一些,在这里我输入开始输入hello word。大家看到,瞬间左边我们的这个控制台就输出了HELLO1WORD1。这是不是就表示当前到目前为止,我做统计的时候,Hello出现了一次,Word出现了一次,对吧?啊那那有同学说,诶那你这个不稀奇啊,这就相当于你输入什么,我把它map完了之后,你不是转换成了一吗?对吧?那那那你再输入再输一个那个哈,那不是还是HELLO1吗。诶,大家继续看啊,我再输入一个hellolink。大家看flink当然还是一这个没什么毛病对吧,但是大家注意哈,变成几了。接下来输出的时候,哈直接在之前一的基础上变成了二哈二。你如果这个时候看他的最新的统计的话哈,这个出现的次数是不是就应该变成二了,所以大家看这就是我们所说的流处理的特点,来一个处理,一个低延迟实时更新结果,对吧?啊,就不是说你所有的数据我要等到它都到齐了,我最终返回一个结果,大家还记得之前那个批处理的结果吗?那最后只有所有的数据都完成之后,我就输出一个哈,然后三对吧,只输出一个,那现在就不是了,现在是每来一个数都会把当前有计算的这个结果输出一次,就在当前的基础上做统计,做计算。
16:37
这就是这个流处理的特点。啊,那这里面如果比较细心的同学可能会发现一个一个细小的问题啊,这里面有一个细小的问题,大家会发现,诶,你你输出这个二元组这个word count输出就输出嘛,怎么前面还有个小数字啊。这里面有一个小数字,然后后边还有一个间括号,这个是表示什么含义呢?
17:04
诶大家可以多做一做测试,因为大家会发现发现这里边这个没准对吧,2342,诶那那这个它有没有可能出现别的数字呢?诶我多测一测,比方说我这个hello skyla,而大家看到还有一对吧,呃1234这些都有,然后后面我我们再再来一些别的啊,比方说我我来这个how are you how are you,诶大家看到这里边还是二跟三之类的这些这些呃数字出现啊fine thankyou and you。大家看这样输入的时候,它其实就是12344个数在来回出现,那这表示的是什么含义呢?啊,这其实就是这个数字啊,代表的是当前我们执行的时候。啊,大家可以呃,想到就是我们当前flink是一个本身是一个分布式架构的流处理,大数据处理框架嘛,所以这里边本身我们默认执行的时候是可以去做并行执行的,而现在的这一个小数字就代表当前执行,当前统计啊,这一个哈,这个这个字啊,或者word这个字,它对应多少这个count数量的时候,这个任务到底执行在哪个并行的子任务上。
18:30
所以说这里边大家可以认为这是一个并行子任务的那个序号,对吧?啊,那那个编号,或者说大家可以认为就是我们当前行的任务的那个号码。那这里边为什么只有一到四四个四个数字呢,大家下去下去之后可以自己测啊,我这里边如果要给大家再多输入几条数的话啊,你你比方说在这个,呃,Hello Spark对吧?啊,随便去去打一些招呼,你会发现它永远不会超过四,它的原因就在于。
19:03
哎,那其实大家就想到了,那是不是说明我当前其实并并行的这个线程就是四个在四个线程在同时处理啊,啊,这是因为我当前没有指定并行的程度,那现在多线程处理的这个环境是多线程到底是几呢?或者说我们的这个并行度是几呢?就是四,这个四是哪里来的,是根据我当前机器的默认核心数量来的。因为我当前是一个四核的机器,所以大家会看到就是这里边默认都是四对吧,这里边输出的都是四,大家如果要是核心数量不一样的话,下去之后可以做一些别的测试啊,看看是不是这样。那当然这里边,呃,大家会发现,如果说我停在这里一直不输入的话,我们这个代码是一直是运行的状态,对吧?来看你可以一直等在这里,他永远不会退出。那那除非怎么样,那除非我们手动把它停止,把它关掉,对吧?啊,这样才是真正的一个退出的过程,所以这也是流处理的一个特点,就是有头没尾,连续不断,源源不断的会产生,对吧,这是关于这个流处理。
20:12
整体的一个了解。
我来说两句