将MySQL去重操作优化到极致之三弹连发(二):多线程并行执行

        上一篇已经将单条查重语句调整到最优,但该语句是以单线程方式执行。能否利用多处理器,让去重操作多线程并行执行,从而进一步提高速度呢?比如我的实验环境是4处理器,如果使用4个线程同时执行查重sql,理论上应该接近4倍的性能提升。

一、数据分片

        我们生成测试数据时,created_time采用每条记录加一秒的方式,也就是最大和在最小的时间差为50万秒,而且数据均匀分布。因此先把数据平均分成4份。

1. 查询出4份数据的created_time边界值

select date_add('2017-01-01',interval 125000 second) dt1,
       date_add('2017-01-01',interval 2*125000 second) dt2,
       date_add('2017-01-01',interval 3*125000 second) dt3,
       max(created_time) dt4
  from t_source;

        查询结果如图一所示。

图一

2. 查看每份数据的记录数,确认数据平均分布

select case when created_time >= '2017-01-01' 
             and created_time < '2017-01-02 10:43:20'
            then '2017-01-01'
            when created_time >= '2017-01-02 10:43:20'
             and created_time < '2017-01-03 21:26:40'
            then '2017-01-02 10:43:20'
            when created_time >= '2017-01-03 21:26:40' 
             and created_time < '2017-01-05 08:10:00'
            then '2017-01-03 21:26:40' 
            else '2017-01-05 08:10:00'
        end min_dt,
       case when created_time >= '2017-01-01' 
             and created_time < '2017-01-02 10:43:20'
            then '2017-01-02 10:43:20'
            when created_time >= '2017-01-02 10:43:20'
             and created_time < '2017-01-03 21:26:40'
            then '2017-01-03 21:26:40'
            when created_time >= '2017-01-03 21:26:40' 
             and created_time < '2017-01-05 08:10:00'
            then '2017-01-05 08:10:00'
            else '2017-01-06 18:53:20'
        end max_dt,
       count(*)
  from t_source
 group by case when created_time >= '2017-01-01' 
             and created_time < '2017-01-02 10:43:20'
            then '2017-01-01'
            when created_time >= '2017-01-02 10:43:20'
             and created_time < '2017-01-03 21:26:40'
            then '2017-01-02 10:43:20'
            when created_time >= '2017-01-03 21:26:40' 
             and created_time < '2017-01-05 08:10:00'
            then '2017-01-03 21:26:40' 
            else '2017-01-05 08:10:00'
        end,
       case when created_time >= '2017-01-01' 
             and created_time < '2017-01-02 10:43:20'
            then '2017-01-02 10:43:20'
            when created_time >= '2017-01-02 10:43:20'
             and created_time < '2017-01-03 21:26:40'
            then '2017-01-03 21:26:40'
            when created_time >= '2017-01-03 21:26:40' 
             and created_time < '2017-01-05 08:10:00'
            then '2017-01-05 08:10:00'
            else '2017-01-06 18:53:20'
        end;

        查询结果如图二所示。

图二

        4份数据的并集应该覆盖整个源数据集,并且数据之间是不重复的。也就是说4份数据的created_time要连续且互斥,连续保证处理全部数据,互斥确保了不需要二次查重。实际上这和时间范围分区的概念类似,或许用分区表更好些,只是这里省略了重建表的步骤。

3. 建立查重的存储过程

        有了以上信息我们就可以写出4条语句处理全部数据。为了调用接口尽量简单,建立下面的存储过程。

delimiter //
create procedure sp_unique(i smallint)    
begin     
    set @a:='0000-00-00 00:00:00';  
    set @b:=' ';  
	if (i<4) then
        insert into t_target  
        select * from t_source force index (idx_sort)  
         where created_time >= date_add('2017-01-01',interval (i-1)*125000 second) 
           and created_time < date_add('2017-01-01',interval i*125000 second) 
           and (@a!=created_time or @b!=item_name) 
           and (@a:=created_time) is not null 
           and (@b:=item_name) is not null  
         order by created_time,item_name;  
        commit;
    else 
	insert into t_target  
        select * from t_source force index (idx_sort)  
         where created_time >= date_add('2017-01-01',interval (i-1)*125000 second) 
           and created_time <= date_add('2017-01-01',interval i*125000 second) 
           and (@a!=created_time or @b!=item_name) 
           and (@a:=created_time) is not null 
           and (@b:=item_name) is not null  
         order by created_time,item_name;  
        commit;
    end if;	
end     
// 

delimiter ; 

        查询的执行计划都如图三所示。

图三

        mysql优化器进行索引范围扫描,并且使用索引条件下推(ICP)优化查询。

二、并行执行

        下面分别使用shell后台进程和MySQL Schedule Event实现并行。

1. shell后台进程

(1)建立duplicate_removal.sh文件,内容如下。

#!/bin/bash
mysql -vvv -u root -p123456 test -e "truncate t_target" &>/dev/null 
date '+%H:%M.%N'
for y in {1..4}
do
  sql="call sp_unique($y)"
  mysql -vvv -u root -p123456 test -e "$sql" &>par_sql1_$y.log &
done
wait
date '+%H:%M.%N'

(2)执行脚本文件

chmod 755 duplicate_removal.sh
./duplicate_removal.sh

        执行输出入图四所示。

图四

这种方法用时3.4秒,并行执行的4个过程调用分别用时如图五所示。

图五

        可以看到,每个过程的执行时间均不到3.4秒,因为是并行执行,总的过程执行时间也小于3.4秒,比单线程sql速度提高了近3倍。

2. MySQL Schedule Event

        吴老师也用到了并行,但他是利用MySQL自带的Schedule Event功能实现的,代码应该和下面的类似。

(1)建立事件历史日志表

-- 用于查看事件执行时间等信息
create table t_event_history  (  
   dbname  varchar(128) not null default '',  
   eventname  varchar(128) not null default '',  
   starttime  datetime(3) not null default '0000-00-00 00:00:00',  
   endtime  datetime(3) default null,  
   issuccess  int(11) default null,  
   duration  int(11) default null,  
   errormessage  varchar(512) default null,  
   randno  int(11) default null
);  

(2)修改event_scheduler参数

set global event_scheduler = 1;

(3)为每个并发线程创建一个事件

delimiter //
create event ev1 on schedule at current_timestamp + interval 1 hour on completion preserve disable do 
begin
    declare r_code char(5) default '00000';  
    declare r_msg text;  
    declare v_error integer;  
    declare v_starttime datetime default now(3);  
    declare v_randno integer default floor(rand()*100001);  
      
    insert into t_event_history (dbname,eventname,starttime,randno) 
    #作业名	
    values(database(),'ev1', v_starttime,v_randno);    
     
    begin  
        #异常处理段  
        declare continue handler for sqlexception    
        begin  
            set v_error = 1;  
            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
        end;  
          
        #此处为实际调用的用户程序过程  
        call sp_unique(1);  
    end;  
      
    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
      
end
//     

create event ev2 on schedule at current_timestamp + interval 1 hour on completion preserve disable do 
begin
    declare r_code char(5) default '00000';  
    declare r_msg text;  
    declare v_error integer;  
    declare v_starttime datetime default now(3);  
    declare v_randno integer default floor(rand()*100001);  
      
    insert into t_event_history (dbname,eventname,starttime,randno) 
    #作业名	
    values(database(),'ev2', v_starttime,v_randno);    
     
    begin  
        #异常处理段  
        declare continue handler for sqlexception    
        begin  
            set v_error = 1;  
            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
        end;  
          
        #此处为实际调用的用户程序过程  
        call sp_unique(2);  
    end;  
      
    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
      
end
//  

create event ev3 on schedule at current_timestamp + interval 1 hour on completion preserve disable do 
begin
    declare r_code char(5) default '00000';  
    declare r_msg text;  
    declare v_error integer;  
    declare v_starttime datetime default now(3);  
    declare v_randno integer default floor(rand()*100001);  
      
    insert into t_event_history (dbname,eventname,starttime,randno) 
    #作业名	
    values(database(),'ev3', v_starttime,v_randno);    
     
    begin  
        #异常处理段  
        declare continue handler for sqlexception    
        begin  
            set v_error = 1;  
            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
        end;  
          
        #此处为实际调用的用户程序过程  
        call sp_unique(3);  
    end;  
      
    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
      
end
//  

create event ev4 on schedule at current_timestamp + interval 1 hour on completion preserve disable do 
begin
    declare r_code char(5) default '00000';  
    declare r_msg text;  
    declare v_error integer;  
    declare v_starttime datetime default now(3);  
    declare v_randno integer default floor(rand()*100001);  
      
    insert into t_event_history (dbname,eventname,starttime,randno) 
    #作业名	
    values(database(),'ev4', v_starttime,v_randno);    
     
    begin  
        #异常处理段  
        declare continue handler for sqlexception    
        begin  
            set v_error = 1;  
            get diagnostics condition 1 r_code = returned_sqlstate , r_msg = message_text;  
        end;  
          
        #此处为实际调用的用户程序过程  
        call sp_unique(4);  
    end;  
      
    update t_event_history set endtime=now(3),issuccess=isnull(v_error),duration=timestampdiff(microsecond,starttime,now(3)), errormessage=concat('error=',r_code,', message=',r_msg),randno=null where starttime=v_starttime and randno=v_randno;  
      
end
//

delimiter ;   

        说明:为了记录每个事件执行的时间,在事件定义中增加了操作日志表的逻辑,因为每个事件中只多执行了一条insert,一条update,4个事件总共多执行8条很简单的语句,对测试的影响可以忽略不计。执行时间精确到毫秒。

(4)触发事件执行

mysql -vvv -u root -p123456 test -e "truncate t_target;alter event ev1 on schedule at current_timestamp enable;alter event ev2 on schedule at current_timestamp enable;alter event ev3 on schedule at current_timestamp enable;alter event ev4 on schedule at current_timestamp enable;"

        说明:该命令行顺序触发了4个事件,但不会等前一个执行完才执行下一个,而是立即向下执行。从图六的输出也可以清楚地看到这一点。因此四次过程调用是并行执行的。

图六

(5)查看事件执行日志

select * from t_event_history;

        查询结果如图7所示。

图七

        可以看到,每个过程的执行均为3.5秒,又因为是并行执行的,因此总的执行之间也是3.5秒,优化效果和shell后台进程方式几乎相同。

参考:

Increasing slow query performance with the parallel query execution

Mysql Event 调度历史记录

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏desperate633

计算机网络之数据交换(电路交换,报文交换,分组交换)电路交换报文交换分组交换分组交换与电路交换

就要经过网络核心进行数据交换,数据不断从一个网络交换到另一个网络,直到到达目的主机。所以网络核心解决的基本问题就是数据交换。

761
来自专栏小小挖掘机

整理一些计算机基础知识!

为了使不同计算机厂家生产的计算机能够相互通信,以便在更大的范围内建立计算机网络,国际标准化组织(ISO)在1978年提出了“开放系统互联参考模型”,即著名的OS...

863
来自专栏公有云大数据平台弹性 MapReduce

EMR上Zeppelin入门

简而言之,就是一个大数据分析平台。用户可以利用提供好的WEB UI,在线编写分析逻辑代码,输出结果,并且能够利用可视化工具,形象生动的在线展示结果。

1181
来自专栏FreeBuf

代码审计之任意文件下载漏洞案例分享

继上次审计HDWiki之后,最近拿到一套新的源码Ear_Music_20180510_UTF8最新版进行审计,发现这套cms还是比较安全的,而当我审计遇到一处下...

1124
来自专栏谭广健的专栏

【小程序-云开发】手把手教你使用云开发(数据库开发)

继上一次程序员哥哥简单开发了一个照片储存小程序后,感觉还是有些小小缺陷,就是没办法对上传照片进行文字描述。因为主要都是文字,如果将文字描述再保...

1.5K1
来自专栏分布式系统和大数据处理

C#网络编程(基本概念和操作) - Part.1

C#网络编程系列文章计划简单地讲述网络编程方面的基础知识,由于本人在这方面功力有限,所以只能提供一些初步的入门知识,希望能对刚开始学习的朋友提供一些帮助。如果想...

1013
来自专栏散尽浮华

MongoDB集群运维笔记

前面的文章介绍了MongoDB副本集和分片集群的做法,下面对MongoDB集群的日常维护操作进行小总结:         MongDB副本集故障转移功能得益于它...

1.2K9
来自专栏H2Cloud

C++执行内存memcpy的效率测试

在进行memcpy操作时,虽然是内存操作,但是仍然是耗一点点CPU的,今天测试了一下单线程中执行memcpy的效率,这个结果对于配置TCP epoll中的wor...

3454
来自专栏Jack-Cui

Jetson TX1开发笔记(三):开发利器-Nsight Eclipse Edition

PC平台(Host): 虚拟机Ubuntu14.04 嵌入式平台(Target): Jeston TX1 一、NSight简介     Jetpack开...

2475
来自专栏开发 & 算法杂谈

并行化的动态数据竞争验证和检测方法

之前系列提到的动态数据竞争验证和检测方法是结合了验证和检测两部分。这篇文章主要介绍一下并行化的动态数据竞争验证和检测方法。

2084

扫码关注云+社区