00:00
关于用户自定义数据源呢,其实我们会发现在这里我们直接去实现的是一个,而前面我们提到卡夫卡数据源的时候,其实。本身这里传入的flink卡夫卡consumer,它继自fli卡夫卡consumer base,而本身呢,这个抽象类又继承自rich parallel function啊,那这里面我们会发现它比一般的function多了两个修饰的关键字,一个是rich,另外一个是parallel的话,这个是涉及到了函数,我们会在稍后去进行介绍,那这里呢,Parallel又指的是什么呢?呃,它指的是并行,我们继续点下去的会会发现啊,这个抽象类它继承了抽象的复函数这样一个抽象类,另外它实现的接口其实叫做。Parallel function,也就是最终它实现接口是这个,当然了,本身这个并行的function接口也是继承自s function的,所以本质上我们传入的还是一个s function,那这里的区别就在于,如果说我们实现了parallel s function的话。
01:12
就可以让当前的数据源给他指定更高的并行度。然后进行更加。吞吐量更大的处理,整个系统处理的性能就会提升啊,那所以这里面我们就可以去做一下简单的测试,像前边我们没有定义parallel的时候。直接自己定义的对应的这个source custom test,假如这里我们把全局的并行度提高一点,这里设成二啊,或者说我们更直接一点,直接把当前的a source这个source任务。单独给他设一下定型度,这样可不可以呢?我们可以运行一下。看一看结果怎么样,我们看到代码会直接报错,这里面报出抛出的异常是。当前的并行度对于非并行的算子而言,只能是一。
02:06
所以对于我们当前这个问题而言。如果这里边实现的是一个s function,简单的s function的话,它的并行度只能是一,不能设置成其他。那当然了,整体来讲吞吐量就会非常小了啊。那前面我们提到的。直接从socket文本流里面读取数据,其实也有类似的问题,Socket文本流只能串行的读取数据,它其实是没有办法增大吞吐量。当前我们就遇到了类似的一个问题,那怎么样才能够给它设置更高的并行度呢?解决方案也很简单,那就是需要实现一个parallel方式,所以这里我们还可以进一步去做一个测试,那就是针对自定义一个并行的。方式。去并行的生成数据啊,那当然了,如果是这样的话,我们这里就不应该去之前的那个click了啊,我们为了。
03:06
对比更加的明显,可以把之前的这段代码注掉,然后去。这里我们去一个自己实现的。Parallel。Custom source。我们这里还是把它叫做custom strip。然后进行一个打印输出,那这里的关键当然就是需要实现出自定义的parallel custom source这样一个parallel function啊,那我们如果说不想像那样把它单独放在一个文件里面的话,放在当前文件里边啊,那很明显要定义的就是一个静态了。Public static class。我可以。啊,它做一个实现,现在要实现的就是一个。Parallel方式。
04:00
里面同样有一个泛型。呃,我们这个可以简单一点,不要去实现一个event类型的数据了,因为那个随机选取的话会比较麻烦,我们直接就随机生成一个整数就好了,那当然上面这个类型也应该要改成整数。Integer。接下来里边必须要实现的,同样还是wrong和CANCEL2个方法啊,那当前我们是要实现自定义的。并行方式。那这里,呃,类似的,我们还是用一个标志位来控制当前数据是否继继续去生成,所以类型的一个running初时等于true。那另外呢,我们还是随机生成,随机生成的话可以直接用一个random来。所以这里面我们可以直接在外边把它定义出来random,一个random。
05:05
在run方法里面就是一个外循环,不停的生成数据。以running作为。当前的判断标准,那下面呢,就是调用CX方。只要传入一个整数就可以,我们这里边直接随机生成random点,非常的简单,那下边如果要是cancel方法的话,就是直接把。属性改成false,对应的我们在run方法里面的循环就会终止了,这就是一个非常简单的并行数据源的一个测试,那我们要看看如果给它加一个并行度为二,现在能够正常去运行吗?可以运行一下。我们看到现在就完全没有问题了。当然如果说我们想要进一步去测试的话,也可以把全局的并行度给大一点啊,那对应的话,我们会看到S任务的并行度是二,而后面打印输出的话。
06:07
它的并行度可以到达,就是更高可以到达我们当前全局设置的四,默认的如果说没有单独指定并行度的话,当前print的并行度就是我们统一在环境里边指定的四,这样的话可以看到一共有四个并行子任务在不停的生成整数。这就是关于并行的用法。
我来说两句