00:00
呃,那接下来呢?我们要将每一行数据转化为这个切算对象。那正常来说,我们应该大家想一想,应该用哪个函数。我们应该调用哪个函数?Me?对,调用map传map方程吧,好,那我们来做一下这个事情啊,那就卡法点S点调用一个map,这里边呢是一行数据对吧?呃,那正常的呢,我们就很简单,就是杰森。点pass object,把这个line呢进去。好传进去好,那这边呢,还有一种简写的方式对不对啊,那当然了,我们直接返回啊,可看返回它有一个简写的方式,Out加回车,那么表达式的方式,直接呢,方法调用的调用接算里边的pass object,因为它只有一个承载的方法,对吧,它呢,有有一个啊,就只传一个参数的,只有一个自己好呃,那这样的话大家想一下,如果说我过来一个脏数据会怎么样呢?
01:29
如果说我这个杰森数据,哎,最后少了一个大括号。大家觉得这个任务会怎么样?解析失败了。报错对吧,而且任务会怎么样?任会。停掉吧。
02:02
是不是任务会停掉,重启之后它还是接着刚才消费是不是会停掉,任务就停掉了,对吧?好,那我们想一下啊,如果说来了一个脏数据。他脏数据这种东西正不正常。你就想这个东西是不是正常的。如果来了一个脏数据,是不是正常的?那我伸展环境当中是很有可能会有这个什么。脏数据的对不对。是不是有可能会有这个脏数据啊,但是如果说由于你这个脏数据,整个任务就挂掉,是不是不应该啊。
03:29
OK,那这个时候我们想一下,有什么办法能做这个事儿呢?我们有没有办法来做这个事儿呢?那遇到脏数据我们应该怎么办呢?有的人说非要的过滤掉,哎,可以非的给它过滤掉。
04:01
对吧,啊try catch这样方式做一个过滤是吗?呃,其实我们可以直接这样啊,在如果说在flink当中呢,往往来说我们这样那脏数据我们也得给他保留下来,也得给它保留下来,对吧?保留下来干什么事呢?看一下这个脏数据到底有多少,最终跟我们数据做一个比较,看看它有多少,也就是说这块我们就不要用map了。不要用map了,对,有同学提到了用process,为什么要用process呢?因为我想把数据保留下来,我要写到另外的框架里边,所以在这边我也把脏数据写到测输出流里边。对吧,而我们一旦要用到测输出流的话。要用到测殊流的话。我们就只能用process是不是?
05:03
OK,那所以呢,我们用啊用process就好了,那这边呢,有一个process方式object,这个地方返回值是J3。Object对吧?好,那这个里边我们去处理一下,比方说我们要做这个Y6的一个解析啊,那直接这样。杰森点pass。Object,然后呢,Value。加宾得到一个杰森object,好,那这边呢,Return,哎不是return了,应该是直接写出叫out.connect j3object,但是对于这个呢,我们要做。异常处理对吧,要做异常处理。号加T。对吧,我们要做一个异常处理,Try catch。啊,Try catch好,那如果抓到异常是一个解析异常。
06:05
Pass exception。对吧,如果说我们抓到的是一个解析异常。好,那这边呢,我们看一下这个我来改一下啊呃,我看一下exception。他说我们这个地方,那我们这样吧,直接还是抛一个这个exception出去,对吧,如果抓到。抓到了一个异常,我们就正常的呢,这个我就就不打印了,我就不打印了,我干什么事呢,写入侧输入流。发山。异常对吧,往往都是我们的这个解析异常,发现异常,将数据写入侧输出流,写入侧输出流。那这边我们就用Ctx.outut然后这个地方呢,我们应该要一个out the对吧。
07:10
好,那这边呢,我们就直接是分类形就好了,然后这个地方。要给一个ID,给个ID对吧,那张数据叫得体。我们有一个脏数据的一个标记就好了,看看嘉宾得到一个out of time啊,然后接下来呢,把这个扔进,然后接下来还有一个我们的数据本身value。对吧,那只有这样的话,我们遇到脏数据之后,程序也不会挂掉,因为我们今天刚才讨论的这个问题,脏数据呢,它属于正常的现象。啊,它属于正常的现象。好,那这边呢,我们叫嗯,一个J3OBJDS。
08:05
啊,杰森OJ的一个DS对吧?啊,OK,那这块呢,我们就可以截一下啊,我们终于把这个数据呢,转化为了一个杰森对象。
我来说两句