For update带来的思考

For update or not

起源

​ 之所以想写这个专题,是因为最近在做一个抢占任务的实现。假设数据库很多个任务,在抢占发生之前任务的状态都是FREE。现在假设同时有一堆抢占线程开始工作,抢占线程会查找数据库中状态为FREE的任务,并且将其状态置为BUSY,然后开始执行对应任务。执行完成之后,再将任务状态置为FINISH。任何任务都是不能被重复执行的,即必须保证所有任务都只能被一个线程执行。

​ 笔者和人民群众一样,第一个想到的就是利用数据库的for update实现悲观锁。这样肯定能够保证数据的强一致性,但是这样会大大影响效率,加重数据库的负担。想到之前看过的一篇文章https://www.cnblogs.com/bigben0123/p/8986507.html,文章里面有提到数据库引擎本身对更新的记录会行级上锁。这个行级锁的粒度非常细,上锁的时间窗口也最少,只有在更新记录的那一刻,才会对记录上锁。同时笔者也想到在前一家公司工作的时候,当时有幸进入到了核心支付组,负责过一段时间的账务系统。当时使用的是mysql的InnoDB引擎。记得当时的代码在往账户里面加钱的时候是没有加任何锁的,只有在从账户扣钱的时候才用for update。所以这个问题应该有更加完美的答案......


探索之路

for update的实现这里就不再做过多尝试了。这里笔者直接探索在没有for update的时候高并发情况下是否会有问题。具体尝试的过程如下:

造测试数据

​ 首先建立一个任务表,为了简单模拟,我们这里就只添加必要的字段。建表语句如下:

create table task(
       ID NUMBER(10) NOT NULL, 
    TASK_RUN_STATUS NUMBER(4) NOT NULL
);
comment on table task is '互斥任务表';
comment on column task.ID is '主键ID.';
comment on column task.TASK_RUN_STATUS is '任务运行状态(1.初始待运行 2.运行中 3.运行完成).';
alter table task add constraint TASK_PK primary key (ID) using index;

​ 为了方便测试,这里我们加入三条任务记录,插入任务记录的语句如下:

insert into task(id, task_run_status) values(0, 1);
insert into task(id, task_run_status) values(1, 1);
insert into task(id, task_run_status) values(2, 1);

模拟并发抢占

public class MultiThreadUpdate {
    public static void main(String[] args) throws Exception {
        Class.forName("oracle.jdbc.OracleDriver");
        ExecutorService executorService = Executors.newFixedThreadPool(30);
        List<Future<Void>> futures = new ArrayList<Future<Void>>();
        
        // 每个ID开20个线程去并发更新数据
        for (int i=0; i<20; i++) {
            for (int j=0; j<3; j++) {
                final int id = j;
                futures.add(executorService.submit(new Callable<Void>() {
                    public Void call() throws Exception {
                        Connection con = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "czbank", "123456");
                        // con.setAutoCommit(false);        // 不自动提交事务
                        PreparedStatement pstm = con.prepareStatement("update task set TASK_RUN_STATUS = ? where id = ? and TASK_RUN_STATUS = ?");
                        pstm.setInt(1, 2);
                        pstm.setInt(2, id);
                        pstm.setInt(3, 1);
                        int upRec = pstm.executeUpdate();
                        // 打印更新的记录条数
                        System.out.println("Thread:" + Thread.currentThread().getName() + " updated(id=" + id + "):" + upRec + " records...");
                        // Thread.sleep(1000);      // 在事务提交之前,其线程都会阻塞直到对特定记录的更新提交
                        // con.commit();
                        con.close();
                        pstm.close();
                        return null;
                    }
                }));
            }
        }
        executorService.shutdown();
    }
}

​ 最终程序的输出结果如下:

Thread:pool-1-thread-9 updated(id=2):0 records...
Thread:pool-1-thread-15 updated(id=2):0 records...
Thread:pool-1-thread-22 updated(id=0):0 records...
Thread:pool-1-thread-28 updated(id=0):0 records...
Thread:pool-1-thread-14 updated(id=1):0 records...
Thread:pool-1-thread-17 updated(id=1):0 records...
Thread:pool-1-thread-26 updated(id=1):0 records...
Thread:pool-1-thread-30 updated(id=2):0 records...
Thread:pool-1-thread-29 updated(id=1):0 records...
Thread:pool-1-thread-27 updated(id=2):0 records...
Thread:pool-1-thread-5 updated(id=1):0 records...
Thread:pool-1-thread-23 updated(id=1):0 records...
Thread:pool-1-thread-21 updated(id=2):1 records...
Thread:pool-1-thread-1 updated(id=0):1 records...
Thread:pool-1-thread-6 updated(id=2):0 records...
Thread:pool-1-thread-8 updated(id=1):1 records...
Thread:pool-1-thread-10 updated(id=0):0 records...
Thread:pool-1-thread-13 updated(id=0):0 records...
Thread:pool-1-thread-4 updated(id=0):0 records...
Thread:pool-1-thread-19 updated(id=0):0 records...
Thread:pool-1-thread-16 updated(id=0):0 records...
Thread:pool-1-thread-2 updated(id=1):0 records...
Thread:pool-1-thread-11 updated(id=1):0 records...
Thread:pool-1-thread-7 updated(id=0):0 records...
Thread:pool-1-thread-25 updated(id=0):0 records...
Thread:pool-1-thread-3 updated(id=2):0 records...
Thread:pool-1-thread-18 updated(id=2):0 records...
Thread:pool-1-thread-12 updated(id=2):0 records...
Thread:pool-1-thread-20 updated(id=1):0 records...
Thread:pool-1-thread-24 updated(id=2):0 records...
Thread:pool-1-thread-15 updated(id=2):0 records...
Thread:pool-1-thread-9 updated(id=0):0 records...
Thread:pool-1-thread-22 updated(id=1):0 records...
Thread:pool-1-thread-30 updated(id=0):0 records...
Thread:pool-1-thread-5 updated(id=1):0 records...
Thread:pool-1-thread-17 updated(id=2):0 records...
Thread:pool-1-thread-26 updated(id=0):0 records...
Thread:pool-1-thread-29 updated(id=1):0 records...
Thread:pool-1-thread-27 updated(id=2):0 records...
Thread:pool-1-thread-28 updated(id=0):0 records...
Thread:pool-1-thread-21 updated(id=1):0 records...
Thread:pool-1-thread-1 updated(id=2):0 records...
Thread:pool-1-thread-14 updated(id=0):0 records...
Thread:pool-1-thread-2 updated(id=1):0 records...
Thread:pool-1-thread-16 updated(id=0):0 records...
Thread:pool-1-thread-4 updated(id=2):0 records...
Thread:pool-1-thread-13 updated(id=1):0 records...
Thread:pool-1-thread-19 updated(id=2):0 records...
Thread:pool-1-thread-6 updated(id=0):0 records...
Thread:pool-1-thread-8 updated(id=1):0 records...
Thread:pool-1-thread-10 updated(id=2):0 records...
Thread:pool-1-thread-23 updated(id=0):0 records...
Thread:pool-1-thread-11 updated(id=1):0 records...
Thread:pool-1-thread-7 updated(id=2):0 records...
Thread:pool-1-thread-25 updated(id=0):0 records...
Thread:pool-1-thread-3 updated(id=1):0 records...
Thread:pool-1-thread-18 updated(id=2):0 records...
Thread:pool-1-thread-12 updated(id=0):0 records...
Thread:pool-1-thread-20 updated(id=1):0 records...
Thread:pool-1-thread-24 updated(id=2):0 records...

​ 可以看到,即使在没有显示使用事务的情况下,多线程并发执行也能够保证某一条数据的更新只被执行一次。


最终任务设计

​ 通过上面的测试例子,已经验证了我的猜想。接下来就是如何设计抢占任务的执行步骤了。废话不多说,直接上基本代码:

public void runMutexTasks(MutexTaskDto runCond) throws Exception {
    // STEP1: 先去查找待执行的互斥任务
    runCond.setTaskRunStatus(Enums.MutexTaskRunStatus.WAIT_RUN.getKey());   // 待运行
    runCond.setPhysicsFlag(Enums.TaskStatus.NORMAL.getKey());               // 正常状态(未废弃)
    PageInfo<MutexTaskDto> runnableTasks = MutexTaskService.pagingQueryGroupByTaskId(0, 0, runCond);
    if (CollectionUtils.isEmpty(runnableTasks.getRows())) {
        LOGGER.debug("根据条件未找到待执行的互斥任务,跳过执行......");
        return;
    }
    
    // STEP2: 分别尝试执行
    List<MutexTaskDto> runTasks = null;
    Collections.shuffle(runnableTasks.getRows());   // 打乱顺序
    for (MutexTaskDto oneTask : runnableTasks.getRows()) {
        runTasks = mutexTaskService.selectRunnableTaskByTaskId(oneTask.getTaskId());
        if (CollectionUtils.isEmpty(runTasks)) {
            LOGGER.info("互斥任务ID【{}】已不是待运行状态,跳过任务执行......", oneTask.getTaskId());
            continue;
        }
        
        // STEP3: 运行任务
        MutexTaskDto updateCond = new MutexTaskDto();
        updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_SUCCESS.getKey());
        updateCond.setTaskPreStatus(Enums.MutexTaskRunStatus.RUNNING.getKey());
        updateCond.setTaskId(oneTask.getTaskId());
        try {
            runTasks(runTasks);
        } catch(Exception  e) {
            updateCond.setRunRemark(getErrorMsg(e));
            updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_FAILED.getKey());
            mutexTaskService.updateByTaskId(updateCond);
            // 这里只打印失败结果,具体失败信息需要上层调用方法日志打印出来
            LOGGER.error("互斥任务ID【{}】执行失败!", oneTask.getTaskId());
            throw e;
        }
        mutexTaskService.updateByTaskId(updateCond);
        LOGGER.info("互斥任务ID【{}】执行成功......", oneTask.getTaskId());
        Thread.sleep(1000); // 抢到了一个节点执行权限,此处暂停1s,给其他机器机会
    }
}

// 其中mutexTaskService的selectRunnableTaskByTaskId方法如下:
// 不使用事务,利用数据库引擎自身的行级锁

public List<MutexTaskDto> selectRunnableTaskByTaskId(String taskId) {
    // STEP1: 先用查询数据(一个taskID可能对应多条记录,对应不同的参数)
    List<MutexTaskModle> mutexTaskModles = this.mutexTaskDao
            .selectByTaskId(taskId);
    if (CollectionUtils.isEmpty(mutexTaskModles)) {
        return Collections.emptyList();
    }
    
    // STEP2: 更新数据(使用数据库引擎自身所带的行级锁)
    MutexTaskModle updateInfo = new MutexTaskModle();
    updateInfo.setTaskRunStatus(2);
    updateInfo.setTaskPreStatus(1);
    updateInfo.setTaskId(taskId);
    int updateCount = cleaningMutexTaskDao.updateByTaskId(updateInfo);
    if (updateCount <= 0) {
        LOGGER.info("找到待执行的互斥任务,但是更新任务为执行中失败......");
        return Collections.emptyList();
    }
    
    // STEP3: 前面两项都校验过,则确认当前任务列表是可以执行的
    List<MutexTaskDto> mutexTasks = BeanConvertUtils.convertList(mutexTaskModles,
            MutexTaskDto.class);
    return mutexTasks;
}

​ 关键点就在于第58行的cleaningMutexTaskDao.updateByTaskId(updateInfo);。该语句对应的SQL大致为:

update TASK set task_status = ? where task_id = ? and task_tatus = ?

​ 其中task_id为表的主键,且启用了唯一索引。


总结

​ 这个问题刚开始笔者想到的解决方案就是使用for update。但内心总觉得这不是最佳方案,想起以前做过的项目还有看过的文章,却也总是不太确定。最终还是自己动手写了个测试用例"释怀"了内心的疑惑。最终也顺利地想出了这个"完美"的实现。不得不承认:实践是检验真理的唯一标准!工作到现在,越来越觉得大家觉得最好的实现不一定就是最好的,大家认为的最高效的方法不一定就是最高效的。很多事情没有绝对,就像写代码一样,没有绝对的好代码。

​ 当然这不是鼓励大家随便写代码,笔者想说的是:做软件就像做学问。不能纯粹地拿别人的结论奉为圣经。遇到问题要多思考,才会有自己的沉淀。思考之后要多行动,才不会仅仅停留在思想的巨人,行动的矮子。当然,行动之后也要多多整理出来,就像笔者这样,奉献社会,方便你我他......(一脸无语)?

​---

参考链接

https://www.cnblogs.com/bigben0123/p/8986507.html https://www.cnblogs.com/clphp/p/6398667.html

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏XAI

Highcharts AJAX JSON JQuery 实现动态数据交互显示图表 柱形图

这是第一篇实例的步骤与代码。还有整个项目的结构图。 http://my.oschina.net/xshuai/blog/345117 原创的博文。转载注明出处...

57860
来自专栏逢魔安全实验室

SQL注入ByPass的一些小技巧

? 01 — 前言 SQL注入从古至今都是一个经久不衰的影响严重的高危漏洞,但是网络安全发展到现在,如果想通过SQL注入直接获取数据或者权限,多多少少都需要绕...

42090
来自专栏架构师之旅

【强烈推荐】Java工程师如何从一名普通的码农成长为一位大神

本文源自 http://www.hollischuang.com/archives/489 写在前面 java作为一门编程语言,在各类编程语言中...

30280
来自专栏NetCore

[原创]Fluent NHibernate之旅二--Entity Mapping

接着上一篇,今天我们说说ORM中的Mapping。如果你要体验NHibernate的强大,首先你就要学会配置,包括SessionFactory和Mapping的...

24190
来自专栏Python爬虫与算法进阶

Scrapy中如何提高数据的插入速度

速度问题 最近工作中遇到这么一个问题,全站抓取时采用分布式:爬虫A与爬虫B,爬虫A给爬虫B喂饼,爬虫B由于各种原因运行的比较慢,达不到预期效果,所以必须对爬虫...

516110
来自专栏GreenLeaves

Oracle pl/sql编程值控制结构

以下测试案例均来自于scott方案,运行前请确保该方案解锁。 1、条件分支状语 pl/sql中提供了三种条件分支状语,分别是: if   then if   t...

20290
来自专栏Java开发者杂谈

分布式改造剧集1

背景介绍 ​ 我所在的项目组,使用的技术一直是接近原始社会的:jdk1.6 + SpringMVC + hessian + Mybatis,当前最火的中间件技术...

30040
来自专栏安恒网络空间安全讲武堂

浙江省首届网络安全大赛决赛Write Up

直接打开pwd.docx,发现是空的,那么将其作为zip 解压,在其中找到了一张二维码

21230
来自专栏我杨某人的青春满是悔恨

Swift2网络操作和异常处理

相信写过Swift的人应该都知道Alamofire,它是AFNetworking的Swift版本,同一个作者写的。之前在项目中我也一直使用Alamofire,但...

12710
来自专栏Hongten

JSP 六讲

16320

扫码关注云+社区

领取腾讯云代金券