00:00
了解了map之后,接下来我们再来介绍另外一种,也是非常简单、非常常见的基本换算,就是所谓的过滤,顾名思义,它其实就是要把数据流里边做一个过滤操作,把符合某些条件的元素过滤筛选出来,比如说我们可以看到图示里边。每一个数据元素,它的颜色。本身就是当前的一个属性,我们可以根据当前颜色的深浅来做一个判断。当颜色比较浅的时候,直接就滤除掉,而剩下来的就全部都是深色的数据块了,所以在这个过程当中,我们的关键点其实是要设置一个。返回值为布尔类型的条件表达式,那。返回如果为true的话,表示当前的元素可以过滤出来正常输出,那如果当前的表达式为false的话,那说明对应的元素就要被过滤掉,不再输出,所以我们会发现当前的经过过滤操作之后。
01:11
数据的个数是有可能会减小的,在有些条件下,当前数据是不会输出的,而另外一些条件下呢,相当于数据没有做任何的变化,直接原样输出,这就是所谓的过滤。那它的特点当然就跟map会有所不同了,前面我们提到的map转换,本质上来讲它是一对一的一个映射关系。得到的结果。数据流里。每一个元素的类型可以跟之前是不同的,数据类型可以发生转换。我们也记得慢function需要传入两个类型参数,它有两个泛型,一个是表示当前输入的类型,另外一个是转换之后输出的类型。而对于当前的filter呢,很显然我们就只有一种类型,因为满足条件过滤出来的元素还是原样嘛,当然类型就不变。所以当前filter要实现的是一个filter function,它只有一个,就是当前要处理的数据上的。
02:16
元素的数据类型。所以接下来我们可以在代码里面对于filter function也做一个对应的测试。我们还是可以在当前的包下边去新建一个Java class,就叫做。同样还是现在是transform filter test。那么整体的测试流程都是差不多的,我们把测试的没方法写出来,抛出异常,那前面同样还是需要去创建流式的执行环境get出来。叫做V。全局的将当前的并行度设为一啊,那同样前面还是要读取数据,我们就直接可以copy之前map这一部分测试的代码。
03:05
把数据源先创建出来,得到一个data stream source啊,那其实我们知道对于data stream source而言,本质上它继承自single output operator,因为我们知道也是一个算子嘛,所谓的原算子,那这个类呢,本身又继承自啊,所以当前我们得到的可以说就是一个data,后边调用map转换之后。我们会发现得到的是一个single outperator啊,那同样也是一个。尽管我们当前转换之后类型可能会发生变化,但它还是由一个event类型的data stream转换成了一个stream类型的data stream,那这就是为什么当前我们调用的API叫做data stream API,它就是在data stream之间进行来回的转换。
04:01
从一个data stream转换成另外一个data stream。所以现在我们要做的filter也是类似的操作。基于当前的STEM。就可以调用一个叫做filter的方法。在这个接口里边,这个方法里边要传入的那就是一个filter function,所以这就是flink编程的一个特色,如果是a source的话,那里面要传入的就是一个source function,那如果是map的话,传入的是一个map function filter的话传入一个filter function。那所以这还是对应的有几种不同的方法。第一种方法还是可以直接。传入。一个。实现了。Function的。自定义类的对象。哎,那所以这里边我们自然就会想到可能需要去一个对应的对象,我们这里可以把它叫做自己定义出来的filter function,叫做my future。
05:05
对应应该在下边。做出单独的声明啊,那所以这里边我们。实现一个自定义的filter function。Public static啊,写在当前文件里边的话,我们需要定义一个静态,就是MY。Implement实现了一个filter function接口。我们看到filter function有只有一个参数,泛型参数,就是当前的数据类型,因为它不涉及到数据类型的转变,所以我们直接传入的是。那么这个接口里边呢,同样只有唯一的抽象方法,就叫做filter,它会返回一个booing类型,这就是我们所说的其实就可以直接返回一个布尔类型的表达式。如果为真,那么就说明当前元素保留下来,过滤出来,如果为假,那么就相当于当前元素被删除掉,筛选掉。
06:07
所以接下来我们还可以指定一个想要实现的需求,就当前我们输入的这些数据里边呢,有不同用户的点击访问事件,我们可以要求只选择比方说某一个用户就是Mary,他的所有的点击访问事件。那基于这样的一个考虑的话,显然我们就是实现这样一个filter方法。只要判断当前这个。变量叫做value value里边的user是否是Mary就可以了,如果是value,那我们就返回true,如果不是返回false,所以直接return,可以return value.user。判断下是否equals。就这。就可以实现我们当前的一个过滤的功能,那就如果我们输入这样的三条数据的话,很显然相关的数据会输出,而Bob和Alice的数据就不会被输出了。我们这里可以做一个简单的。
07:08
我们把这个叫做一。接下来可以做一个简单的测试,把它打印出来。下边env XQ执行起来。运行一下。我们可以看到跟预期是完全一致的,这里输出的就是只有一条Mary的数据,Bob和Alice都没有输出。这就是filter的作用。那当然了,同样是filter function,我们可以用另外的方法来实现,那就是。传入一个。匿名类实现filter。Function。接口。这个过程跟前面我们提到的map传入一个匿名类实现map function过程是完全一样,所以直接stream可以调filter方法里边直接去一个filter function。
08:16
直接去一个filter function,那我们看到这里面的类型都已经帮我们补全了,需要去实现的filter方法,抽象方法也都已经放在这里了,那当然了,当前我们要实现的具体逻辑还是一样。如果这里面我们把这个改成Bo Bob的话,很明显得到的RESULT2。那么这个时候打印输出的就应该只有Bob这个用户,对应的点击数,Mary和就全部被筛选掉。这是我们能够想到的一个结局啊,那另外当然还可以有第三种情况,就是传入拉姆达表达式。也就是说传入一个匿名函数,这种使用方式呢,其实会更加的简单一点,因为我们想到了在map或者map进行转换的时候。
09:11
可能还会涉及到前后类型的不同,如果转换之后输出的类型比较特殊的话,诶,那我们说Java里边有泛型擦除,为了保证flink能够解析出对应的类型,有可能后边还要加上对应类型的一个说明,使用returns方法去做一个指定。而对于filter呢,这里面根本不用考虑类型的转换,它就是基于之前原始的类型做了一个筛选过滤而已,所以它使用拉姆的表达式其实是肯定没有问题的。啊,那这里面我们就可以直接。传入一个拉姆德表达式,把当前的当然是掉它的,判断它的user是否等于,比方说现在我们要判断是否等于Alice,这也是完全可以的啊,那这里可以直接把它也做一个打印。
10:00
我们这里就不在对应的定义RESULT3了,在后边直接可以print,把它print出来,这是一个拉姆达表达式的输出。Alice。对应的数据。我们可以直接运行一下,看一看是否最后的这个Lu对应的这条流,我们筛选出来的流可以得到爱丽丝的数据,确实是这样,我们得到的就是爱丽丝相关的访问。就是对于filter function的一个基本的用法,那整体来讲它跟map function的使用非常的类似,而且会更加简单。我们可以通过这种方式逐渐的熟悉flink里边编程的风格。
我来说两句