00:00
那除了皮处理之外,我们当然更关心的是理处理怎么写啊,所以接下来我们再写一个流处理count啊,同样还是新建一个object。这个我们就叫做。叫做strict account。把它创建出来,然后同样我们还是在main函数里边去写对应的内容。嗯。刘处理。Were part。第一步还是一样的,大家还记得第一步是干什么吗?还是创建执行环境对不对?呃,这里面我们要创建的就是。大家觉得这个执行环境是一样的吗?哎,这个会有所不同啊,大家要创建的是一个流树里面的执行环境。
01:04
所以这里边引入的也不一样了,它引入的是什么呢?大家看是stream execution environment。引入这。这里还是一样啊,叫get execution environment就可以了。那当然了,为了防止后面再报那个影视转换的错误,我们把这个是不是可以直接下划线就可以了。然后接下来哦,接下来是该读入数据了,那大家想我们现在还是直接从这个文件里边去把这个数据一下全读入吗?呃,当然其实流处里也可以从这个文件里面去读取,那它的读取就是从文件里边一个一个去读,对不对?呃,但是我们的想法,按道理来讲,数据流感觉不应该是这样的,数据流应该是什么样的?对,应该是从一个啊,就是开放的一个流式的输入里边来一个数据就处理处理一个对不对,我们可以手动的一个一个输入,然后他这边实时的就有结果输出了,这样的情景才对,所以这里边我们创建我们的这个数据从哪里输入呢?我们想要的是一个。
02:15
想要的是一个流式的数据对吧,所以这里边我们用一个so文本流来做我们的输入数据。接收一个。Pocket文本。好,所以这里面我们定义一个stream了啊,不是data set了,对不对?呃,这里边的零从哪里边去读取呢?还是env,大家看到它本身有一个socket test string,这是不是就是一个socket文本流啊。大家会看到这里面需要传参,传参是要有一个host name,还得要有一个port,对不对,这两个是必须的啊,就是。
03:09
后面的这些那个分隔符什么的可以不要,前面的可能必须要,呃,那我这里边其实就是其实就是一个主机名和端口号吧,呃,那我这里边当然就是local host的,可以先起一个另外端口号,我随便起一个7777吧,等一下我可能就要在本地是不是要起一个7777端口是一个呃,Soet服务对不对,可以发送这个so文本流。接下来如果已经有了数据之后,接下来就要对它做处理和转换了。每条数据。处理。啊,这个我们最后得到的就应该是一个word count。In the对吧?
04:03
它应该是基于data stream,第一步应该做什么呢?是不是还是大家想一想接下来的操作跟我们前面做批处理是不是一样啊,对,还是,那这里面还是先切开对不对,Split一下空格区分,把它split出来啊,当然这里面中间你如果想做一些别的操作,其实也是可以的,比方说我们可以是不是可以filter啊。按照条件去做过滤,那是完全是可以的啊,那这里面比方说我要求它必须得是飞空,这个很简单啊,是不是可以直接传一个下划线点呢?Empty啊,这就可以直接飞空对吧,把它筛出来。然后接下来是不是还是需要把它map呀,这个就就几乎是完全一样的啊,转换成一个二元组去做Co计数,所以这里边我们要的是本身的这个word,还有一个技术一对吧,这两个,然后接下来是不是可以读BY,然后再some啦,按照这个flink的编程习惯啊,但是大家看一下。
05:14
这里边没有group,那得怎么样呢?大家注意一下啊,在link的这个data stream API里边要做类似的这个分组操作,要用什么呢?用不是reduce by k啊,要做P8。对,这这个是之后大家会经常用到很常见的一个一个算子啊,一个一个这个API,所以大家要习惯啊,就是要做分组的时候K板直接那K看什么呢?大家看是不是里边还是可以直接给一个int,也可以给string,也可以给一个select,也可以给个函数,对不对啊,所以这里面还是一样,我是不是直接就K0,然后就可以S1把它算出来了,看这就是一个流处理的。
06:05
一个程序就已经实现了,也是非常简单。那我们这个做完了吗?如果要想看到它的结果,这还需要有一部对world count data stream要把它print出来,看到它的结果,然后另外大家注意啊。要注意需要干什么呢?哦,这里边不能让它停,对不对,这里大家要注意一下啊,其实前面我们定义的这个过程当中,它并没有真正执行。他其实只是定义好了我们程序的处理逻辑,处理流程对不对,呃,其实只是这样一个过程,那真正的就是把这个任务提起来要怎么样呢?对,后面要有一个真正启动的过程,里边我们要启动。啊,相当于是这样的一个过程,Env点大家看有一个execu这样一个更新。
07:06
当然这里面还可以传一个drop name啊,那比方说我们这个就叫stream word drop对吧,直接可以把这个启动起来。接下来我们就是传入一条数据,它就可以接收一条数据去做处理,就可以带入输出。接下来我们来测试一下,看看它的效果吧,呃,现在如果要直接起的话,可能还有一点问题,因为我们还有一个前置的工作没有完成,对我们是不是还得起这个三文本流还没有吗?所以这里我要。在本地要去起一个这样一个东西啊,那大家可能听用过这个NC命令对不对,NC命令呢,它是一个比较好用的这个网络网络工具啊,它其实是NENEK的一个简写,对,所以这里边我们可以直接NC-LK,杠L就是相当于listen啊,启动一个server,对吧,就是TCP或者udp的方式去监听一个指定的端口,后面要跟这个端口,比方说我们的端口是7777,那前面还加了一个box k,意思就是说要保持我们当前的这个连接状态,可以被多个连接打开,它的好处是什么呢?如果你不加K的话,这里边我们的这个提起来之后。
08:30
呃,那那这个是正常处理的,如果我们这里面把这个程序关掉,这里面的NC命令也就退出了。而如果我们这里边这个加了杠K的话,就会一直启启动着,这是一个持续保持的状态,不会因为这里面的任务结束而把我们的NC也关掉。这样一个状态啊,这边提起来就可以了,然后我们把代码也启动运行一下。好,代码已经启动起来了,诶,但大家会看到这里边好像它没有这个数据输出是吧?我们是要等他这个数据输出吗?啊,显然不是,这跟批处理不一样,因为批处理是从文件里边一批都读出来了,他可能在处理对吧,现在他在处理吗?对,因为连数据还没还没给嘛,所以我们当然是要先把数据是不是一个一个输进去啊,这里边我们可以。
09:30
Hello what?大家会看到这里边一输入hello word是不是这里边实时的就会输出WORD1HELLO1。然后这里边hello flink这里边是不是哈,就变成二了,就是我们正常的一个world count计数的一个过程,所以说其实这个过程还是非常简单的啊,而且这个跟批处理是不是就完全不一样。T处理的话,如果我们是这两条数据的话,最后它会这个WORD1哈一输,然后后边在HELLO2这个哈输两次吗。
10:08
大家想一想,如果是批处理输入这两条数据的话。是不是只输出一次,HELLO2啊,对,他只输出一个最终结果,而我们流处理的话,就是来一个数据是不是实时变化,来一个就处理一个,这就是流处理的一个特点。但是有同学可能发现了,这里面有一个很奇怪的东西,就是前面为什么有一个324这样的一个东西呢?这是什么玩意儿呢?这里边可以先跟大家提一句啊,这个代表的是在我们,呃,不是顺序啊,这个是代表的是我们在执行这个flink任务的时候。任务的并行度,并行的这个编号数,或者说具体来讲的话是任务执行在哪个slot上面啊这个slot这就又提出了一个新的概念,大家可以理解成它就是我们可以多线程去跑这个并行任务,那到底执行在哪个线程上呢?诶这里面要有一个编号对不对?所以大家看到当前我们其实就已经把它的这个。
11:17
多线程的这个状态已经在并行的执行了,他们是跑在不同的线程上。那有的同学说这里边为什么是324这样的一些东西呢,如果我要是再多输出几个东西的话,它这个会会变得更多嘛,这个线程会变得更多嘛,那我们多多看看啊,比方说hello skyla。大家看一也出来了,对吧,一和二,那如果要是这个how are you呢。大家看323,诶这个3Q我们再看一眼啊。大家看还是这个323,呃,这个我们输一个比较复杂一些的东西啊,比方说这个。
12:05
I'm fine是吧?或者,Are you OK,漂亮啊。诶,大家看这好像我们输入了这么多数据,它好像一直就是1234这么几个数对不对。那它会出现比四更大的数吗?那这个下去之后,大家可以自己去测试一下,这里面我如果要是持续去测试的话。远不会,为什么呢?某人啊,很多,这里大家注意啊,这里边我们没有并没有程序里边并没有指定它的并行度,对不对,这里边其实可以做一个什么事情啊,所以在后边去set Terrorism。可以设置并行度,如果这里边我要设一个并行度是二的话,那其实大家如果要是再重新跑一下,可以看一眼啊。也就是下面得到的结果就只有一和二。
13:03
那有同学可能会想到了,那为什么我这里边是四呢?哎,这里边已经运行起来,我们现在再看一下啊word。诶一和二对吧。Hellolink,还是一和二啊,大家还记得后面是哪里有那个三和四来着?大家看就变成二和一对吧,这个好像就确实是一直都是一跟二对不对,所以它就是这里面设置的并行度,其实就代表了我们执行的线程的数量,其实它就是确定是执行在哪个slot上面啊。大家可以理解成就是分区之后可能就有这样的一个效果,对不对啊,到底是分到哪一个线程上去执行,那前面我们为什么没有设置的时候,它是按四,好像是按步行路四来执行的,对不对,这里面有一个有一个默认的状态啊,如果在现在我们这个开发环境里边去运行的话,默认的并行度是当前电脑运行的,呃,就运行电脑的核心数量,CPU的空的数量,那大家就会知道了,当前这个电脑是一个四核的电脑,所以它这里面默认的并行度就是四啊,是这样的一个状态啊,所以这就是。
14:30
这一部分内容大家可以先用这个word count做一个入门,先看一看flink的代码到底是怎么去写的。
我来说两句