00:00
啊,现在我们已经分别用批处理和流处理实现了word count的程序,那已经跑完了之后,我们现在还只是在这个开发环境里边运行啊,那我们自然就会想到在实际生产环境当中,应该是要起一个flink集群啊,然后呃,就是把这个flink部署起来啊,然后我们是不是要在这个自己自己的集群生产集群上去提交当前的这个作业啊,哎,这个才是我们真正的运行的这个状态啊,所以接下来我们想要做这件事儿,在这个之前呢?啊,这里边我要做一些调整,对这个代码做一些调整啊,比方说我现在想要重点运行的这个程序入口就是这个流逝的work count,那大家看一下我当前的这个代码有没有什么问题啊?首先有一个问题,就是我现在真正如果要在生产环境里边读取流式数据的话,我能从文件里边读吗?对,大家想到这个文件里边的数据的话,这相当于是一个有界流对吧?数据相当于已经都已经收好了,所以我在这里边做这个呃,测试的时候啊,在这个开发环境里边,大家会看到它是相当于按照这个文件里面的每一行一行一行读,然后读完了是不是就直接退出了呀。
01:16
但是真实的生产环境里边应该是什么样子,应该这个数据是不是连续不断的来,永远不会停止啊,对吧,我这里面应该是一直跑着,一直在等数据来,这才这才对,哎,那大家想一想,如果要是这样的话,那我们是不是不能从文件里边读取数据,我应该把它换成一个什么样子呢?大家想一下,从哪里可以有这样的一个真正意义上的流式数据呢?哎,其实我们接触过的大数据组件里边最好的其实就是卡夫卡,对,其实就是消息队列对吧?诶,那卡夫卡这边消息队列里边的数据,我们这里边如果有一个消费者的话,你订阅啊,我们这里面的某一个topic,那其实消费数据的时候就是那边有一个,我这里面就消费一个,对吧?诶,所以这个过程其实就是完全是流逝的啊,那当然了,卡夫卡这又涉及到跟外部系统的一个连接,我们现在不做那么复杂,那可以用一个非常简单易用的小工具来实现一个这个流式输出的这样一个功能,这个小工具叫做。
02:22
NC啊,大家可能也听说过啊,呃,Net cat对吧?Net cat的缩写啊,这个在Linux环境里边都是一个自带的一个小工具啊,所以如果我要用这个工具的话,比如我现在啊,起一个本地这里的啊,这是我Windows自带的一个Linux子系统啊,我在我在这里边直接敲NC,然后杠LK,这个表示杠L是指的。Listen就是监听某一个端口,然后后面这个K的话表示的就是keep要要保持住当前的这个连接啊,啊就是不要说我那边如果要是连连连接,如果那边的程序如果退出的话,我这儿这个server也断开啊,不要有这样的情况啊,所以简单来讲这个NC这个命令啊,就相当于我在这儿启动了一个。
03:11
啊,就是可以发送socket文本流的一个服务器,然后我的端口是7777,那在别的地方是不是就可以去啊,就是相当于监听我当前的这个服务器的777端口,就可以收到对应的那个文本数据啊,啊所以这个其实比较简单啊,就这么一行命令,直接在这里就可以发送这个socket文本流了,所以我们接下来可以从socket文本流读取数据。呃,那这种读取方式的话啊,啊,那我就直接把这个写出来,Data stream。Stream先把它定义好,Input data stream同样还是要基于env,当然是env啊,那这里边直接大家看可以调一个socket text stream这样的一个方法,这就是相当于我要读取一个文本流啊,Socket文本流啊,那这里面需要有参数了,参数当然传的非常简单,就是当前的host name和port啊,那我当前当前我既然是本机起的嘛,所以我当前是local host,另外端口号7777,大家看就是这样对吧?哎,我把这个先定义好。
04:28
好,有了这个之后,现在我们运行一下啊,把这个提起来做一个测试,看看这个效果怎么样。我把这个做一个分屏显示。大家看这边起起来之后是不是就等在这儿了,没有任何的输出对吧?哎,这很正常啊,因为现在没有输据啊,对吧,我们只是起了一个这个,呃,相当于起了一个数据源,要读取777端口数据的这样一个任务而已,那至于说这这个任务里面有没有数据,那是我们这里边要去传输的啊,那所以接下来我们可以随便说一些啊,Hello。
05:05
Word大家看到我这一输输入一行,这里是不是就输出了两行啊,因为我们是flat map嘛,可以一对多,所以这里边HELLO1WORD1。对吧,啊,然后接下来后边呢,就是继续啊,比方说哈flink大家看到是不是哈,就变成了2FLINK变一啊好,你也可以输,输的这个更多一点,我可以how are you对吧,大家看这是不是就直接输出了三条啊,How are you,各是一各是这样的一个统计。这就是我们真正意义上的流式输入,这里边做这个流失的处理,是不是就是有一条数据处理一个,用一条数据处理一个,这里的这个处理操作非常的实时,对吧?就是根据我们这里边的输入情况,来一个马上实时的输出一个计算结果,这就是真正意义上的这个流失输出。
06:02
呃,那当然了,就是对于我们这个代码,如果最终是想要做这个部署提交的话,可能还会有另外一个问题。这里边的这个问题在于大家看我这里边是不是把这个主机名和端口号直接写死了,这个的话,我们在实际生产环境里边当然是非常忌讳的啊,就包括前面你假如说真的是从这个文件里面读取的话,直接写本地的这个绝对路径写死这个也是非常。不应该这么去做的,对吧,我们应该把这些是不是应该放在配置文件或者说启动代码的时候作为参数传入啊,啊所以接下来我要把这一个,呃,当前的这个配置项啊字段做一个提取,那提取这个的话用什么方式来提呢?在这里边我们就是用啊,这这里边是flink给我们其实有一个自带的一个工具啊,叫parameter two,所以我们用这个para。
07:03
Turn two。工具提呃,从程序启动参数中提取。配置项哦,大家看一下这个东西是怎么样去定义啊,那就是大家看flink API Java u下边提供了一个类叫做parameter two这么个东西啊,啊,那我现在调的就是它的from a。大家看from as的意思,其实就是说从启动参数里边提取东西,对吧?那我们的启动参数在哪呢?在代码里边是是用什么呢?String,哎,这个当然是一个string对吧?哎,注意啊,这里边其实是一个string类型的数组,因为就是是不就是我main方法本身是有参数的呀,大家在执行调用这个就是把当前这个代码啊作为这个程序,呃,主程序入口的时候,那是不是调用这个main方法的时候是可以传参的呀,那这个参数是不是就是我们启动的时候带的那个参数,哎,所以那个参数就在这个AX里面,那我现在可以直接from这里的这个AX对吧?啊,我先把这个先拿到啊。
08:23
就就把它叫做parameter two,然后接下来呢,我就可以提取啊当前的这个host name啊,我就叫做host吧,呃,就应该从这个parameter two里边去,大家看它这个parameter two是不是就像一个map一样啊,一个K一个value对吧?诶所以你看这里边我可以直接get,然后传一个K进来,就拿到了对应的那个字段,比方说我当前这个K就叫host吧,然后同样另外大家看后面这一个端口号是不是应该是一个整形啊,所以这里边我直接给一个int port,同样还是parameter to,诶大家看它直接有一个方法就叫get int,诶这就不用我们做那个数据类型转换了啊,非常简单啊,直接把这个拿到,比方说我这个叫port。
09:10
后边这里也就不要写死了,直接把这个对,直接就是host和port啊,所以这就是我们真正啊能够提交到生产集群环境里边这样的一段代码。啊,那这里面如果我们要去在这个开发环境里面做测试的时候啊,你现在直接运行是不是有问题啊,因为当前这个as没东西嘛,你这里边直接解析解解析不到东西,那我们要运行应该怎么运行,是不是要去edit configuration对吧?然后在这里边大家看这个program argument这个参数里边,呃,这个Java程序啊,提交的时候是不是用那个杠杠来来表示参数的,对吧?所以我这里边杠杠host,然后传值local host,这是我当前的这个主机名对吧?杠杠HOST7777。
10:07
这是我刚才的端口号,把它应用一下,那这里的host和这个port啊,杠杠host和杠杠port是不是要对应着我这里边get的这个K啊,对吧,你这里边你可以改啊,你把这个改了的话,我们传参的时候那边也要改一下,好这就是我们这段代码,大家运行一下,看看这个效果怎么样。正常情况下应该是跟刚才的结果应该一致。运营起来了,至少没有报错,那现在我hello word大家看没有问题对吧?HELLO1WORD1,如果再输hello flink这个HELLO2FLINK1没有任何问题,这就是我们执行的这个过程啊,就可以把这一个呃,所有的这个参数提取出来,方便我们后边做生产,生产集群环境里边的一个部署提交。
我来说两句