#故事的开始
某个深夜,小朱(产品经理)悄悄发来微信
对于关键信息的变更,我们能持久化变更日志么?
.......
省略N多场景描述,总结就是:
想知道,某一天,某,把某个数据,从某改成了某?
拿到需求之后,自然难以入睡.分析了一下我们当前的应用结构.
1.采用SpringCloud框架,以微服务的形式架构应用,每个服务都有自己独立的数据库,涉及到跨数据库取数时,非主数据均采用远程服务调用.
2.底层持久化框架采用mybatis.
#解决方案分析
第一方案就想到在数据库写触发器,但是第一个否认的也是该方案.
灵活性差,针对不同表,对于每一个字段都需要处理,毕竟我们不是想监控每一个字段.不能灵活的配置监控表,监控字段.另直接嵌入数据库,不利于控制
在经过了对mybatis的一番检索之后,没有发现对该需求的解决方式.在认知范围内,想到了使用mabatis拦截器解决该问题。
1.简单构建一个持久化实体,表结构省略.
public class SqlLog {
@Id
@GeneratedValue()
protected String id;
private Integer result;
private Date whencreated;
private String sql;
private String parameter;
}
2.添加接口类
public interface SqlLogMapper extends BaseMapper<SqlLog> {
int insertSqlLog(SqlLog log);
}
3.添加对应mapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.hand.hap.cloud.mybatis.mapper.SqlLogMapper">
<insert id="insertSqlLog" parameterType="com.hand.hap.cloud.mybatis.domain.SqlLog">
insert into sql_log(result, whencreated,sql,parameter) values(#{result}, #{whencreated},#{sql},#{parameter})
</insert>
</mapper>
4.编写拦截器
@Intercepts({@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})
public class UpdateInterceptor implements Interceptor {
@Override
public Object intercept(Invocation invocation) throws Throwable {
Object result = null;
Object object = invocation.getTarget();
Object[] args = invocation.getArgs();
SqlLog log = new SqlLog();
if (object instanceof Executor) {
MappedStatement mappedStatement = (MappedStatement) args[0];
if (args.length > 1) {
Object domain = args[1];
Configuration configuration = mappedStatement.getConfiguration();
Object target = invocation.getTarget();
StatementHandler handler = configuration.newStatementHandler((Executor) target, mappedStatement,
domain, RowBounds.DEFAULT, null, null);
BoundSql boundSql = handler.getBoundSql();
log.setParameter(boundSql.getParameterObject().toString());
log.setSql(boundSql.getSql());
//记录时间
log.setWhencreated(new Date());
}
//执行真正的操作
result = invocation.proceed();
mappedStatement = mappedStatement.getConfiguration().getMappedStatement("insertSqlLog");
log.setResult(Integer.valueOf(Integer.parseInt(result.toString())));
args[0] = mappedStatement;
//insertSqlLog 方法的参数为 log
args[1] = log;
//执行insertSqlLog方法
invocation.proceed();
}
return result;
}
@Override
public Object plugin(Object target) {
if (target instanceof Executor) {
return Plugin.wrap(target, this);
}
return target;
}
@Override
public void setProperties(Properties properties) {
}
}
该解决方案的思路就是在执行完更新操作之后,保存本次操作的操作记录.对比上一次操作记录,就可以形成一个修改闭环.
该方案也被否定.对于每一个数据库都要有一张日志记录表,或者是对改表有操作权限.对于mybatis而言,我们想要做的是一个通用的持久化方案,不应该嵌入业务需求.当然这仅是在本人的认知范围的一些拙见.
最终采用的方案是在应用层监控mybatis的底层更新方法.达到了如下目标:
1.通过注解,简单可配置
2.异步解析与应用结偶
但是目前也存在如下不足:
一定程度上于我们项目的编程风格绑定,切合了我们的代码分层结构,如果其他小伙伴要用,一定程度上需要改写部分逻辑
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface TableAudit {
//默认针对字段单个字段审计 配合ColumnAudit使用
boolean all() default false;
}
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ColumnAudit {
}
@Aspect
@Component
public class TableUpdateAop {
private static Logger LOGGER = LoggerFactory.getLogger(TableUpdateAop.class);
@Autowired
private RabbitMqSender rabbitMqSender;
private static final String OPERATOR_ID ="operatorId";
private static final String TABLE_NAME ="tableName";
private static final String LAST_NAME ="lastName";
private static final String ID ="id";
private static final String MAPPER = "Mapper";
/*我们使用的是mybatis源码,然后下载下来,自己做过一定的加工处理
*如果小伙伴在采用改方式处理table日志变更记录时只需要把切面对接到mybatis的核心修改接口就可以
* UpdateByPrimaryKeySelectiveMapper
* UpdateByPrimaryKeyMapper
* */
@Pointcut("execution(public * com.hand.hap.cloud.mybatis.common.base.update.*.update*(..))")
public void audit(){}
@Around("audit()")
public Object doBefore(ProceedingJoinPoint pjp) throws Throwable{
Map<String,Object> originValues = null;
Map<String,Object> newValues = null;
Map<String,Object> baseValues = null;
boolean sendAudit = false;
try {
Object[] args = pjp.getArgs();
Object parameter = args[0];
Class clazz =parameter.getClass();
TableAudit tableAudit = (TableAudit)clazz.getAnnotation(TableAudit.class);
if( tableAudit!=null){
StringBuilder clazzName = new StringBuilder();
String mapperName = clazz.getName().replace("domain","mapper");
clazzName.append(mapperName)
.append(MAPPER);
//此处和我们的项目代码结构有关,如果采用这种方式需要作出一些调整
BaseMapper baseMapper = (BaseMapper) SpringContextUtil.getBean(Class.forName(clazzName.toString()));
Object dbObj =baseMapper.selectByPrimaryKey(parameter);
//收集当前信息
originValues = collectValue(dbObj,tableAudit.all());
baseValues = new HashMap<>();
//收集表名
String tableName = StringUtil.camel2Underline(clazz.getSimpleName());
baseValues.put(TABLE_NAME,tableName);
//收集用户信息 需要根根自己项目做一些调整
baseValues.put(OPERATOR_ID,RequestContext.getUserId());
baseValues.put(LAST_NAME, RequestContext.getLastName());
//数据主键
baseValues.putAll(getFieldValueByName(clazz.getDeclaredField(ID),dbObj));
newValues = collectValue(parameter,tableAudit.all());
sendAudit = true;
}
}catch (Exception e){
LOGGER.warn("Table update data collect failed, error info {}",e.getMessage());
}
Object result = pjp.proceed();
try {
if(sendAudit && Integer.valueOf(Integer.parseInt(result.toString()))>0){
sender(baseValues,originValues,newValues);
}
}catch (Exception e){
LOGGER.warn("Table update data send failed, error info {}",e.getMessage());
}
return result;
}
public Map<String,Object> collectValue(Object obj,boolean scope) throws Exception{
Map<String,Object> values = new HashMap();
List<Field> list = Arrays.asList(obj.getClass().getDeclaredFields());
for(int i=0;i<list.size();i++){
Field field = list.get(i);
if(scope){
getFieldValueByName(field,obj);
}else{
if(field.isAnnotationPresent(ColumnAudit.class)){
values.putAll(getFieldValueByName(field,obj));
}
}
}
return values;
}
public void sender(Map<String,Object> baseValues ,Map<String,Object> originValues , Map<String,Object> newValues){
List<TableUpdateLog> logs = collectValue(baseValues,originValues,newValues);
//此处我们采用的是rabbitMq 理论上此处只要通过异步方式处理即可
rabbitMqSender.handleTableAuditData(logs);
}
public static List<TableUpdateLog> collectValue( Map<String,Object> baseValues ,Map<String,Object> originValues , Map<String,Object> newValues ){
List<TableUpdateLog> logs = new ArrayList<>();
if(!CollectionUtils.isEmpty(baseValues) && !CollectionUtils.isEmpty(originValues) && !CollectionUtils.isEmpty(newValues) ){
for(Map.Entry entry : originValues.entrySet()){
TableUpdateLog log = new TableUpdateLog();
String columnName = entry.getKey().toString();
log.setColumnName(columnName);
if(entry.getValue()!=null){
log.setOriginValue(entry.getValue().toString());
}
if(newValues.get(columnName)!=null){
log.setNewValue(newValues.get(columnName).toString());
}
if(baseValues.get(OPERATOR_ID)!=null){
log.setOperatorId(baseValues.get(OPERATOR_ID).toString());
}
log.setCreationDate(DateUtil.now().getTime());
if(baseValues.get(LAST_NAME)!=null){
log.setLastName(baseValues.get(LAST_NAME).toString());
}
if(baseValues.get(ID)!=null){
log.setPrimaryKey(baseValues.get(ID).toString());
}
if(baseValues.get(TABLE_NAME)!=null){
log.setTableName(baseValues.get(TABLE_NAME).toString());
}
logs.add(log);
}
}
return logs;
}
public Map<String,Object> getFieldValueByName(Field field,Object obj) throws Exception{
Map<String,Object> kv = new HashMap<>();
String name = field.getName();
// 将属性的首字符大写,方便构造get,set方法
name = name.substring(0, 1).toUpperCase() + name.substring(1);
// 获取属性的类型
//String type = field.getGenericType().toString();
// 如果type是类类型,则前面包含"class ",后面跟类名
Method m = obj.getClass().getMethod("get" + name);
// 调用getter方法获取属性值
Object value = m.invoke(obj);
String underlineFieldName = StringUtil.camel2Underline(name);
kv.put(underlineFieldName,value);
return kv;
}
}
@Service
public class SpringContextUtil implements ApplicationListener<ContextRefreshedEvent> {
private static ApplicationContext applicationContext = null;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if(applicationContext == null){
applicationContext = event.getApplicationContext();
}
}
/*ApplicationContext context= ContextLoader.getCurrentWebApplicationContext();//尝试下这个方法*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(Class clazz) {
return applicationContext.getBean(clazz);
}
}
@Configuration
public class RabbitMqConfig {
public static final String QUEUE\_TABLE\_AUDIT = "queue.table.audit";
@Bean
public Queue queueTableAudit() {
return new Queue(QUEUE_TABLE_AUDIT);
}
}
@Component
public class RabbitMqSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void handleTableAuditData(List<TableUpdateLog> logs) {
String data = new JSONWriter().write(logs);
this.rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TABLE_AUDIT, data.getBytes());
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableUpdateLog {
@Id
@GeneratedValue()
private Integer id;
private String tableName;
private String primaryKey;
private String batchNumber;
private String columnName;
private String originValue;
private String newValue;
private String operatorId;
private String lastName;
private Long creationDate;
}
spring:
rabbitmq:
host: 192.168.11.210
port: 5672
整个插件我们已经完成,接下来我们使用刚刚搭建的插件
<dependency>
<groupId>com.hscf.cloud</groupId>
<artifactId>hscf-table-monitor-starter</artifactId>
<version>${hcloud.version}</version>
</dependency>
@ModifyAudit
@VersionAudit
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableAudit
public class SysResourceGroup extends AuditDomain {
@Id
@GeneratedValue
@ColumnAudit
private Long id;
@ColumnAudit
private String groupCode;
@ColumnAudit
@NotNull
private String groupName;
@ColumnAudit
private String description;
}
使用就这么简单,当对这个domain进行修改操作时,就会监控其变更数据.
接下来我们对收集到的信息进行处理.
@Component
public class RabbitMqReceivers {
private Logger logger = LoggerFactory.getLogger(RabbitMqReceivers.class);
@Autowired
private TableUpdateLogDao tableUpdateLogDao;
@RabbitListener(queues = "queue.table.audit")
@RabbitHandler
public void handleApiMonitor(byte[] data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
JSONArray arrays = null;
try {
arrays = JSON.parseArray(new String(data));
if(arrays!=null && !arrays.isEmpty()){
String batchNumber = UUID.randomUUID().toString();
for(Object logJson :arrays ){
TableUpdateLog log = (TableUpdateLog)BeanParser.parse((JSONObject)logJson, TableUpdateLog.class);
log.setBatchNumber(batchNumber);
tableUpdateLogDao.save(log);
}
}
} catch (Exception e) {
logger.error("Handle table admin data failed, data:{}", arrays);
} finally {
channel.basicAck(deliveryTag, false);
}
}
}
以上方案还有很多不足,编写也比较匆忙,小伙伴们多多原谅.
总的说来,这是目前想到的一个比较实用的解决方式.
如果小伙伴们有好的解决方式,QAQ,评论区走起留言.
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。