00:00
我们现在已经了解了怎么样在flink程序当中去读取有界的和无界的数据源,我们会发现啊,其实对于这个有界的数据源来讲,一般情况都是用在测试场景里面的,那在实际的生产环境当中呢,当然用的最多的就是直接读取卡夫卡作为数据源了。但是我们知道凡是总有例外嘛,那假如说我们想要读取的数据源,它不是来自卡夫卡,而是其他的某个比较特殊的外部系统flink里边呢?呃,既没有给我们提供对应的预实现的方法,也没有给我们提供对应的连接器。哎,那这个时候怎么办呢?那当然我们就要使用最底层的一般化的通用方法,直接调用a source,然后里边传入一个自己实现的s function。所以这就是我们所说的。自定义一个圆算子,自定义一个S,这里边的关键其实就是要自己去实现一个S方式接口啊,给他写一个对应的实现类,那这里边关键是什么呢?就是要重写两个方法,一个叫若,一个叫cancel。
01:14
这个我们也可以到源码当中去简单的看一下啊,S function里边这个interface里边我们会看到啊,这里关键就是一个wrong,另外还有一个cancel,那其他的这个source context,这是当前的这个上下文啊,呃,我们这个就先不去考虑,我们关键就是要实现这两个方法,那这里的说。其实就是最关键的。读取数据的方法,哎,它其实当前这里边我们看到它的参数就跟当前的这个运行时的上下文有关系,Source context是它的参数,这里边在这个上下文里边,它就可以像当前的流处理的下游任务发送对应的数据,哎,所以其实在这个wrong方法里面,简单来讲就是什么呢?我们可以从外部去读取数据,然后得到的数据呢,再用当前的运行上下文把它发送出去。
02:07
那另外这个cancel是什么呢?Cancel简单来讲就是说我们当前这个wrong按道理来讲就是会不停的运行的,因为我们流式处理程序是连续不断的读取数据嘛,永不停止,无休无止,诶那什么时候我们应该把它终止呢?诶那就调用这个cancel方法,比方说我们在那个web UI上啊,右上角那个大大的看L按钮,其实最终就是调到了这里来,那这个看L方法应该怎么实现呢?诶简单来说就可以创建一个标志位。上面的wrong方法里边,我们可以判断这个标志位,比方说为true的时候,我们就不停的循环,不停的读取数据,发送数据,那如果为false的时候呢,那就终止当前的循环,那这cancel里边,我们只要去把那个对应的标志位做一个更改就可以了。这就是s function基本的一个处理思路,接下来呢,我们就做一个具体的自定义。
03:02
那么我们现在也没有特别的需要连接到的外部系统啊,所以呢,我们就干脆直接做一个随机的生成吧,就相当于随机的产生一个测试数据源啊,这当然也可以去做一个自定义了,我们先来把这个类做一个实现,哎,所以先去new一个scale的class,现在是真正的一个class,不是object了啊,我们就把它叫做当前的点击事件的数据源吧,我们随机生成点击事件啊,就叫做。Click source。啊,那么当前这一个类呢?显然我们需要去实现对应的s function接口,所以我们直接extend source function。啊,那后边我们知道当前的s function是有泛型的,对应的那个泛型就是最终我们当前数据源发出的数据类型。那我们当前既然是点击事件嘛,对应的类型当然就是前面我们定义好的event了。
04:00
所以接下来我们可以看一下。这个class里边,我们需要真正实现的抽象方法就是一个肉,一个cancel。那前面我们也说了啊,在这两个方法里边,主体逻辑那就是draw,我们就是不停的循环,然后发送数据啊这里边我们可能是随机生成一个数据,然后直接发出去,那cancel呢,要用一个标志位来进行当前这个发送数据中断的控制,诶所以这里边呢,我们首先就把这个标志位定义成当前类里边的一个属性不就可以了吗?哎,所以我们定义一个标志位。这里边我们就哎这个标志位显然是可以更改的啊,所以应该是V,不是啊v value啊,我们把它叫做running吧,是否在运行它当然是一个布尔类型了,我们直接让艾做一个自动类型的推断,不写类型了,那接下来呢,哎,我们就是在run方法里边要去定义具体的处理流程了。首先。我们是要随机生成数据啊,那所以这里边我们首先应该要有一个随机数生成器。
05:07
主要用它来随机的生成数据,那当然就是random了,SKY和Java里面都提供了,我们随便选择一个就可以,另外呢,我们还应该要定义。数据随机。选择的范围。哎,那这就是首先啊,我们定义一个users。所有的用户名称可以在什么范围内去选择呢?哎,我们定义一个数组吧。里边我们定义一些用户名,比方说叫Mary。哦,然后我们可以定义Alice。Bob。好,再来一个比方说C开头的carry吧,然后接下来我们还可以定义所有的URL可以选择的范围,URL也是一个数组,哎,那这里面我们定义。主页面点后可以有购物车。
06:03
点cart,那另外比方说还可以有喜欢的商品啊,类似于收藏夹一类的东西啊,Favorite我们可以简写,另外可能还有一些商品详情页的浏览啊,这个我们随机的定义啊,比方说等于一。可以多定义几个,哎,那后边还可以有这个等于三等于二。这就是我们随机生成的一个范围。然后接下来的关键流程,那就是使用一个循环不停的发送数据,那这里边呢,当然不能死循环啊,我们有一个判断的条件,判断条件就是当前的标志位。用。标志为。作为。循环判断条件。不停的发出数据。所以这里边我们应该是一个while循环,里边的判断条件就是running。
07:06
那里边呢,首先我们应该要生成一个数据啊,哎,这里边我们就先定义一下吧,我们当前的数据。Event一个c event啊,那就应该是一个。具体的一个对象实例里边呢,呃,首先我们应该从users里边去做一个筛选,那users里边我们选择谁呢?这里面我们就要随机的生成一个当前的索引号,所以我们当前就要调用random点。Next int是一个int类型的索引号吧,里边呢,还应该指定它的范围,当前的范围当然就是数组的长度了,users.less。随机的做一个生成啊,那这当前我们只是定义好的这个user是什么啊,然后接下来URL当然也是一样了,从URL这个数组里边随机的去选取一个索引位置。
08:03
那里边给的范围呢?当然还是URL这个数组的长度。最后呢,还应该有一个时间戳,那我们这里可以直接使用calendar,哎,直接get instance。然后去。调一个get time in millions方法获取一个毫秒数作为当前的时间戳,这样的话这个事件就生成好了。然后接下来呢,就应该。要。调用。上下文,哎,这里边就是它的参数叫ctx,当前这个SS任务的上下文调用ctx的方法。向下游发送数据。那这里调用它的什么方法呢?诶,我们看到它有一个叫做collect的方法收集,哎,所以当前我们其实就是使用这个类似于这个收集器啊collector啊,做一个收集,把当前要发送的数据收集起来,接下来呢啊,就相当于写入到了我们发送的缓冲区里边,接下来就可以直接发送给下游任务了。
09:12
所以当前我们是调用C tx.collect里边要传入的当然就是一个event了啊,那当前我们直接把前面包装好的event放在这里发送就可以。当然了,这里我们还可以多考虑一点,就是呃,如果说我们这里不做限制的话,那这个循环起来的话,这个发送会频率非常的快啊,可能会把我们整个这个机器的资源全部耗费掉,所以这里呢,为了方便观测,我们可以。每隔一秒。发送一条数据。方便我们做观测。这里我们就非常简单的直接使用th.sleep啊,直接让他休息一会儿啊,直接传一个1000,哎,那这样的话就是休息一秒钟。
10:04
这样的话,这个wrong方法的逻辑我们就已经完全实现了,最后还有一个cancel cancel这里就非常简单了,我们直接把running这个标志位。给定成false就可以了啊,那我们就会发现啊,当前正常情况下,这个任务起起来之后,就会调用这里的wrong方法,然后就会进入循环,不停的发送数据,不停的发送数据,那假如说我们想要让当前任务停止的时候,就会调到这里的cancel cancel呢,就会把当前的running标志位属性改成false,诶,那同时这个run方法还在运行着呢,这个时候他在判断well循环啊,发送了一条数据之后,下一次再做判断的时候就会发现running变成了false,那么就相当于退出,不再发送数据。所以我们看就是用这种方式实现了发送数据的中断啊,那这就是自定义了一个方式。
11:00
那这里如果说我们想要去做测试的话,那还应该在另外去创建一个scla的object啊,这里我们可以叫做source。Custom。Test。啊,当前用户自定义的一个测试main方法,然后里边还是啊,Stream execution environment.get先把它获取到当前的这个变量,我们还是叫做烟V上面,我们把它改成下划线,引入对应的影视转换啊,然后同样啊,还是为了方便测试,我们直接把这个全局的并行度设成一,接下来原算子S,那就是要直接去。读取自定义的数据源。那这里就是env直接调用点ADD source方法里边就是new,一个我们之前定义好的click source啊,那这里我们得到的这个stream。当然它的类型data stream。
12:02
泛形式event,这就是我们定义好的这个S啊啊,那得到的这一个可以直接做一个打印。看到结果是什么,最后不要忘记还要运行起来,这就是我们完整的测试的过程啊,我们可以直接运行一下,看看效果怎么样。现在已经启动了,诶我们可以看到。每隔一秒就会输出随机生成的一个用户点击访问的事件。所以这个过程的话,我们会发现啊,这就是跟我们在click里边定义的流程是完全一样的,哎,这里到底是哪个用户,然后点击了哪个页面啊,这个都是随机生成的,我们就可以用它来模拟当前在生产环境里边的一个正常的数据。这就是自定义数据源的测试过程。关于自定义数据源呢,我们还需要另外注意一点,就是这里的并行度,我们现在是全局的设置成了并行度为一,也就是说我们当前读取数据的时候,相当于是一个串行的读取,一个接一个读取,那当然后面的打印也是按照顺序一个接一个打印啊,这个是没有任何问题的,那实际运行的时候,当然并行度是要调大的,我们现在能不能全局的把并行度调大呢?哎,当然是可以的,但是这里如果调大并行度的话,其实只是最后的打印输出是并行打印前边的读取数据。
13:27
其实并没有变成并行,它还是串行,这是为什么呢?这主要就是因为click s。它本身这里边实现的是一个S方,S方本身并行度只能为一,它并不是并行的。那如果要做并行的话是什么样子的呢?哎,其实前面我们讲到卡夫卡作为数据源的时候也已经知道了啊,Flink卡夫卡consumer它本身其实实现的是一个rich parallel source方式啊,所以如果说我们想要让它并行读取数据的话,必须实现的是一个parallel source方式。
14:04
在这儿我们也可以代码里边去简单的做一个测试啊,关于这里啊,如果说。我们在a source后边直接对它单独的设置并行度set parallelism2,这样的话我们运行就会发现。我们看他直接就抛异常报错了。他会告诉我们说当前的并行度啊,对于一个非并行的算子而言,这个并行度必须是一不能设置成别的啊,所以说呃,相当于我们这个在代码里边指定它实现function的时候,就已经对它做出了对应的限制。那怎么解决这个问题呢?其实也很简单。就是我们让。这里的S直接实现一个parallel s方式就可以了,当然要引入对应的类,然后接下来我们再来重新测试一下。运行。我们可以看到现在就是每隔一秒直接会跳出两条数据了啊,这个效果跟之前就会有所不同啊,相当于是两个并行的SS算子啊,都在执行我们这里边的逻辑,那就是每隔一秒,然后就会随机的生成一条数据,那所以当然就相当于每隔一秒会有两条数据生成了。
15:21
啊,这就是这个简单的测试啊,那当然了,为了方便我们的处理啊,我们还是一一秒钟生成一条数据就可以,所以这里边我们还是把它改回来,改成S方式啊,做一个串行的测试就够了,但是我们需要知道,当前如果想要并行处理的话,要实现的是一个parallel source方式。这就是关于读取自定义数据源的具体的操作。
我来说两句