00:00
创建好了执行环境之后,那接下来我们就可以在代码里边去定义数据转换的流程了,那第一步当然是先要从外界去把数据读进来。在link当中呢,呃,对于数据的输入来源,我们就叫做数据源data source,那么读取数据源的算子我们就把它叫做原算子,就是source operator,在我们做flink部署提交的时候也会发现啊,在创建出来的数据流图作业图里边,第一步操作其实就是我们当前读取数据源的这个S这一步操作,哎,所以有时候我们就把它叫做原算子,SS算子,或者叫SS任务,这一步就是我们整个处理程序的输入。在flink代码当中,我们添加一个SS算子,这样一个原算子的方式其实非常简单啊,就是直接调用当前EV当前执行环境的一个方法就可以啊,那像之前呢,我们在流处理work这个程序里边调用的是socket text stream,这样创建出来的就是一个socket文本啊。当然了,得到的。
01:10
最后得到的就是一个string类型的data stream,那如果说诶我们这里是直接去读取一个文本文件的话,那调用的方法就是当前EV执行环境的read text file方法啊,那么我们看到它返回的也是一个string类型的data,所以这就是我们第一步的输入读进数据来之后,接下来就可以基于这个data stream去做一步一步的转换了。那这里我们看到的啊,不管是socket文本流也好,还是直接读取文本文件也好,都是直接调用一个非常特殊的方法,诶,那我们就会发现,那假如说我们的数据来源是其他地方呢?没有特别定义的一些存储空间,或者是其他的一些组件,如果是这样的话,我们到底应该怎么读取数据呢?呃,其实对于弗link而言啊,更加通用一般化的接口是。
02:03
基于当前stream execution environment。下边有一个方法叫做。At the source,就是直接添加一个圆,哎,所以如果说我们是一般化的调用的话,应该是调用这个方法,然后得到一个data。那当然这个方法里边是要传参的,因为我们是非常通用的方式嘛,那你到底怎么读取,显然就要有一个专门的定义,作为参数传进来,这里的参数就是一个s function啊,当然了,这个source function我们直接点进去会看到这是一个Java接口,所以这个接口只要我们实现对应的方法里边的一些抽象方法,那么就可以实现从外部某个数据源去读取数据的这个过程。那我们自然就会想到,如果总是自己写代码的时候,还要实现这样一个接口,那显然是很麻烦的嘛,如果说我们只是想简单的做一些测试,有没有更加简单的方法呢?当然是有的,所以接下来呢,我们就来讲解一下读取有借数据的简单的测试方法啊。所以接下来呢,我们在当前的代码里边去新建一个包。
03:13
我们现在是第五章,所以把这个叫做艾特硅谷点CHAPTER05。先把它创建出来,然后接下来我们就可以创建一个对应的object来做一个测试了,首先我们要测试的是有界的数据源,所以我们干脆就把它叫做。S。Bonded。Test。我们先把它创建出来啊,那当然了,Main方法里边,首先我们先要创建一个执行环境stream execution。Environment。好把它引入,哎,那当然了,为了方便后边我们做这个转换啊,这里还是把影视转换,相应的影视转换要引入,所以我们改成下划线。这里调用的就是get execution environment方法。
04:03
直接把它叫做en nv。然后接下来。我们就可以尝试自己去指定一些有界的测试员。比如最简单的方式,我们可以直接从。元素中读取数据什么叫从元素中读取数据呢?简单来讲就是直接指定当前我们有哪些数据啊,那对应的调用的方法叫做in.from element。啊,这里面我们就会发现啊,它直接就从指定的元素里边去获取对应的数据,创建出相应的data stream,然后就可以做转换了啊比方说我们这里非常简单,我就给一堆数字吧,呃,12345,诶,那我们知道这样得到的对应的这样的一个stream。那其实就是整数类型的一个data stream,所以这里我们要完整的补充它对应的数据类型的话,那其实就应该是data stream。
05:06
In,哎,所以我们会发现啊,这样的一种方式就非常的简单,只要我们想要定义一些有界的测试数据,都可以使用from elements,直接把它放在后边就可以了啊,当然了,这里边我们所要指定的数据类型应该是一致的,如果说我这里边既有整数也有字符串的话,哎,那我们知道,那最后这里的data的类型那就变成了any了啊,就变成了它们的公共负类,所以一般我们还是要把里边的元素指定成相同类型的。啊,这是最简单的一种方式,那我们会发现啊,这个如果只是1234的话,这个看起来没有什么实际意义啊,为了看的更加的明显,我们还是举一个实际的应用场景吧,啊这里边比如说我们当前的场景的设定呢,就是一个网站的访问操作。也就是说我们当前可能开发了这样一个网站应用,接下来呢,我们需要收集用户的所有的点击操作。
06:05
我们现在所有收集到的这个日志数据,那就应该是只有三个重要的字段,一个就是哪一个用户做了一次点击,那就是有一个user字段,然后呢,呃,用户点击了哪个页面,应该有当前页面的URL,另外还有一个访问的时间戳,我们把它叫做time,所以整个抽象出来就是这样的一个三元组。我们可以统一,哎,把它叫做event,哎,这就是一个单独的数据类型了啊,包装出来的一个类对于scla而言呢,我们其实有一个非常简单的写法,那就是可以直接使用样例类case class做一个定义啊,那这里边对应的类型呢,User和URL都是string,都是字符串类型,Time stamp呢,当然是一个长整型了啊,所以接下来我们可以直接把这个样例类定义出来,在slink当中,它是直接支持skyla的样例类这样一个一种数据类型的,那如果要是在Java代码里边的话,那么它也同样支持类似于po的这样一个数据类型的定义,但是Java里边对于po类的定义要求就会更加的复杂一些,在skyva当中呢,我们直接指定成样例类,那就完全没有问题啊,所以这个其实非常简单的啊,我们在代码里边可以多加一句定义。
07:21
His class。Event。啊,那后面我们把对应的字段都定义出来,User。Stra。然后URL。另外还有一个time stamp。长、整型、老,这就是我们对于当前的数据类型的基本定义啊,那么有了它之后呢?那接下来我们从元素中读取数据就可以用另外一种方式了。From elements哎,这里就直接可以去创建当前event的对象,一个一个传进去就可以了啊,比如说这里我们可以创建一个event里边,比方说某一个用户有一个Mary。
08:01
它点击了一个页面,呃,比方说就叫做home吧,主页面最后还有一个时间戳长整形啊,那我们给一个1000L,那表示这是我们当前的第一秒钟做了一个点击啊,当然这个时间戳应该是从1970年的1月1号零点,而且是标准时间啊,格林位制时间,1970年1月1号零点开始,然后到目前为止的一个秒数或者毫秒数啊,就看我们取几位,如果我们把这个当前的数字的单位认为是毫秒的话,那这个1000L相当于就是第一秒钟啊,我们这里只是做测试啊,没有实际的意义啊,那当然了,除了这个第一个event之外,另外我们还可以继续给对应的event。比方说我们再来一个,另外一个用户来了一个Bob,然后他也做了一个点击,比方说我们如果是电商的应用场景的话,可能有这个收藏夹啊,那来一个cut啊。
09:01
同样后面我们来一个它对应的时间戳是2000L,那也就是说在第二秒的时候做了一个点击,这样我们得到的一个data stream,那应该是什么样子的呢?比方说我们把这个叫做data stream1。那么它的数据类型就应该是data stream里边的泛型,那当然就是event了啊,所以整体来讲的话,我们就是里边的元素类型是什么,那么所有的元素类型都是event得到的data stream这样的数据流,它的类型当然也就是。所以这个其实整体来讲还是非常简单的,这就是第一种简单的方式啊,从元素里边去读取数据,那当然了,另外还有一种对应的。是从集合中读取数据,这种方式在实际应用的时候可能会更多一点啊,就是我们可以把所有的数据先集中的攒好了,放在一个集合里边,然后全部指定好了之后呢,接下来我们直接调用EV的方法把它整体读进去,这样的话就是我们对于测试数据的指定就会在单独的一部分这个代码里面。
10:11
做测试的时候想要更改测试数据,那其实就不用改后边的处理流程的代码了啊,只要改前面的这一个单独的数据就可以了啊,那对于这样的一个应用的方法呢,也非常简单,就是from collection啊,其实我们看到直接打一个from啊,后面就自动补全了,提示出了各种各样不同的方法调用,这里面要传入的当然就是一个,哎,我们看到就是一个seek或者是一个啊,就是。Scla里边的序列类型,或者是一个迭代类型啊,那我们这里边传入什么东西呢?最简单的方式当然就是定义一个啊,一个list就可以了嘛,比方说我们这个就叫做click。用户点击的一组事件啊,那我们把它包装成一个scla的list里边当然就是event了,这个其实非常简单,我们就把前面的event直接都copy过来就可以了。
11:07
这是我们当前包装好的一个列表,然后有了这个列表呢,就可以作为from collection的参数传入,直接把click传进去就可以了,啊,那得到的data stream它的类型呢,我们把它叫做STREAM2,它的类型诶,我们知道当然也就是。啊,所以跟上面这种从元素里边读取数据的方式得到的是完全一样。如果我们想看一下这个得到的结果是什么样子的啊,最后也可以做一个打印输出。简单的做一个打印输出。那我们可以直接把stream。前面得到的stream和STREAM1 stream2全部打印。直接调用print方法就可以了。如果要运行起来的话,最后不要忘记env execute执行起来,当然了,为了区分这三条不同的流,我们会发现当前这个代码里面啊,其实是有三条不同的流处理啊,那所以在这个过程当中,如果我们打印全混在一起的话,可能就看不清楚了,所以我们可以print里边加一个参数啊,这个参数会在我们输出的时候前面有一个标注,比方说我们这个stream就直接把这个叫做number。
12:24
然后下边这个叫做一,这个叫做二,我们可以直接运行一下,看一看得到结果是什么样。现在程序已经跑起来了,我们可以看一看,现在已经得到了结果输出,我们可以看到number这边啊,那当然就是12345直接打印输出了,这里边诶之所以不是按照顺序,那是因为我们之前说过,当前是一个并行的流处理程序,我们在集其开发环境里边,本地环境啊,模拟的时候默认的并行度如果不给的话,默认就是我当前的机器的CPU核心数,哎,我当前机器是16核,那当然一共就可能会有16个不同的slot去执行我当前的并行任务啊,那所以这里边为了看得更清楚一点,我们也可以因为直接做一个全局的并行设置,比方说啊,我直接设成。
13:16
Set paraism1,那就表示我当前只有一个并行度,只有一个SP去执行,相当于就是串行了嘛,啊,这样的话,我们输入是什么样的顺序,得到最后的输出也就应该是什么样的顺序,这个会看的更加的明显一点,所以我们就后边做测试啊可以。直接把全局的并行度设成一,我们可以看到现在就是12345按照顺序输出了,后边我们看到一这条流里边,这里边我们给的这一个字符串啊,它就会放在。当前我们这一个输出结果前面加一个当前的标注啊,那所以当前一这是我们的第一条流,使用从元素读取数据得到的Larry和Bob的点击事件啊,那后面的二,这就是我们使用从集合里边读取数据得到的点击事件。
14:07
所以这种方式整体来讲还是比较简单的啊,在实际应用的时候呢,呃,我们知道真实的代码,我们不可能把这个数据直接写死在当前的程序里边啊,所以一般情况这种方式就是用来做这个测试的。那实际生产环境里边的代码如果写好了之后呢,一般我们的数据从哪里去读呢?如果是有借数据的话,诶,那一个非常简单的数据来源,当然就是可以直接从当前的文本文件里面去做读取了啊,那当然了,我们现在是应该调用data STEM API啊,所以还是基于stream execution environment去调一个read file方法,从一个文本文件里边也可以读出相应的数据啊,那当然了,如果用这种方式的话,我们应该要在input下边再去创建一个文本文件,比方说我这个就叫做clicks.TXT。然后在里边我们可以把对应的这些数据都写进去啊,比方说这里面有Mary,我们还是以逗号分割,然后后边后。
15:07
1000啊,然后比方说有一个Bob。后面是CART2000。呃,比方说还可以多一点数据啊爱丽丝。呃,他可以比方说去访问了某一个商品的详情页,我们就叫prod,那当然了,后边应该哪个商品应该还应该有一个ID啊,比方说等于一。3000第三秒钟啊,对应的我们还可以去复制更多的数据。多给几条,比方说这里边给一个四,给一个五,我们这里边可以让他访问不同的数据。把URL可以做一个更改。ID等于二,ID等于三啊,这样的话我们就有了更多的测试数据,然后在当前的代码当中,我们可以直接使用从文件中读取数据的这种方式。
16:00
从文件中读取数据,调用的当然还是env read text five,这里边我们直接指定。当前的路径是input下边的click。点TXT。这里要注意当前得到的STREAM3,它的类型就不再是event类型的data stream了,我们这里直接去让idea给我们做一个自动类型的推断就可以得到啊,他得到的呢是string类型的。这就是我们所说的啊,读取文本文件之后,它是一行一行读取,每一行数据,在这里就是一个字符串,所以后面如果我们真正要做处理转换的话,肯定还是要把它做一个解析,做一个拆分的啊,啊这就是我们当前从文件读取数据的方式,我们也可以在这里边做一个打印。直接做一个print,然后加一个三。当然了,得到的这个数据跟前面我们看到的就会不太一样了,前面我们直接打印都是event,现在呢,当然就是字符串原封不动的打印输出了,这就是我们对于有界数据源的读取,我们介绍了从元素、从集合和从文件三种不同的读取方式,都非常的简单。
我来说两句