00:01
已经定义好了基本的数据类型,那接下来我们就可以调用datapi创建数据源,对数据进行转换处理了。我们现在。定义的数据类型就应该是当前的event,这样一个类型里边包含了用户每一次对于页面进行访问的一次点击事件,里边主要有三个字段,所以我们当前创建的数据源里边的每一条数据都应该是一个event类型的对象。接下来我们可以在当前的包下边新建一个Java类,去进行一下当前的测试。这部分测试我们主要做的是针对这一部分数据源读取的API的调用,所以我们把它命名为s test。那首先。把当前的方法先写出来。抛出异常,那里边的话,第一步当然应该是先创建执行环境。
01:08
所以这里我们还是流的执行环境最简单的方法,调用stream environment的get应的这个方法。得到。当前的这个环境的对象,我们可以把它叫做烟。啊,那当然了,后面方便我们做测试的话,在。控制台打印输出的时候,按照顺序打印输出,我们可以直接把当前的并行度全局的。设为一。接下来,我们就可以去不同的来源读取数据了。其实前面我们在介绍workout的时候,已经有过对应的读取数据的方法,我们在进行读取批量数据的时候,直接就可以从一个文件当中去进行读取,这里我们调用的方法是en的方法,所以这其实是最为常见也是最为简单的一种调用方式,从文件直接读取,当然这种方式读取出来的是批量数据,相当于是在。
02:11
做一个批处理,本质上我们就是直接读取收集到的日志文件,把所有的数据日志数据提炼出来,进行统一转换计算。所以首先第一种我们用的就是从文件中读取数据。用的方就是读,想能够转换成类型的数据的话,那显然现在就不应该还是从这这样一个文件里面去读取了。我们应该在创建一个。新的文件能够表示当前用户对于页面的访问操作这样一个点击行为,那我们可以直接把它叫做,我们可以把它叫做clicks。
03:04
用户的点击。点TXT,当然了,这里我们可以只要把它定义对应的文本文件就可以。可以是TXT后缀也可以是CV啊,那像CV的话就是了,当前文件是以分的这样的一个文本文件。我们这里可以做一个简单的数据的定义,第一条可以指定,因为第一个字段是用户名称user,所以我们指定一个名字叫做Mary,然后他访问了一个页面的URL,最简单的home页面。主页面啊,那接下来呢,还应该有一个时间戳,这个时间戳我们定义它应该是一个长整型的毫秒数,所以这里边我们可以直接给一个1000,这就表示是第一秒钟来了一个Mary的访问事件,他点击的页面是后页面。那同样我们还可以定义其他,比方说又来了一个爱丽丝。
04:03
他访问,比如说我们这是类似于一个电商的场景的话,那他可以访问一个类似于购物车,点击了一下自己的购物车。这是2000毫秒的时候做了一个这样的操作,那当然了,我们还可以定义Bob,第三个用户,他是访问了某一个商品方商品的页面,我们可以用一个表示当前,这是一个商品页面的访问后边。问号分割加上当前商品的ID,比如是100,这是三秒发生的事情。后面的话,我们还可以对于类似的这个操作做一些。进一步的。复制啊,比如说我们可以让Bob这个用户多做一些点击操作,然后后边还可以给他做一些。访问不同的页面后,这是接下来第四秒和第五秒。
05:02
发生的事情啊,那再往后的话,也可以有一些其他用户的访问事件插进来,Mary在第六秒钟的时候可以又访问了一次,后面。接下来我们就直接可以复制一下。Bob,这名用户在第七秒和第八秒的时候分别又访问了对应的页面,最后我们可以再访问一个商品页面,我们直接把它ID定义成19秒的时候访问了一下对应的商品。这就是简单的一些测试数据,先把它定义好,然后接下来我们读取当前数据的时候,我们读取当前文件的时候,当然应该给一个文件路径啊,就在当前的这个项目目录下边,我们就直接给input。接下来是CS点。TXT。跟我们之前直接读取点TXT的方法是完全一致的,当然了,这里边我们可以直接把它叫做STREAM1。
06:03
如果说我们想看到当前的内容的话,可以直接调用stream的print方法。将它做一个打印。最后,不要忘记env execute执行起来。这就是一个最为简单的读取数据的方法,当然我们现在读取的是批量数据,所以相当于是一个有界的数据集直接读进来,然后直接可以在控制台做一个流式的打印输出。当前我们处理的是有戒律。我可以直接运行一下,看一下测试结果。结果非常简单,我们这里读入一条数据对应的就可以在控制台做一次打印,因为我们当前直接读取的是文本文件里边的每一行,所以这里的每一行数据就作为当前的一条数据string类型的一条数据进行了转换处理,我们这里边当然没有特殊的转换了,直接就打印输出了,这就是最简单的从文件去读取数据。
07:08
除了从文件读取数据之外,另外还有一个非常常见的读取有界数据的方式,那就是我们可以直接在代码当中。代码当中直接把测试的数据不要写在文件里啊,直接在当前代码当中嵌入进来,那就是我们可以从把它包装成一个集合类型就可以了。那就是我们可以从。集合中读取数据。这种方式的话,要调用的是env的。的。From。可以看到from这个方法里边要的数就是一个集合类型最简单的情况,当然了,后面我们还可以定义一些所谓的type,或者指定当前的数据的类型啊,那当我们当然如果是简单类型的话。
08:06
Flink可以自动的进行解析,我们就不需要做单独的定义了,比如我们这里可以直接定义一个集合。A。当前的这个列表集合里当然是应该有泛型,我们可以随便定义一些基本的数据类型,比如说直接就是in体制,这是完全可以的,比方说当前我们就是一些整形的数字,Numbers,那当然了,这里边我们需要去一个list,接下来可以在后边里边去加入一些数字啊,比如说我们加入二。再加入。一个五啊,这些都是可以的,那接下来呢,我们可以直接from collection,把当前的numbers。读进来得到一个对应的stream,我们可以看到得到的是一个data stream source data stream source里边的泛型对应的就是我们当前传入的数据类型,而this呢,本身它继承自single output stream operator。
09:12
这是所谓的一个单一输出的硫算子,然后它本身又date,那所以本质上我们当前就创建出了一个data three,一个数据流,接下来就可以基于这个流做各种各样的转换计算操作。那当前呢,我们可以把这个直接叫做这是一个比较特殊,里边就只有一些数字,那如果说我们想要对应之前的定义好的这些event事件的话,也可以有其他的一些定义方式,比如说。同样我们还是可以去直接一个list,现在我们就直接包装成event数据类型,这也是完全可以的,对于这样的简单的符合flink定义的类型,Flink可以自动的把它解析出来。那接下来我们就可以。
10:02
把当前这个list直接可以把它叫做。那么events里边。可以去做一些添加,添加对应的事件对象,那这里我们就可以直接调用它的构造方法去进行传入参数,构造一个对象啊。里边前面两个。字段都是string类型,最后一个是一个长整型。那我们同样可以把对应的这样的一条数据传进去。后面的话我们同样可以改变一下,比方说是Bob点击了一下购物车。第二秒的时候。接下来同样可以调用env from方法将当前的event。这是我们能够看到的,那下面同样可以把这个stream和STREAM2做一个打。
11:04
我们可以加一个前面加一个参数,让不同流的输出结果看的更加的明显一些。这个可以直接输出一个二,为了区别的话,前面STREAM1这里可以输出一个一,接下来再来运行一下,看一下效果怎么样。非常的明显。对于一而言,我们是照着原先。这个文件里面的每一行数据原封不动的做了一个输出,而对于二呢,呃,这里其实是把每一个event事件做了一个to string之后,序列化之后,然后打印输出啊。那当然了,对于numbers这个流里面,就是把对应的数字直接打出来就可以了,所以在这里可以看的非常的明显,所有的类型,我们定义的数据,它的类型也就是这里要传入的集合的泛型,也是我们生成的data streamam这里的泛型。
12:08
在之后我们往往会提到就是当前数据流里的数据类型是什么,所指的也就是里边每一条数据对应的类型。所以可以看得出来,从集合当中读取数据这个定义会比较简单,而且会非常的灵活,想要做出什么样的测试数据,可以直接把它构造出来,添加进去就可以了,在代码中可以非常快速的进行测试和调整,所以从集合当中读取数据呢,往往就是用在代码的测试和。所以这种方式呢,它的本质上其实相当于是直接把对应的集合数据临时存储到了内存当中,那么形成了特殊的数据结构之后,作为当前的数据源进行使用啊,一般呢,就是只用在测试当中,在实际项目当中,当然不可能直接把数据写死在这里了,只用在测试环境。
13:08
当然了,对于这种情形,跟它类似的还有另外一种读取数据的来源,那就是可以直接从元素,所谓的从元素读取数据。这种调用方法呢,其实和from collection非常的相似。可以看到下面还有一个方法叫做from element,这个from element呢很简单,就是不需要在这里再去构建对应的集合类型了,而是直接把对应的所有的这个数据。原生的数据。直接一行一行添加在这里就可以了。所以像上面我们就只有这两条数据的话,和BOB2个用户的两次点击可以直接用from element这种方式直接跟在后边作为它的参数读取进来。
14:02
效果跟上面。这种调用从集合里面读取是一样的啊,那所以这种方式可能在测试当中会更加的简单,所以。测试里面往往会用这样的一个形式去代替前面我们使用集合的这种方式,那当然了,这里如果直接定义的话,STREAM3。得到的当然也是data source,它的泛型是下面,如果我们直接做一个打印输出的话,也会看到。相同的结果,我们可以运行一下。我们可以看到,上面三和二这两条流的输出完全都是一致的。这里可以看到当前的time打印出来的话,是因为我们当时在里边是做了一个用,呃,Q。下边的这个time类型进行了一个转换,把它转换成了年月日十分秒的这种形式,那这种方式呢,我们会知道是把当前的长整形时间戳作为从1970年1月1号零点开始的一个毫秒数来进行的转换,那当前我们只是一个一千两千,那很明显就是1970年1月1号的一个时间了嘛。
15:28
这里边为什么是八点,这主要是因为本身做标准时间值转换的话,那应该基于所谓的UTC时间,标准时间啊,就是英国伦敦那里的格林位置时间作为标准,而我们现在如果直接控制台打印输出的话,显然会以我们当前自己的时区进行一个输出,北京时间的话,东八区很显然比伦敦标准时间要早八个小时,那所以。标准UTC时间的1970年1月1号的零点就是我们这里的早上八点,所以可以看到转换过来之后就是8.01秒一条数据,点零二秒一条数据,那不管是直接从元素里边读取,还是从集合里边读取,得到的结果都是一样的。
16:18
这就是我们所说的读取有界流的最常用的方法。那实际应用的过程当中呢?很显然后面两种方法都是用在测试当中,最常见的读取有限流进行批处理的方式,就是第一种从文件里面读取的这种方式了。在flink里边,它不仅仅支持直接从当前的一个文本文件里面去读取数据,也可以直接传入一个分布式文件系统的目录。诶,那当前我们常最常见的分布式文件系统当然就是HDS了。如果我们想要从HDFS目录下去读取数据的话,那还需要引入相关的依赖。所以。
17:04
整体来讲,从集合当中或者从文件当中读取数据,都是用来处理有借流数据的。
我来说两句