前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >客快物流大数据项目(五十八):解析kafka消费出来的数据格式

客快物流大数据项目(五十八):解析kafka消费出来的数据格式

作者头像
Lansonli
发布2022-03-07 10:48:48
6900
发布2022-03-07 10:48:48
举报
文章被收录于专栏:Lansonli技术博客

目录

解析kafka消费出来的数据格式

一、解析kafka消费的OGG数据格式

二、解析kafka消费的Canal数据格式

解析kafka消费出来的数据格式

一、解析kafka消费的OGG数据格式

  • 输出到kafka的数据结构
代码语言:javascript
复制
table            表名
op_type         变更类型, I:添加, U:修改, D:删除
op_ts            操作时间
current_ts       同步时间
pos            偏移量
before         操作前字段集合
after           操作后字段集合
  • 插入数据
代码语言:javascript
复制
{
	"table": "IT.tbl_express_bill",
	"op_type": "I",
	"op_ts": "2020-08-31 07:31:28.000305",
	"current_ts": "2020-08-31T07:31:33.371000",
	"pos": "00000000130000093541",
	"after": {
		"id": 24,
		"express_NUMBER": "648777075",
		"cid": 55,
		"eid": 80,
		"order_channel_id": 3,
		"order_dt": "2015-12-25 11:46:21",
		"order_terminal_type": 5,
		"order_terminal_os_type": 5,
		"reserve_dt": "2015-12-25 11:46:21",
		"is_collect_package_timeout": 1,
		"timeout_dt": "2015-12-25 11:46:21",
		"type": 1,
		"cdt": "2015-12-25 11:46:21",
		"udt": "2015-12-25 11:46:21",
		"remark": null
	}
}
  • 修改数据
代码语言:javascript
复制
{
	"table": "IT.tbl_areas",
	"op_type": "U",
	"op_ts": "2020-08-31 12:46:13.000162",
	"current_ts": "2020-08-31T12:46:21.142000",
	"pos": "00000000130048058345",
	"before": {
		"id": 110101,
		"name": "东城区",
		"pid": 110100,
		"sname": "东城",
		"level": "3",
		"citycode": "10",
		"yzcode": "100010",
		"mername": "中国-北京-北京市-东城",
		"lng": 116.4100,
		"lat": 39.9316,
		"pinyin": "Dongcheng"
	},
	"after": {
		"id": 110101,
		"name": "东城区",
		"pid": 110100,
		"sname": "东城",
		"level": "3",
		"citycode": "10",
		"yzcode": "100010",
		"mername": "中国-北京-北京市-东城区",
		"lng": 116.4100,
		"lat": 39.9316,
		"pinyin": "Dongcheng"
	}
}
  • 删除数据
代码语言:javascript
复制
{
	"table": "IT.tbl_express_bill",
	"op_type": "D",
	"op_ts": "2020-08-31 08:31:13.000145",
	"current_ts": "2020-08-31T08:31:18.308000",
	"pos": "00000000130006459536",
	"before": {
		"id": 1,
		"express_NUMBER": "946585109",
		"cid": 80,
		"eid": 55,
		"order_channel_id": 1,
		"order_dt": "2013-06-02 21:24:14",
		"order_terminal_type": 4,
		"order_terminal_os_type": 1,
		"reserve_dt": "2013-06-02 21:24:14",
		"is_collect_package_timeout": 0,
		"timeout_dt": "2013-06-02 21:24:14",
		"type": 1,
		"cdt": "2013-06-02 21:24:14",
		"udt": "2013-06-02 21:24:14",
		"remark": "测试"
	}
}

二、解析kafka消费的Canal数据格式

  • 输出到kafka的数据结构
代码语言:javascript
复制
data        更新,插入时的新数据;删除时被删除的数据
database   数据库名称
es          毫秒值
id           
isDdl       是否是ddl变更操作
mysqlType   数据库表结构,mysql类型
old         更新数据时旧的数据
sql         具体的ddl sql
sqlType     数据库表结构, java.sql.Type类型
table       表名
ts          毫秒值
type        变更类型, ALTER,INSERT,DELETE,UPDATE
  • 插入数据
代码语言:javascript
复制
{
	"data": [{
		"id": "426",
		"name": "杨玉玲",
		"tel": null,
		"mobile": "13*******47",
		"detail_addr": "虹口区曲阳路500号202室",
		"area_id": "310109",
		"gis_addr": null,
		"cdt": "2020-02-02 18:51:39",
		"udt": "2020-02-02 18:51:39",
		"remark": null
	}],
	"database": "crm",
	"es": 1598859797000,
	"id": 2,
	"isDdl": false,
	"mysqlType": {
		"id": "bigint(20)",
		"name": "varchar(50)",
		"tel": "varchar(20)",
		"mobile": "varchar(20)",
		"detail_addr": "varchar(100)",
		"area_id": "bigint(20)",
		"gis_addr": "varchar(20)",
		"cdt": "DATEtime",
		"udt": "DATEtime",
		"remark": "varchar(100)"
	},
	"old": null,
	"sql": "",
	"sqlType": {
		"id": -5,
		"name": 12,
		"tel": 12,
		"mobile": 12,
		"detail_addr": 12,
		"area_id": -5,
		"gis_addr": 12,
		"cdt": 93,
		"udt": 93,
		"remark": 12
	},
	"table": "crm_address",
	"ts": 1598860641719,
	"type": "INSERT"
}
  • 修改数据
代码语言:javascript
复制
{
	"data": [{
		"id": "439",
		"name": "罗尚英",
		"tel": null,
		"mobile": "18*******07",
		"detail_addr": "环翠街道红树东方6栋8-3",
		"area_id": "520423",
		"gis_addr": null,
		"cdt": "2020-02-02 18:51:39",
		"udt": "2020-02-02 18:51:39",
		"remark": "测试"
	}],
	"database": "crm",
	"es": 1598865334000,
	"id": 61,
	"isDdl": false,
	"mysqlType": {
		"id": "bigint(20)",
		"name": "varchar(50)",
		"tel": "varchar(20)",
		"mobile": "varchar(20)",
		"detail_addr": "varchar(100)",
		"area_id": "bigint(20)",
		"gis_addr": "varchar(20)",
		"cdt": "DATEtime",
		"udt": "DATEtime",
		"remark": "varchar(100)"
	},
	"old": [{
		"remark": null
	}],
	"sql": "",
	"sqlType": {
		"id": -5,
		"name": 12,
		"tel": 12,
		"mobile": 12,
		"detail_addr": 12,
		"area_id": -5,
		"gis_addr": 12,
		"cdt": 93,
		"udt": 93,
		"remark": 12
	},
	"table": "crm_address",
	"ts": 1598865336004,
	"type": "UPDATE"
}
  • 删除数据
代码语言:javascript
复制
{
	"data": [{
		"id": "438",
		"name": "张静利",
		"tel": null,
		"mobile": "16*******32",
		"detail_addr": "渤海路信得塑业",
		"area_id": "410611",
		"gis_addr": null,
		"cdt": "2020-02-02 18:51:39",
		"udt": "2020-02-02 18:51:39",
		"remark": null
	}],
	"database": "crm",
	"es": 1598865296000,
	"id": 60,
	"isDdl": false,
	"mysqlType": {
		"id": "bigint(20)",
		"name": "varchar(50)",
		"tel": "varchar(20)",
		"mobile": "varchar(20)",
		"detail_addr": "varchar(100)",
		"area_id": "bigint(20)",
		"gis_addr": "varchar(20)",
		"cdt": "DATEtime",
		"udt": "DATEtime",
		"remark": "varchar(100)"
	},
	"old": null,
	"sql": "",
	"sqlType": {
		"id": -5,
		"name": 12,
		"tel": 12,
		"mobile": 12,
		"detail_addr": 12,
		"area_id": -5,
		"gis_addr": 12,
		"cdt": 93,
		"udt": 93,
		"remark": 12
	},
	"table": "crm_address",
	"ts": 1598865297996,
	"type": "DELETE"
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/03/07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 解析kafka消费出来的数据格式
    • 一、解析kafka消费的OGG数据格式
      • 二、解析kafka消费的Canal数据格式
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档