首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

NIFI1.21.0_Mysql到Mysql增量CDC同步_日期_以及null数据同步处理

NIFI从MySql中增量同步数据_通过Mysql的binlog功能_实时同步mysql数据_根据binlog实现update数据实时同步_实际操作05---大数据之Nifi工作笔记0044

具体的,之前已经写过,如何在NIFI中实现MySQL的增量数据同步,但是写的简单了,因为,比如在插入的时候,更新的时候,仅仅是写死的某个表,也就是针对某个表,指定好字段进行插入操作,或者更新操作,这样就有些局限了,比如我想同步一整个库,注意是增量同步,那么,难道我要一张一张表的去创建好了以后,然后我再去一个的同步嘛,比较麻烦,一点点解决.

 首先看这个日期同步的问题,这里:

编辑

首先先来看一下之前那个整体的,mysql到mysql增量同步的流程,可以看到这里我们主要修改insert的ReplaceText处理器就可以, 修改这里拼接sql语句的部分.

编辑

上面这个insert对应的ReplaceText处理器需要修改.

编辑

需要修改他的ReplacementValue这个属性

INSERT INTO userinfo_nifi(id,name,mobile,email,son_json,cdate) VALUES($,"$","$","$","$",STR_TO_DATE("$",'%a %b %d %H:%i:%S CST %Y'))

主要是这里的内容,可以看到有一列是日期类型的,那么,需要通过STR_TO_DATE

INSERT INTO userinfo_nifi(id,name,mobile,email,son_json,cdate) VALUES($,"$","$","$","$",STR_TO_DATE("$",'%a %b %d %H:%i:%S CST %Y'))去进行转换.

类似于:

INSERT INTO `userinfo_nifi` VALUES (9, 'sdsd', '4645', '665', 'sdfs', STR_TO_DATE('Fri Jun 16 04:40:30 CST 2023', '%a %b %d %H:%i:%S CST %Y'));

将日期格式转换可以插入到数据库中的就可以了.

当然还需要在EvaluateJsonPath处理器中添加上,cdate

编辑

然后再来看更新:update

这里也是修改一下EvaluateJsonPath处理器和ReplaceText处理器就可以了

update userinfo_nifi  set id = "$" , name = "$" , email = "$" , mobile = "$" , son_json = "$" cdate = STR_TO_DATE("$",'%a %b %d %H:%i:%S CST %Y')  where id ="$"

这个是ReplaceText的ReplacementValue值.

这里其实还有个问题,就是空值的时候呢,已经试过了,空值的时候上面的办法也会报错...所以

再继续解决:

先来看一下整体的解决后的流程:

编辑

编辑

然后我们看看修改的部分,注意这个流程,仅仅是用来同步一张表的,配置也麻烦,需要配置表名,表字段什么的,后面咱们看看,能不能改成自动识别字段这样.

先来看insert部分,他的这个EvaluateJsonPath处理器

编辑

先来提取字段内容,然后

再添加一个updateAttribute处理器,用来更新一下cdate字段,日期类型的字段需要处理一下,

就是判断它是否是空,如果是空就返回null,不是null就返回它自身,当然,其他的字段也可以用这种方法来处理空值.

编辑

updateAttribute处理器这里的,需要添加一个字段,这个字段就是处理后的cdate字段.

$','%a %b %d %H:%i:%S CST %Y')")}

可以看到其实就是处理cdate字段的空值的.

然后再去:

编辑 看这个ReplaceText处理器

这个是ReplacementValue的值:

INSERT INTO userinfo_nifi(id,name,mobile,email,son_json,cdate) VALUES($,"$","$","$","$",$)

这样就可以了,下面的部分都不用动,这样就可以适应null的情况,如果不是空值就可以正常插入数据了.

然后再来看update更新:

更新也是从EvaluateJsonPath处理器开始提取字段值

编辑

可以看到提取的值有点多,因为有新值和旧值

编辑

提取以后

编辑

让再来看下面的updateAttribute处理器,这里主要添加两个属性,用来修改cdate的值,判断是否为null处理

$','%a %b %d %H:%i:%S CST %Y')")}

$','%a %b %d %H:%i:%S CST %Y')")}

这两个值的内容分别为上面的内容

然后再来看:

编辑

ReplaceText处理器的值

update userinfo_nifi  set id = "$" , name = "$" , email = "$" , mobile = "$" , son_json = "$", cdate = $  where id ="$"

这样就可以了,然后就是去测试,这样,新增,删除,修改,包括 对日期类型数据的空值判断,插入都可以了.

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20230616A0581C00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券