关键数据变更监控

#故事的开始

某个深夜,小朱(产品经理)悄悄发来微信

对于关键信息的变更,我们能持久化变更日志么?
.......

省略N多场景描述,总结就是:

想知道,某一天,某,把某个数据,从某改成了某?

技术架构

拿到需求之后,自然难以入睡.分析了一下我们当前的应用结构.

1.采用SpringCloud框架,以微服务的形式架构应用,每个服务都有自己独立的数据库,涉及到跨数据库取数时,非主数据均采用远程服务调用.
2.底层持久化框架采用mybatis.

#解决方案分析

数据库触发器

第一方案就想到在数据库写触发器,但是第一个否认的也是该方案.

灵活性差,针对不同表,对于每一个字段都需要处理,毕竟我们不是想监控每一个字段.不能灵活的配置监控表,监控字段.另直接嵌入数据库,不利于控制

mybatis拦截器

在经过了对mybatis的一番检索之后,没有发现对该需求的解决方式.在认知范围内,想到了使用mabatis拦截器解决该问题。

1.简单构建一个持久化实体,表结构省略.

public class SqlLog {
    @Id
    @GeneratedValue()
    protected String id;
    private Integer result;
    private Date whencreated;
    private String sql;
    private String parameter;
}

2.添加接口类

public interface SqlLogMapper extends BaseMapper<SqlLog> {
    int insertSqlLog(SqlLog log);
}

3.添加对应mapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.hand.hap.cloud.mybatis.mapper.SqlLogMapper">
    <insert id="insertSqlLog" parameterType="com.hand.hap.cloud.mybatis.domain.SqlLog">
        insert into sql_log(result, whencreated,sql,parameter) values(#{result}, #{whencreated},#{sql},#{parameter})
    </insert>
</mapper>

4.编写拦截器

@Intercepts({@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})
public class UpdateInterceptor implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        Object result = null;
        Object object = invocation.getTarget();
        Object[] args = invocation.getArgs();
        SqlLog log = new SqlLog();
        if (object instanceof Executor) {
            MappedStatement mappedStatement = (MappedStatement) args[0];
            if (args.length > 1) {
                Object domain = args[1];
                Configuration configuration = mappedStatement.getConfiguration();
                Object target = invocation.getTarget();
                StatementHandler handler = configuration.newStatementHandler((Executor) target, mappedStatement,
                        domain, RowBounds.DEFAULT, null, null);
                BoundSql boundSql = handler.getBoundSql();
                log.setParameter(boundSql.getParameterObject().toString());
                log.setSql(boundSql.getSql());
                //记录时间
                log.setWhencreated(new Date());
            }
            //执行真正的操作
            result = invocation.proceed();
            mappedStatement = mappedStatement.getConfiguration().getMappedStatement("insertSqlLog");

            log.setResult(Integer.valueOf(Integer.parseInt(result.toString())));
            args[0] = mappedStatement;
            //insertSqlLog 方法的参数为 log
            args[1] = log;
            //执行insertSqlLog方法
            invocation.proceed();
        }
        return result;
    }
    @Override
    public Object plugin(Object target) {
        if (target instanceof Executor) {
            return Plugin.wrap(target, this);
        }
        return target;
    }
    @Override
    public void setProperties(Properties properties) {
    }
}

该解决方案的思路就是在执行完更新操作之后,保存本次操作的操作记录.对比上一次操作记录,就可以形成一个修改闭环.

该方案也被否定.对于每一个数据库都要有一张日志记录表,或者是对改表有操作权限.对于mybatis而言,我们想要做的是一个通用的持久化方案,不应该嵌入业务需求.当然这仅是在本人的认知范围的一些拙见.

Spring AOP

最终采用的方案是在应用层监控mybatis的底层更新方法.达到了如下目标:

1.通过注解,简单可配置
2.异步解析与应用结偶

但是目前也存在如下不足:

一定程度上于我们项目的编程风格绑定,切合了我们的代码分层结构,如果其他小伙伴要用,一定程度上需要改写部分逻辑

AOP 插件编写

新建插件项目

image.png

编写注解,适用于表审计,列审计

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface TableAudit {
    //默认针对字段单个字段审计 配合ColumnAudit使用
    boolean all() default false;
}
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ColumnAudit {
}

编写切面

@Aspect
@Component
public class TableUpdateAop {
    private static Logger LOGGER = LoggerFactory.getLogger(TableUpdateAop.class);
    @Autowired
    private RabbitMqSender rabbitMqSender;

    private static final String OPERATOR_ID ="operatorId";
    private static final String TABLE_NAME ="tableName";
    private static final String LAST_NAME ="lastName";
    private static final String ID ="id";
    private static final String MAPPER = "Mapper";

    /*我们使用的是mybatis源码,然后下载下来,自己做过一定的加工处理
    *如果小伙伴在采用改方式处理table日志变更记录时只需要把切面对接到mybatis的核心修改接口就可以
    * UpdateByPrimaryKeySelectiveMapper
    * UpdateByPrimaryKeyMapper
    * */
    @Pointcut("execution(public * com.hand.hap.cloud.mybatis.common.base.update.*.update*(..))")
    public void audit(){}
 
    @Around("audit()")
    public Object doBefore(ProceedingJoinPoint pjp) throws  Throwable{

        Map<String,Object> originValues = null;
        Map<String,Object> newValues = null;
        Map<String,Object> baseValues = null;

        boolean sendAudit = false;

        try {
            Object[] args = pjp.getArgs();
            Object parameter = args[0];
            Class clazz =parameter.getClass();

            TableAudit tableAudit = (TableAudit)clazz.getAnnotation(TableAudit.class);
            if( tableAudit!=null){

                StringBuilder clazzName = new StringBuilder();

                String mapperName = clazz.getName().replace("domain","mapper");
                clazzName.append(mapperName)
                        .append(MAPPER);
                //此处和我们的项目代码结构有关,如果采用这种方式需要作出一些调整
                BaseMapper baseMapper = (BaseMapper) SpringContextUtil.getBean(Class.forName(clazzName.toString()));
                Object dbObj =baseMapper.selectByPrimaryKey(parameter);
                //收集当前信息
                originValues = collectValue(dbObj,tableAudit.all());
                baseValues = new HashMap<>();

                //收集表名
                String tableName = StringUtil.camel2Underline(clazz.getSimpleName());
                baseValues.put(TABLE_NAME,tableName);
                //收集用户信息 需要根根自己项目做一些调整
                baseValues.put(OPERATOR_ID,RequestContext.getUserId());
                baseValues.put(LAST_NAME, RequestContext.getLastName());
                //数据主键
                baseValues.putAll(getFieldValueByName(clazz.getDeclaredField(ID),dbObj));

                newValues = collectValue(parameter,tableAudit.all());
                sendAudit = true;
            }
        }catch (Exception e){
            LOGGER.warn("Table update data collect failed, error info {}",e.getMessage());
        }

        Object result = pjp.proceed();

        try {
            if(sendAudit && Integer.valueOf(Integer.parseInt(result.toString()))>0){
                sender(baseValues,originValues,newValues);
            }
        }catch (Exception e){
            LOGGER.warn("Table update data send failed, error info {}",e.getMessage());
        }

        return result;
    }


    public  Map<String,Object> collectValue(Object obj,boolean scope) throws Exception{

        Map<String,Object> values = new HashMap();

        List<Field> list = Arrays.asList(obj.getClass().getDeclaredFields());
        for(int i=0;i<list.size();i++){
            Field field = list.get(i);
            if(scope){
                getFieldValueByName(field,obj);
            }else{
                if(field.isAnnotationPresent(ColumnAudit.class)){
                    values.putAll(getFieldValueByName(field,obj));
                }
            }
        }
        return values;
    }

    public  void sender(Map<String,Object> baseValues ,Map<String,Object> originValues , Map<String,Object> newValues){

        List<TableUpdateLog> logs = collectValue(baseValues,originValues,newValues);
        //此处我们采用的是rabbitMq 理论上此处只要通过异步方式处理即可
        rabbitMqSender.handleTableAuditData(logs);
    }
    public static List<TableUpdateLog> collectValue( Map<String,Object> baseValues ,Map<String,Object> originValues , Map<String,Object> newValues ){

        List<TableUpdateLog> logs = new ArrayList<>();

        if(!CollectionUtils.isEmpty(baseValues) && !CollectionUtils.isEmpty(originValues) && !CollectionUtils.isEmpty(newValues) ){
            for(Map.Entry entry : originValues.entrySet()){
                TableUpdateLog log = new TableUpdateLog();
                String columnName = entry.getKey().toString();
                log.setColumnName(columnName);
                if(entry.getValue()!=null){
                    log.setOriginValue(entry.getValue().toString());
                }
                if(newValues.get(columnName)!=null){
                    log.setNewValue(newValues.get(columnName).toString());
                }
                if(baseValues.get(OPERATOR_ID)!=null){
                    log.setOperatorId(baseValues.get(OPERATOR_ID).toString());
                }

                log.setCreationDate(DateUtil.now().getTime());
                if(baseValues.get(LAST_NAME)!=null){
                    log.setLastName(baseValues.get(LAST_NAME).toString());
                }
                if(baseValues.get(ID)!=null){
                    log.setPrimaryKey(baseValues.get(ID).toString());
                }
                if(baseValues.get(TABLE_NAME)!=null){
                    log.setTableName(baseValues.get(TABLE_NAME).toString());
                }
                logs.add(log);
            }
        }
        return logs;
    }


    public  Map<String,Object> getFieldValueByName(Field field,Object obj) throws Exception{

        Map<String,Object>  kv = new HashMap<>();

        String name = field.getName();
        // 将属性的首字符大写,方便构造get,set方法
        name = name.substring(0, 1).toUpperCase() + name.substring(1);
        // 获取属性的类型
        //String type = field.getGenericType().toString();
        // 如果type是类类型,则前面包含"class ",后面跟类名
        Method m = obj.getClass().getMethod("get" + name);
        // 调用getter方法获取属性值
        Object value =  m.invoke(obj);
        String underlineFieldName = StringUtil.camel2Underline(name);
        kv.put(underlineFieldName,value);
        return kv;
    }
}

工具方法

@Service
public class SpringContextUtil  implements ApplicationListener<ContextRefreshedEvent> {
    private static ApplicationContext applicationContext = null;
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if(applicationContext == null){
            applicationContext = event.getApplicationContext();
        }
    }
    /*ApplicationContext context= ContextLoader.getCurrentWebApplicationContext();//尝试下这个方法*/
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public static Object getBean(Class clazz) {
        return applicationContext.getBean(clazz);
    }
}

MQ 处理工具

@Configuration
public class RabbitMqConfig {
public static final String QUEUE\_TABLE\_AUDIT = "queue.table.audit";
@Bean
public Queue queueTableAudit() {
return new Queue(QUEUE_TABLE_AUDIT);
}
}
@Component
public class RabbitMqSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void handleTableAuditData(List<TableUpdateLog> logs) {
    String data = new JSONWriter().write(logs);
    this.rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TABLE_AUDIT, data.getBytes());
}
}

日志审计bean

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableUpdateLog  {
@Id
@GeneratedValue()
private Integer id;
private String tableName;
private String primaryKey;
private String batchNumber;
private String columnName;
private String originValue;
private String newValue;
private String operatorId;
private String lastName;
private Long   creationDate;
}

基础配置

spring:
  rabbitmq:
    host: 192.168.11.210
    port: 5672

整个插件我们已经完成,接下来我们使用刚刚搭建的插件

插件使用

导入插件

 <dependency>
            <groupId>com.hscf.cloud</groupId>
            <artifactId>hscf-table-monitor-starter</artifactId>
            <version>${hcloud.version}</version>
</dependency>

对应domain上配置注解

@ModifyAudit
@VersionAudit
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableAudit
public class SysResourceGroup extends AuditDomain {
    @Id
    @GeneratedValue
    @ColumnAudit
    private Long id;
    @ColumnAudit
    private String groupCode;
    @ColumnAudit
    @NotNull
    private String groupName;
    @ColumnAudit
    private String description;
}

使用就这么简单,当对这个domain进行修改操作时,就会监控其变更数据.

接下来我们对收集到的信息进行处理.

信息收集

新建服务

image.png

处理MQ队列消息

@Component
public class RabbitMqReceivers {

    private Logger logger = LoggerFactory.getLogger(RabbitMqReceivers.class);
    @Autowired
    private TableUpdateLogDao tableUpdateLogDao;

    @RabbitListener(queues = "queue.table.audit")
    @RabbitHandler
    public void handleApiMonitor(byte[] data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        JSONArray arrays = null;
        try {
            arrays = JSON.parseArray(new String(data));

            if(arrays!=null && !arrays.isEmpty()){

                String batchNumber = UUID.randomUUID().toString();

                for(Object logJson :arrays ){
                    TableUpdateLog log = (TableUpdateLog)BeanParser.parse((JSONObject)logJson, TableUpdateLog.class);
                    log.setBatchNumber(batchNumber);
                    tableUpdateLogDao.save(log);
                }
            }
        } catch (Exception e) {
            logger.error("Handle table admin data failed, data:{}", arrays);
        } finally {
            channel.basicAck(deliveryTag, false);
        }
    }
}

效果截图

image.png

总结

以上方案还有很多不足,编写也比较匆忙,小伙伴们多多原谅.

总的说来,这是目前想到的一个比较实用的解决方式.

如果小伙伴们有好的解决方式,QAQ,评论区走起留言.

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Web项目聚集地

自己手写一个Spring MVC框架

Spring框架对于Java后端程序员来说再熟悉不过了,以前只知道它用的反射实现的,但了解之后才知道有很多巧妙的设计在里面。

1433
来自专栏Ryan Miao

使用dropwizard(6)-国际化-easy-i18n

前言 Dropwizard官方文档并没有提供国际化的模块,所以只能自己加。Spring的MessageResource用的很顺手,所以copy过来。 Easy...

37412
来自专栏后端之路

Dubbo之telnet实现

我们可以通过telnet来访问道对应dubbo服务的信息 比如 ? 我们可以利用一些指令来访问。 我们知道,默认情况下,dubbo使用netty做transpo...

4609
来自专栏岑玉海

Carbondata源码系列(一)文件生成过程

在滴滴的两年一直在加班,人也变懒了,就很少再写博客了,最近在进行Carbondata和hive集成方面的工作,于是乎需要对Carbondata进行深入的研究。 ...

5766
来自专栏zhisheng

渣渣菜鸡的 ElasticSearch 源码解析 —— 启动流程(上)

上篇文章写了 ElasticSearch 源码解析 —— 环境搭建 ,其中里面说了启动 打开 server 模块下的 Elasticsearch 类:org.e...

1281
来自专栏Spark学习技巧

hadoop系列之MR经典案例分享二

4、MapReduce的join(hive已经实现) http://database.51cto.com/art/201410/454277.htm ? 这三种...

34710
来自专栏cmazxiaoma的架构师之路

通用Mapper和PageHelper插件 学习笔记

8513
来自专栏Pythonista

Django之ORM数据库

            django默认使用sqlite的数据库,默认自带sqlite的数据库驱动 , 引擎名称:django.db.backends.sqli...

1171
来自专栏Java学习网

常见的 Java 错误及避免方法之第五集(每集10个错误后续持续发布)

当输入期间意外终止文件或流时,将抛出“EOFException”。 以下是抛出EOFException异常的一个示例,来自JavaBeat应用程序:

1343
来自专栏屈定‘s Blog

造轮子--Excel报表工具

由于公司内部之前对于excel封装操作并不是很方便,而且对于特殊的需求不是很容易满足,这个月的任务是迁移部分业务小报表顺便重构下,因此这里造个轮子,便于导入和导...

1643

扫码关注云+社区