增量接口的设计及实现

引言

在应用开发过程中,我们总会碰到这样的场景:某系统需要同步我们系统的数据去做一些业务逻辑,当数据量较小的时候,可以全量的提供,但当数据量很大时,全量提供就显得很笨重,不仅耗时而且做了很多无用功,这时我们需要一种提供增量数据的机制,只告诉对方变化的数据。提供增量数据大致可分为两种方式:MQ和接口提供,MQ的优点是及时,缺点是丢失、重复、回溯复杂等等问题(依赖于具体MQ实现),这里不过多赘述;接口提供不限于RPC或HTTP等方式,接口提供的优缺点正好和MQ反过来,及时性取决于调用周期。P.S.本文描述的数据同步区别于数据库层的同步,应用层的同步表不一定同构,或者都不落地。

Created with Raphaël 2.1.0AABBsync datado something

接口设计

只需要一个version参数,其它参数根据实际业务场景添加,返回值中也加入version,调用端使用返回值中的version用于下次调用。

接口使用

String lastVersion = getLastVersion();// 拿到上一次版本号
try {
    while(true) {
        List<Data> datas = syncData(lastVersion);// remote or local
        if (datas.isEmpty()) {
            break;
        }
        for (Data data : datas) {
            // 1. 做一些逻辑处理
            // ...............  
            // 2. 暂存version
            lastVersion = data.getVersion();
        }
    }
} finally {
    saveVersion(lastVersion);// 保存版本号
}

以上代码需要放到一个定时调度模块中,周期越短,数据延时越低。如果由于故障或BUG导致处理出现问题时,只需将版本号向前调整即可,回溯简单。

接口实现

实现要考虑以下几个方面,内存占用、version设计、数据删除。

内存占用

增量接口很可能被其它系统频繁的调用,尤其当我们系统中有一种很核心的数据,所以要对每次调用返回的数据量有一个控制,比如每次只返回1000条,后面描述都以1000条为例。我建议这个数据量控制在数据提供方,而不是调用方,即便调用方可以控制,提供方也要做一个最大限制。

version设计

假设我们的数据类似于下表:

id

update_time

2

2017-03-09 23:59:59

68

2017-03-09 23:59:59

26

2017-03-09 23:59:59

71

2017-03-09 23:59:59

17

2017-03-09 23:59:59

14

2017-03-09 23:59:59

11

2017-03-09 23:59:59

8

2017-03-09 23:59:59

5

2017-03-09 23:59:59

65

2017-03-09 23:59:59

66

2017-03-09 23:59:59

version设计很多人第一时间会想到数据更新时间,SQL可能是这样:

SELECT * FROM data 
WHERE update_time > #{version} 
ORDER BY update_time ASC LIMIT 1000;

当很多数据update_time一样时,会丢失数据。比如上一批次返回的最后一条是id=71,version是2017-03-09 23:59:59,那本次查询就会忽略后面update_time=2017-03-09 23:59:59的数据。这时可能又有人想到下面这样的方式:

SELECT * FROM data 
WHERE update_time >= #{version} 
ORDER BY update_time ASC LIMIT 1000;

update_time加了个=,这样是不会丢数据了,但是会返回重复数据,甚至死循环。比如比如上一批次返回的最后一条是id=71,version是2017-03-09 23:59:59,id=71后面有10000条update_time=2017-03-09 23:59:59的数据,接口每次返回1000条,这时调用端永远跳不出这批数据了。鉴于上面的问题,显然version单单使用数据更新时间已经不够了,这时可以加入其它辅助项,比如自增ID。比如比如上一批次返回的最后一条是id=71,这时的version格式是这样:2017-03-09 23:59:59@71,SQL变成下面这样,分两步查询:

第一步: 查询update_time = '2017-03-09 23:59:59'并且id > 71的数据;
SELECT * FROM data 
WHERE update_time = '2017-03-09 23:59:59' AND id > 71
ORDER BY update_time ASC, id ASC LIMIT 1000;
第二步: 查询update_time > '2017-03-09 23:59:59'的数据;
SELECT * FROM data 
WHERE update_time > '2017-03-09 23:59:59'
ORDER BY update_time ASC, id ASC LIMIT ?;

这里有一些细节需要控制,如果第一步返回的数据量已经达到1000,则不需要执行第二步,如果小于1000,则需要执行第二步,数据量应该依据第一步返回的数据量计算。最终,version的格式:更新时间毫秒数@数据自增id,上面为了方便说明,直接用了格式化后的时间。

但上面这种基于数据更新时间的同步方式在并发写入场景下可能存在问题,比如一条数据在2017-03-09 23:59:59时被更新,但该事务是在2017-03-10 00:00:01时提交,恰好在2017-03-09 23:59:59有一个同步发生,那这次同步是同步不到这条数据的,因为事务还没有提交,而下一次同步也不会同步这条数据,因为时间(2017-03-09 23:59:59)极有可能已经过去了。解决这个问题也比较简单,我们可以在更新数据的同时,记录一条数据日志,并且有一个线程去定期清理过期的重复数据,最后我们的版本号就是该日志表的自增主键ID。

数据删除

增量数据的获取是依赖更新时间,这就有一个隐含的前提,需要数据存在,如果数据真正的删除了,那也就不能获取到这条数据的变更了。所以,通过接口提供增量数据不能真删数据,而要假删(增加一个状态,表示有效或无效),这也算一个缺点吧。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Golang语言社区

Go语言并发编程总结

Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel ....... 他在...

2609
来自专栏编程思想之异常处理

C++异常处理

0.如果使用普通的处理方式:ASSERT,return等已经    足够简洁明了,请不要使用异常处理机制.

541
来自专栏Java技术分享

高并发分布式系统中生成全局唯一Id汇总

数据在分片时,典型的是分库分表,就有一个全局ID生成的问题。 单纯的生成全局ID并不是什么难题,但是生成的ID通常要满足分片的一些要求:    1 不能有单点故...

2475
来自专栏闻道于事

多线程基础必要知识点!看了学习多线程事半功倍

2023
来自专栏Golang语言社区

Go语言并发编程总结

Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel …….

824
来自专栏Java架构沉思录

你真的懂Mybatis缓存机制吗

Mybatis的一级缓存是指Session缓存。一级缓存的作用域默认是一个SqlSession。Mybatis默认开启一级缓存。 也就是在同一个SqlSessi...

6905
来自专栏Golang语言社区

Go语言并发编程总结

Golang :不要通过共享内存来通信,而应该通过通信来共享内存。这句风靡在Go社区的话,说的就是 goroutine中的 channel ....... 他在...

2927
来自专栏pangguoming

分布式系统唯一ID生成方案汇总

系统唯一ID是我们在设计一个系统的时候常常会遇见的问题,也常常为这个问题而纠结。生成ID的方法有很多,适应不同的场景、需求以及性能要求。所以有些比较复杂的系统会...

4926
来自专栏java达人

使用Redis做MyBatis的二级缓存

使用Redis做MyBatis的二级缓存  通常为了减轻数据库的压力,我们会引入缓存。在Dao查询数据库之前,先去缓存中找是否有要找的数据,如果有则用缓存中的数...

3725
来自专栏Java3y

多线程之死锁就是这么简单

1835

扫码关注云+社区