前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >关键数据变更监控

关键数据变更监控

原创
作者头像
3号攻城狮
修改2018-05-20 07:29:01
2.6K5
修改2018-05-20 07:29:01
举报

#故事的开始

某个深夜,小朱(产品经理)悄悄发来微信

代码语言:txt
复制
对于关键信息的变更,我们能持久化变更日志么?
.......

省略N多场景描述,总结就是:

代码语言:txt
复制
想知道,某一天,某,把某个数据,从某改成了某?

技术架构

拿到需求之后,自然难以入睡.分析了一下我们当前的应用结构.

代码语言:txt
复制
1.采用SpringCloud框架,以微服务的形式架构应用,每个服务都有自己独立的数据库,涉及到跨数据库取数时,非主数据均采用远程服务调用.
2.底层持久化框架采用mybatis.

#解决方案分析

数据库触发器

第一方案就想到在数据库写触发器,但是第一个否认的也是该方案.

代码语言:txt
复制
灵活性差,针对不同表,对于每一个字段都需要处理,毕竟我们不是想监控每一个字段.不能灵活的配置监控表,监控字段.另直接嵌入数据库,不利于控制

mybatis拦截器

在经过了对mybatis的一番检索之后,没有发现对该需求的解决方式.在认知范围内,想到了使用mabatis拦截器解决该问题。

1.简单构建一个持久化实体,表结构省略.

代码语言:txt
复制
public class SqlLog {
    @Id
    @GeneratedValue()
    protected String id;
    private Integer result;
    private Date whencreated;
    private String sql;
    private String parameter;
}

2.添加接口类

代码语言:txt
复制
public interface SqlLogMapper extends BaseMapper<SqlLog> {
    int insertSqlLog(SqlLog log);
}

3.添加对应mapper.xml

代码语言:txt
复制
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.hand.hap.cloud.mybatis.mapper.SqlLogMapper">
    <insert id="insertSqlLog" parameterType="com.hand.hap.cloud.mybatis.domain.SqlLog">
        insert into sql_log(result, whencreated,sql,parameter) values(#{result}, #{whencreated},#{sql},#{parameter})
    </insert>
</mapper>

4.编写拦截器

代码语言:txt
复制
@Intercepts({@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})
public class UpdateInterceptor implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        Object result = null;
        Object object = invocation.getTarget();
        Object[] args = invocation.getArgs();
        SqlLog log = new SqlLog();
        if (object instanceof Executor) {
            MappedStatement mappedStatement = (MappedStatement) args[0];
            if (args.length > 1) {
                Object domain = args[1];
                Configuration configuration = mappedStatement.getConfiguration();
                Object target = invocation.getTarget();
                StatementHandler handler = configuration.newStatementHandler((Executor) target, mappedStatement,
                        domain, RowBounds.DEFAULT, null, null);
                BoundSql boundSql = handler.getBoundSql();
                log.setParameter(boundSql.getParameterObject().toString());
                log.setSql(boundSql.getSql());
                //记录时间
                log.setWhencreated(new Date());
            }
            //执行真正的操作
            result = invocation.proceed();
            mappedStatement = mappedStatement.getConfiguration().getMappedStatement("insertSqlLog");

            log.setResult(Integer.valueOf(Integer.parseInt(result.toString())));
            args[0] = mappedStatement;
            //insertSqlLog 方法的参数为 log
            args[1] = log;
            //执行insertSqlLog方法
            invocation.proceed();
        }
        return result;
    }
    @Override
    public Object plugin(Object target) {
        if (target instanceof Executor) {
            return Plugin.wrap(target, this);
        }
        return target;
    }
    @Override
    public void setProperties(Properties properties) {
    }
}

该解决方案的思路就是在执行完更新操作之后,保存本次操作的操作记录.对比上一次操作记录,就可以形成一个修改闭环.

该方案也被否定.对于每一个数据库都要有一张日志记录表,或者是对改表有操作权限.对于mybatis而言,我们想要做的是一个通用的持久化方案,不应该嵌入业务需求.当然这仅是在本人的认知范围的一些拙见.

Spring AOP

最终采用的方案是在应用层监控mybatis的底层更新方法.达到了如下目标:

代码语言:txt
复制
1.通过注解,简单可配置
2.异步解析与应用结偶

但是目前也存在如下不足:

代码语言:txt
复制
一定程度上于我们项目的编程风格绑定,切合了我们的代码分层结构,如果其他小伙伴要用,一定程度上需要改写部分逻辑

AOP 插件编写

新建插件项目

image.png
image.png

编写注解,适用于表审计,列审计

代码语言:txt
复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface TableAudit {
    //默认针对字段单个字段审计 配合ColumnAudit使用
    boolean all() default false;
}
代码语言:txt
复制
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ColumnAudit {
}

编写切面

代码语言:txt
复制
@Aspect
@Component
public class TableUpdateAop {
    private static Logger LOGGER = LoggerFactory.getLogger(TableUpdateAop.class);
    @Autowired
    private RabbitMqSender rabbitMqSender;

    private static final String OPERATOR_ID ="operatorId";
    private static final String TABLE_NAME ="tableName";
    private static final String LAST_NAME ="lastName";
    private static final String ID ="id";
    private static final String MAPPER = "Mapper";

    /*我们使用的是mybatis源码,然后下载下来,自己做过一定的加工处理
    *如果小伙伴在采用改方式处理table日志变更记录时只需要把切面对接到mybatis的核心修改接口就可以
    * UpdateByPrimaryKeySelectiveMapper
    * UpdateByPrimaryKeyMapper
    * */
    @Pointcut("execution(public * com.hand.hap.cloud.mybatis.common.base.update.*.update*(..))")
    public void audit(){}
 
    @Around("audit()")
    public Object doBefore(ProceedingJoinPoint pjp) throws  Throwable{

        Map<String,Object> originValues = null;
        Map<String,Object> newValues = null;
        Map<String,Object> baseValues = null;

        boolean sendAudit = false;

        try {
            Object[] args = pjp.getArgs();
            Object parameter = args[0];
            Class clazz =parameter.getClass();

            TableAudit tableAudit = (TableAudit)clazz.getAnnotation(TableAudit.class);
            if( tableAudit!=null){

                StringBuilder clazzName = new StringBuilder();

                String mapperName = clazz.getName().replace("domain","mapper");
                clazzName.append(mapperName)
                        .append(MAPPER);
                //此处和我们的项目代码结构有关,如果采用这种方式需要作出一些调整
                BaseMapper baseMapper = (BaseMapper) SpringContextUtil.getBean(Class.forName(clazzName.toString()));
                Object dbObj =baseMapper.selectByPrimaryKey(parameter);
                //收集当前信息
                originValues = collectValue(dbObj,tableAudit.all());
                baseValues = new HashMap<>();

                //收集表名
                String tableName = StringUtil.camel2Underline(clazz.getSimpleName());
                baseValues.put(TABLE_NAME,tableName);
                //收集用户信息 需要根根自己项目做一些调整
                baseValues.put(OPERATOR_ID,RequestContext.getUserId());
                baseValues.put(LAST_NAME, RequestContext.getLastName());
                //数据主键
                baseValues.putAll(getFieldValueByName(clazz.getDeclaredField(ID),dbObj));

                newValues = collectValue(parameter,tableAudit.all());
                sendAudit = true;
            }
        }catch (Exception e){
            LOGGER.warn("Table update data collect failed, error info {}",e.getMessage());
        }

        Object result = pjp.proceed();

        try {
            if(sendAudit && Integer.valueOf(Integer.parseInt(result.toString()))>0){
                sender(baseValues,originValues,newValues);
            }
        }catch (Exception e){
            LOGGER.warn("Table update data send failed, error info {}",e.getMessage());
        }

        return result;
    }


    public  Map<String,Object> collectValue(Object obj,boolean scope) throws Exception{

        Map<String,Object> values = new HashMap();

        List<Field> list = Arrays.asList(obj.getClass().getDeclaredFields());
        for(int i=0;i<list.size();i++){
            Field field = list.get(i);
            if(scope){
                getFieldValueByName(field,obj);
            }else{
                if(field.isAnnotationPresent(ColumnAudit.class)){
                    values.putAll(getFieldValueByName(field,obj));
                }
            }
        }
        return values;
    }

    public  void sender(Map<String,Object> baseValues ,Map<String,Object> originValues , Map<String,Object> newValues){

        List<TableUpdateLog> logs = collectValue(baseValues,originValues,newValues);
        //此处我们采用的是rabbitMq 理论上此处只要通过异步方式处理即可
        rabbitMqSender.handleTableAuditData(logs);
    }
    public static List<TableUpdateLog> collectValue( Map<String,Object> baseValues ,Map<String,Object> originValues , Map<String,Object> newValues ){

        List<TableUpdateLog> logs = new ArrayList<>();

        if(!CollectionUtils.isEmpty(baseValues) && !CollectionUtils.isEmpty(originValues) && !CollectionUtils.isEmpty(newValues) ){
            for(Map.Entry entry : originValues.entrySet()){
                TableUpdateLog log = new TableUpdateLog();
                String columnName = entry.getKey().toString();
                log.setColumnName(columnName);
                if(entry.getValue()!=null){
                    log.setOriginValue(entry.getValue().toString());
                }
                if(newValues.get(columnName)!=null){
                    log.setNewValue(newValues.get(columnName).toString());
                }
                if(baseValues.get(OPERATOR_ID)!=null){
                    log.setOperatorId(baseValues.get(OPERATOR_ID).toString());
                }

                log.setCreationDate(DateUtil.now().getTime());
                if(baseValues.get(LAST_NAME)!=null){
                    log.setLastName(baseValues.get(LAST_NAME).toString());
                }
                if(baseValues.get(ID)!=null){
                    log.setPrimaryKey(baseValues.get(ID).toString());
                }
                if(baseValues.get(TABLE_NAME)!=null){
                    log.setTableName(baseValues.get(TABLE_NAME).toString());
                }
                logs.add(log);
            }
        }
        return logs;
    }


    public  Map<String,Object> getFieldValueByName(Field field,Object obj) throws Exception{

        Map<String,Object>  kv = new HashMap<>();

        String name = field.getName();
        // 将属性的首字符大写,方便构造get,set方法
        name = name.substring(0, 1).toUpperCase() + name.substring(1);
        // 获取属性的类型
        //String type = field.getGenericType().toString();
        // 如果type是类类型,则前面包含"class ",后面跟类名
        Method m = obj.getClass().getMethod("get" + name);
        // 调用getter方法获取属性值
        Object value =  m.invoke(obj);
        String underlineFieldName = StringUtil.camel2Underline(name);
        kv.put(underlineFieldName,value);
        return kv;
    }
}

工具方法

代码语言:txt
复制
@Service
public class SpringContextUtil  implements ApplicationListener<ContextRefreshedEvent> {
    private static ApplicationContext applicationContext = null;
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if(applicationContext == null){
            applicationContext = event.getApplicationContext();
        }
    }
    /*ApplicationContext context= ContextLoader.getCurrentWebApplicationContext();//尝试下这个方法*/
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public static Object getBean(Class clazz) {
        return applicationContext.getBean(clazz);
    }
}

MQ 处理工具

代码语言:txt
复制
@Configuration
public class RabbitMqConfig {
public static final String QUEUE\_TABLE\_AUDIT = "queue.table.audit";
@Bean
public Queue queueTableAudit() {
return new Queue(QUEUE_TABLE_AUDIT);
}
}
代码语言:txt
复制
@Component
public class RabbitMqSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void handleTableAuditData(List<TableUpdateLog> logs) {
    String data = new JSONWriter().write(logs);
    this.rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TABLE_AUDIT, data.getBytes());
}
}

日志审计bean

代码语言:txt
复制
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableUpdateLog  {
@Id
@GeneratedValue()
private Integer id;
private String tableName;
private String primaryKey;
private String batchNumber;
private String columnName;
private String originValue;
private String newValue;
private String operatorId;
private String lastName;
private Long   creationDate;
}

基础配置

代码语言:txt
复制
spring:
  rabbitmq:
    host: 192.168.11.210
    port: 5672

整个插件我们已经完成,接下来我们使用刚刚搭建的插件

插件使用

导入插件

代码语言:txt
复制
 <dependency>
            <groupId>com.hscf.cloud</groupId>
            <artifactId>hscf-table-monitor-starter</artifactId>
            <version>${hcloud.version}</version>
</dependency>

对应domain上配置注解

代码语言:txt
复制
@ModifyAudit
@VersionAudit
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableAudit
public class SysResourceGroup extends AuditDomain {
    @Id
    @GeneratedValue
    @ColumnAudit
    private Long id;
    @ColumnAudit
    private String groupCode;
    @ColumnAudit
    @NotNull
    private String groupName;
    @ColumnAudit
    private String description;
}

使用就这么简单,当对这个domain进行修改操作时,就会监控其变更数据.

接下来我们对收集到的信息进行处理.

信息收集

新建服务

image.png
image.png

处理MQ队列消息

代码语言:txt
复制
@Component
public class RabbitMqReceivers {

    private Logger logger = LoggerFactory.getLogger(RabbitMqReceivers.class);
    @Autowired
    private TableUpdateLogDao tableUpdateLogDao;

    @RabbitListener(queues = "queue.table.audit")
    @RabbitHandler
    public void handleApiMonitor(byte[] data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        JSONArray arrays = null;
        try {
            arrays = JSON.parseArray(new String(data));

            if(arrays!=null && !arrays.isEmpty()){

                String batchNumber = UUID.randomUUID().toString();

                for(Object logJson :arrays ){
                    TableUpdateLog log = (TableUpdateLog)BeanParser.parse((JSONObject)logJson, TableUpdateLog.class);
                    log.setBatchNumber(batchNumber);
                    tableUpdateLogDao.save(log);
                }
            }
        } catch (Exception e) {
            logger.error("Handle table admin data failed, data:{}", arrays);
        } finally {
            channel.basicAck(deliveryTag, false);
        }
    }
}

效果截图

image.png
image.png

总结

以上方案还有很多不足,编写也比较匆忙,小伙伴们多多原谅.

总的说来,这是目前想到的一个比较实用的解决方式.

如果小伙伴们有好的解决方式,QAQ,评论区走起留言.

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 技术架构
    • 数据库触发器
      • mybatis拦截器
        • Spring AOP
          • AOP 插件编写
            • 新建插件项目
            • 编写注解,适用于表审计,列审计
            • 编写切面
            • 工具方法
            • MQ 处理工具
            • 日志审计bean
            • 基础配置
          • 插件使用
            • 导入插件
            • 对应domain上配置注解
          • 信息收集
            • 新建服务
            • 处理MQ队列消息
            • 效果截图
          • 总结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档