❝沉淀、分享、成长,让自己和他人都能有所收获!😜❞
不卷了,能用就行!
哈哈哈,说好的不卷了,能凑活用就行了。但每次接到新需求时都手痒,想结合着上一次的架构设计和落地经验,在这一次需求上在迭代更新,或者找到完全颠覆之前的更优方案。卷完代码的那一刻总是神清气爽
其实大部分喜欢写代码的一类纯粹码农,都是比较卷的,就比如一个需求在实现上是能用大概是P5
、如果这个做出来的功能不只是能用还非常好用是P6
、除了好用还凝练共性需求开发成通用的组件服务是P7
。每一个成长过来的码农,都是在造轮子的路上一次次验证自己的想法和加以实践,绝对不是一篇篇的八股文就能累出来一个高级的技术大牛。
什么是延迟任务?
当我们的实际业务需求场景中,有一些活动开始前的状态变更、订单结算后的T+1对账、贷款单息费的产生,都是需要使用到延迟任务来进行触达。实际的操作一般会有 Quartz、Schedule 来对你的库表数据进行定时扫描和处理,当条件满足后做数据状态的变更或者产生新的数据插入到表中。
这样一个简单的需求就是延迟任务最初需求,如果需求前期内容较少、使用方不多,可能在实际开发中就只是一个单台机器直接对着表一顿轮训就完事了。但随着业务需求的发展和功能的复杂度提升,往往反馈到研发设计和实现,就不那么简单了,比如:你需要保障尽可能低延迟完成较大规模的数据量扫描处理,否则就像贷款单息费的产生,已经到了第二天用户还没看到自己的息费信息或者是还款后的重新对账,可能就这个时候就要产生客诉了。
那么,类似这样的场景该如何设计呢?
通常的任务中心处理流程主要,主要是由定时任务扫描任务库表,把即将达到超时时间的任务信息扫描到处理队列(内存/MQ消息
),再由业务系统进行处理任务,处理完成后更新库表中的任务状态。
高延时任务调度
问题:
除了一些较小的状态变更场景,例如在各自业务的库表中,就包含了一个状态字段,这个字段一方面有程序逻辑处理变更的状态,也有到达指定到期
时间后由任务服务自动变更处理的操作,一般这类功能,直接设计到自己的库表中即可。
那么还有一些较大也较为频繁使用的场景,如果都是在每个系统的各自所需的N多个表中,都添加这样的字段进行维护,就显得非常冗余了,也不那么易于维护。所以针对这样的场景就很适合做一个通用的任务延时系统,各业务系统把需要被延时执行的动作提交到延时系统中,再有延时系统在指定时间下进行回调,回调的动作可以是接口或者MQ消息进行触达。例如可以设计这样一个任务调度表:
任务调度库表设计
低延迟处理方案,是在任务表方式的基础上,新增加的时间把控处理。它可以把即将到期的前一段时间的任务,放置到 Redis 集群队里中,在消费的时候再从队列中 pop 出来,这样可以更快的接近任务的处理时效,避免因为扫库间隔较大延迟任务执行。
任务处理流程
Redis 消费队列
Redis 消费队列
index = CRC32 & 7
#{topic}_#{index}
和 Sorted Set 的数据结构按执行任务分数排序,存放任务执行信息。定时消息将时间戳作为分数,消费时每次弹出分数小于当前时间戳的一个消息低延迟的超时中心实现方式
简单案例
@Test
public void test_delay_queue() throws InterruptedException {
RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("TASK");
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
new Thread(() -> {
try {
while (true){
Object take = blockingQueue.take();
System.out.println(take);
Thread.sleep(10);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
int i = 0;
while (true){
delayedQueue.offerAsync("测试" + ++i, 100L, TimeUnit.MILLISECONDS);
Thread.sleep(1000L);
}
}
测试数据
2022-02-13 WARN 204760 --- [ Finalizer] i.l.c.resource.DefaultClientResources : io.lettuce.core.resource.DefaultClientResources was not shut down properly, shutdown() was not called before it's garbage-collected. Call shutdown() or shutdown(long,long,TimeUnit)
测试1
测试2
测试3
测试4
测试5
Process finished with exit code -1
你好,我是小傅哥。一线互联网java
工程师、架构师,开发过交易&营销、写过运营&活动、设计过中间件也倒腾过中继器、IO板卡。不只是写Java语言,也搞过C#、PHP,是一个技术活跃的折腾者。