前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类

客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类

作者头像
Lansonli
发布2022-03-09 09:01:47
4090
发布2022-03-09 09:01:47
举报
文章被收录于专栏:Lansonli技术博客

定义解析kafka数据的Bean对象类

一、定义消费kafka字符串的Bean对象基类

根据数据来源不同可以分为OGG数据Canal数据,两者之间有相同的属性:table,因此将该属性作为公共属性进行提取,抽象成基类

实现步骤:

  • 公共模块java目录下的 parser 包下创建 MessageBean 抽象类
  • 编写代码
    • 继承自 Serializable 接口
    • 创建 serialVersionUID 属性
    • 定义 table 属性,实现 setter/getter 方法

参考代码:

代码语言:javascript
复制
package cn.it.logistics.common.beans.parser;

import java.io.Serializable;

/**
 * 根据数据源定义抽象类,数据源:
 * 1)ogg
 * 2)canal
 * 两者有共同的table属性
 */
public abstract class MessageBean implements Serializable {
 
    private static final long serialVersionUID = -8216415778785426469L;

    private String table;

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    @Override
    public String toString() {
        return table;
    }
}

为什么创建serialVersionUID:

serialVersionUID适用于Java的序列化机制。简单来说,Java的序列化机制是通过判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体类的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常,即是InvalidCastException。

使用idea生成serialVersionUID:

操作步骤

说明

1

设置自动生成 serialVersionUID

2

选中对应的类名,然后按 alt+enter 快捷键

3

结果显示

二、​​​​​​​定义消费OGG字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 OggMessageBean
  • 继承自 MessageBean 抽象类

参考代码:

代码语言:javascript
复制
package cn.it.logistics.common.beans.parser;

import javax.print.DocFlavor;
import java.util.Map;

/**
 * 定义消费出来的ogg的数据的javaBean对象
 * {
 *     "table": "IT.tbl_route",            //表名:库名.表名
 *     "op_type": "U",                         //操作类型:U表示修改
 *     "op_ts": "2020-10-08 09:10:54.000774",
 *     "current_ts": "2020-10-08T09:11:01.925000",
 *     "pos": "00000000200006645758",
 *     "before": {                            //操作前的字段集合
 *        "id": 104,
 *        "start_station": "东莞中心",
 *        "start_station_area_id": 441900,
 *        "start_warehouse_id": 1,
 *        "end_station": "蚌埠中转部",
 *        "end_station_area_id": 340300,
 *        "end_warehouse_id": 107,
 *        "mileage_m": 1369046,
 *        "time_consumer_minute": 56172,
 *        "state": 1,
 *        "cdt": "2020-02-02 18:51:39",
 *        "udt": "2020-02-02 18:51:39",
 *        "remark": null
 *        },
 *     "after": {                         //操作后的字段集合
 *        "id": 104,
 *        "start_station": "东莞中心",
 *        "start_station_area_id": 441900,
 *        "start_warehouse_id": 1,
 *        "end_station": "TBD",
 *        "end_station_area_id": 340300,
 *        "end_warehouse_id": 107,
 *        "mileage_m": 1369046,
 *        "time_consumer_minute": 56172,
 *        "state": 1,
 *        "cdt": "2020-02-02 18:51:39",
 *        "udt": "2020-02-02 18:51:39",
 *        "remark": null
 *    }
 * }
 */
public class OggMessageBean extends MessageBean {
    //定义操作类型
    private String op_type;

    @Override
    public void setTable(String table) {
        //如果表名不为空
        if (table != null && !table.equals("")) {
            table = table.replaceAll("[A-Z]+\\.", "");
        }
        super.setTable(table);
    }

    public String getOp_type() {
        return op_type;
    }

    public void setOp_type(String op_type) {
        this.op_type = op_type;
    }

    public String getOp_ts() {
        return op_ts;
    }

    public void setOp_ts(String op_ts) {
        this.op_ts = op_ts;
    }

    public String getCurrent_ts() {
        return current_ts;
    }

    public void setCurrent_ts(String current_ts) {
        this.current_ts = current_ts;
    }

    public String getPos() {
        return pos;
    }

    public void setPos(String pos) {
        this.pos = pos;
    }

    public Map<String, Object> getBefore() {
        return before;
    }

    public void setBefore(Map<String, Object> before) {
        this.before = before;
    }

    public Map<String, Object> getAfter() {
        return after;
    }

    public void setAfter(Map<String, Object> after) {
        this.after = after;
    }

    //操作时间
    private String op_ts;

    @Override
    public String toString() {
        return "OggMessageBean{" +
                "table='" + super.getTable() + '\'' +
                ", op_type='" + op_type + '\'' +
                ", op_ts='" + op_ts + '\'' +
                ", current_ts='" + current_ts + '\'' +
                ", pos='" + pos + '\'' +
                ", before=" + before +
                ", after=" + after +
                '}';
    }

    /**
     * 返回需要处理的列的集合
     * @return
     */
    public Map<String, Object> getValue() {
        //如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合
        if (after == null) {
            return before;
        } else {
            return after;
        }
    }

    //同步时间
    private String current_ts;
    //偏移量
    private String pos;
    //操作之前的数据
    private Map<String, Object> before;
    //操作之后的数据
    private Map<String, Object> after;
}

三、​​​​​​​定义消费Canal字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 CanalMessageBean
  • 继承自 MessageBean 抽象类

参考代码:

代码语言:javascript
复制
package cn.it.logistics.common.beans.parser;

import java.util.List;
import java.util.Map;

/**
 * 定义消费出来的canal的数据对应的javaBean对象
 * {
 *     "data": [{
 *        "id": "1",
 *        "name": "北京",
 *        "tel": "222",
 *        "mobile": "1111",
 *        "detail_addr": "北京",
 *        "area_id": "1",
 *        "gis_addr": "1",
 *        "cdt": "2020-10-08 17:20:12",
 *        "udt": "2020-11-05 17:20:16",
 *        "remark": null
 *        }],
 *     "database": "crm",
 *     "es": 1602148867000,
 *     "id": 15,
 *     "isDdl": false,
 *     "mysqlType": {
 *        "id": "bigint(20)",
 *        "name": "varchar(50)",
 *        "tel": "varchar(20)",
 *        "mobile": "varchar(20)",
 *        "detail_addr": "varchar(100)",
 *        "area_id": "bigint(20)",
 *        "gis_addr": "varchar(20)",
 *        "cdt": "datetime",
 *        "udt": "datetime",
 *        "remark": "varchar(100)"
 *    },
 *     "old": [{
 *        "tel": "111"
 *    }],
 *     "sql": "",
 *     "sqlType": {
 *        "id": -5,
 *        "name": 12,
 *        "tel": 12,
 *        "mobile": 12,
 *        "detail_addr": 12,
 *        "area_id": -5,
 *        "gis_addr": 12,
 *        "cdt": 93,
 *        "udt": 93,
 *        "remark": 12
 *    },
 *     "table": "crm_address",
 *     "ts": 1602148867311,
 *     "type": "UPDATE"               //修改数据
 * }
 */
public class CanalMessageBean extends MessageBean {
    //操作的数据集合
    private List<Map<String, Object>> data;

    public List<Map<String, Object>> getData() {
        return data;
    }

    public void setData(List<Map<String, Object>> data) {
        this.data = data;
    }

    public String getDatabase() {
        return database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public Long getEs() {
        return es;
    }

    public void setEs(Long es) {
        this.es = es;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public boolean isDdl() {
        return isDdl;
    }

    public void setDdl(boolean ddl) {
        isDdl = ddl;
    }

    public Map<String, Object> getMysqlType() {
        return mysqlType;
    }

    public void setMysqlType(Map<String, Object> mysqlType) {
        this.mysqlType = mysqlType;
    }

    public String getOld() {
        return old;
    }

    public void setOld(String old) {
        this.old = old;
    }

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    public Map<String, Object> getSqlType() {
        return sqlType;
    }

    public void setSqlType(Map<String, Object> sqlType) {
        this.sqlType = sqlType;
    }


    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    //数据库名称
    private String database;
    private Long es;
    private Long id;
    private boolean isDdl;
    private Map<String, Object> mysqlType;
    private String old;
    private String sql;
    private Map<String, Object> sqlType;
    private Long ts;
    private String type;

    /**
     * 重写父类的settable方法,将表名修改成统一的前缀
     * @param table
     */
    @Override
    public void setTable(String table) {
        if(table!=null && !table.equals("")){
            if(table.startsWith("crm_")) {
                table = table.replace("crm_", "tbl_");
            }
        }
        super.setTable(table);
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/03/08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 定义解析kafka数据的Bean对象类
    • 一、定义消费kafka字符串的Bean对象基类
      • 二、​​​​​​​定义消费OGG字符串的Bean对象
        • 三、​​​​​​​定义消费Canal字符串的Bean对象
        相关产品与服务
        文件存储
        文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档