温馨提示:文本由机器自动转译,部分词句存在误差,以视频为准
00:00
我们现在已经了解了水位线怎么样去生成啊,我们可以调用系统内置的水位线生成策略,也可以自定义水位线的生成。在前面我们介绍的这些方法当中呢,我们的数据里边是没有时间说信息的,所以在里边我们一定要去重写create time step sign这个方法,或者是调用一个with step,其实最终都是指定了当前提取时间戳的一个策略。从数据里边提取某个字段,然后把它指定成时间戳,我们现在这种方式其实会发现啊,就是本来这个数据里边是已经有时间戳的,那我能不能在这个数据读进来的时候就已经指定好了时间戳是什么,后边就不用再去进行单独的提取处理了呢?完全是可以做到的。当然如果想要做到这一点的话,那就要求我们之前数据源本身里边它就包含了对应的时间出信息。所以接下来呢,我们可以再介绍一下,在自定义的数据源里边,本身就是可以发送水平线的。
01:06
当然了,就是如果有一些数据源啊,比方说卡夫卡本身它就支持里边我们插入当前的时间桌信息的话,那我们同样也可以去做这样的定义啊,那之前呢,我们已经实现了一个自定义的数据源click source。在这里边我们发送数据的时候,他并没有带着时间戳的信息啊,那接下来呢,我们就来尝试一下,在自定义的数据源当中直接去生成带着时间戳信息的数据啊,其实这个过程也非常简单,我们在前面这里不是定义了当前的event吗?随机生成了一个event,然后接下来呢,我们可以在这里插一句。可以直接为要发送的数据分配时间戳,直接把时间戳给它指定啊,那所以当前这种指定的过程也是需要用到S算子的上下文ctx。
02:04
调它的一个方法,我们看到下面有一个叫做collect with time stemmp方法,好,那所以在这里呢,我们就看到里边要传两个参数,第一个是一个event,哎,那就是当前的这个数据嘛,所以我们就把这个event传进去,另外一个就是给他分配的时间戳到底是什么?哎,那我们这里非常简单,里边不是已经有了时间戳了吗?我直接把even.time stamp传到这就可以了。这就相当于已经实现了我们后边在这里做的这个时间戳的提取。那发出这个数据的时候,它就自带了时间桌信息,那接下来发出去之后,我们就可以在这里就不实现这个也是可以的了,直直接用就完全没有问题了。当然了,在自定义数据源里边,不仅仅只能做这个时间戳的分配啊。另外我们发现还可以直接。向下游直接发送水位线,那这个方法我们刚才也看到了,Ctx里边另外有一个方法就叫做一米water mark,里边要传的当然就是一个water mark了,那water。
03:10
本身我们创建的时候,里边加入一个长整型的时间戳就可以,比方说我们当前生成的策略就是直接基于event.time step,我们现在这一个按照系统时间去生成的话,那显然这是一个升序产生的嘛,所以我们插入的时间戳直接就是even.time step减一一毫秒就可以了,哎,所以呃,如果说我们这样去定义的话,那就甚至在后边连这个。STEM and waters这个方法都不用调用,我们连water已经生成好了,那当然就不要再去自己指定water这个生成策略啊,那所以如果是用在测试环境里面的话啊,有时候我们可以直接做这样的一个处理。当然了,在当前的这个状态里边,如果说我们已经指定了这个发送了水位线的话,那后边就不能再去调用这个STEM盘的watermarkx了,在实际的应用场景里边,一般哎我们都是在数据流处理的过程当中加上这一步方法去进行处理啊,所以这两者应该是二选一的一个关系,所以在这里我们把这一部分就注掉,这是关于在自定义的数据源当中为数据分配时间戳以及发送水位线的方法。
我来说两句