1.需求
从mongodb数据库的A表(或者叫集合)读取数据
经过处理后导入MySQL的B表中
其中
A表有近2000万条的数据
需要对每条数据进行分析处理,分析处理过程就不细说了
每条A表数据分析处理后可提取生成数条B表数据
B表数据的字段中有school、speciality和post三个字段,和一个字段number
导入B表中的数据需要通过这三个字段联合去重,并统计重复的条数number
最终将生成的B表数据以及频率字段number存储到MySQL数据库中
需求不复杂
2.解决过程
2.1思路
逐步读取那2000万条数据,并进行处理
定义一个Map,key值为school、speciality和post三个字段的拼接值,value为出现的次数
循环遍历2000万条数据
通过Map的key值进行判断
如果不存在,则将数据通过insert语句插入到MySQL数据库,同时put到Map中,key值为school、speciality和post三个字段的拼接值,value为1
如果存在了,则从Map中取出对应的value,value++,并将value值通过update语句更新到MySQL数据库中,同时更新Map中的value
貌似没问题,运行,程序快速的运行起来了
2.2遇到问题1
通过控制台打印的日志,我发现一个问题
所有的insert语句都是都是哗哗哗的打印
update却有明显的停顿感
很明显update语句影响了执行效率
特别是当MySQL表中数据量达到200万+之后
数据插入速度明显减慢了
这么搞下去可能要耽误事儿了
2.3优化1
update B表 set number = ?where school = ? and speciality = ? and post = ?
明显这个语句是执行速度的短板,所以应该从分析这条sql语句开始
因为用到了where关键字,所以条件查询应该是时间瓶颈
最先应该想到的就是建立索引
所以建立了school、speciality、post三个字段的联合索引,索引类型是normal
晚上执行的建立索引的语句,然后就去睡觉了,第二天早上起来建立好了
200多万数据,忘了看执行时间了
继续运行,果然速度快了很多
2.4遇到问题2
当数据量逐渐变大,达到近千万级时
用来存储频率出现次数的Map大小也随之达到了千万级
从内存使用和效率方面,性能都有所下降
所以我就考虑不再通过Map记录数据的出现频率
想通过update B表 set number = number + 1 where school = ? and speciality = ? and post = ?来解决
但是这个时候我发现,由于前期代码问题,在程序中断又重启过好多次, school、speciality、post这三个字段的内容并不是唯一存在的
如果直接使用上边的语句,会导致我统计的number数字不准确
所以第一步需要解决的问题就是去重
可以通过以下语句对数据进行多字段判断的去重
DELETE
FROM
speciality_post
WHERE
( school, speciality, post ) IN (
SELECT
school,
speciality,
post
FROM
( SELECT school, speciality, post FROM speciality_post GROUP BY school, speciality, post HAVING COUNT( * ) > 1 ) s1
)
AND id NOT IN ( SELECT id FROM ( SELECT id FROM speciality_post GROUP BY school, speciality, post HAVING COUNT( * ) > 1 ) s2 );
然后再根据那三个字段建立唯一索引
但是建立索引也是一个很漫长的过程
并且去重之后,如果不重新遍历2000万的原始数据
number也是统计的不准确的
2.5优化2
最终决定
雄关漫道真如铁
而今迈步从头越
新建B表,并将school、speciality、post这三个字段建立联合唯一索引
同时改进sql语句,使用insert ... on duplicate key update语句,简化代码
然后修改代码,运行
有时候从头再来也是一种策略
可以很大程度的挽回损失或停止损失
3.总结
3.1为什么insert语句要比update语句要快?
update语句需要先定位数据行位置,需要根据主键索引或条件索引逐行扫描,然后再更新特定字段
而insert语句是没有这些开销的
所以insert语句肯定要update语句快的多
3.2为什么将update语句中的where条件字段建立索引后,更新速度就变快了?
因为如果update语句中的where条件字段没有建立索引,在执行update语句的时候是要进行全表扫描的,扫描的过程中对每一行数据进行加锁判断释放锁,这个过程耗时会随着数据量的增加直线上升
而如果加了索引,就能快速准确定位到目标数据行,而且MySQL底层使用的是B+树建立的索引,所以稳定性也特别好
3.3解释一下insert ... on duplicate key update语句
如果你插入的记录导致一个UNIQUE索引或者primary key(主键)出现重复,那么就会认为该条记录存在,则执行update语句而不是insert语句,反之,则执行insert语句而不是更新语句
判断数据是否存在是通过判断是否导致了一个UNIQUE索引或者primary key(主键)出现重复,所以update语句是不能跟where条件判断的
4.测试
接下来才是我想说的内容
针对这个过程中的出现的几种情况
我对update语句的效率做了一个测试和对比
目标表B表中的数据量有2,013,035条
执行更新语句50次(本来想用5000条,测试的时候才发现,如果是5000条我得等一天)
分别测试无索引、normal索引、unique索引下update和insert..on duplicate key update语句的耗时
话不多说上代码
主函数方法
public static void main(String[] args) {
DruidUtils.init("localhost", "3306", "test", "root", "root");
String specialityPostNoIndex = "speciality_post_no_index";
String specialityPostUniqueIndex = "speciality_post_unique_index";
String specialityPostNormalIndex = "speciality_post_normal_index";
List<SpecialityPost> list = select();
long start = System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
update(specialityPost, specialityPostNoIndex);
}
long end = System.currentTimeMillis();
System.out.println("update语句在无索引情况下耗时:\t" + (end - start) + "毫秒\n");
System.out.println("update语句在无索引情况下平均耗时:\t" + ((end - start) / 50) + "毫秒\n\n\n\n");
start = System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
insertOrUpdate(specialityPost, specialityPostNoIndex);
}
end = System.currentTimeMillis();
System.out.println("insert·· on duplicate key update语句在无索引情况下耗时:\t" + (end - start) + "毫秒\n");
System.out.println("insert·· on duplicate key update语句在无索引情况下平均耗时:\t" + ((end - start) / 50) + "毫秒\n\n\n\n");
start = System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
update(specialityPost, specialityPostNormalIndex);
}
end = System.currentTimeMillis();
System.out.println("update语句在normal索引情况下耗时:\t" + (end - start) + "毫秒\n");
System.out.println("update语句在normal索引情况下平均耗时:\t" + ((end - start) / 50) + "毫秒\n\n\n\n");
start = System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
insertOrUpdate(specialityPost, specialityPostNormalIndex);
}
end = System.currentTimeMillis();
System.out.println("insert·· on duplicate key update语句在normal索引情况下耗时:\t" + (end - start) + "毫秒\n");
System.out.println("insert·· on duplicate key update语句在normal索引情况下平均耗时:\t" + ((end - start) / 50) + "毫秒\n\n\n\n");
start = System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
update(specialityPost, specialityPostUniqueIndex);
}
end = System.currentTimeMillis();
System.out.println("update语句在unique索引情况下耗时:\t" + (end - start) + "毫秒\n");
System.out.println("update语句在unique索引情况下平均耗时:\t" + ((end - start) / 50) + "毫秒\n\n\n\n");
start = System.currentTimeMillis();
for (SpecialityPost specialityPost : list) {
insertOrUpdate(specialityPost, specialityPostUniqueIndex);
}
end = System.currentTimeMillis();
System.out.println("insert·· on duplicate key update语句在unique索引情况下耗时:\t" + (end - start) + "毫秒\n");
System.out.println("insert·· on duplicate key update语句在unique索引情况下平均耗时:\t" + ((end - start) / 50) + "毫秒\n\n\n\n");
}
其他用到的方法
private static int update(SpecialityPost specialityPost, String table) {
String sql = "update " + table + " set number = number+1 where school = ? and speciality = ? and post = ?";
Connection connection = DruidUtils.getConnection();
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, specialityPost.getSchool());
preparedStatement.setString(2, specialityPost.getSpeciality());
preparedStatement.setString(3, specialityPost.getPost());
System.out.println(preparedStatement.toString());
return preparedStatement.executeUpdate();
} catch (SQLException throwables) {
throwables.printStackTrace();
} finally {
DruidUtils.closeAll(connection, preparedStatement, null);
}
return 0;
}
private static int insertOrUpdate(SpecialityPost specialityPost, String table) {
String sql = "INSERT INTO " + table + " (area,school,degree,original_speciality,speciality,original_post,post,number) " +
"VALUE" +
"(?,?,?,?,?,?,?,?) " +
"ON DUPLICATE KEY UPDATE " +
"number = number+1;";
Connection connection = DruidUtils.getConnection();
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, specialityPost.getArea());
preparedStatement.setString(2, specialityPost.getSchool());
preparedStatement.setString(3, specialityPost.getDegree());
preparedStatement.setString(4, specialityPost.getOriginalSpeciality());
preparedStatement.setString(5, specialityPost.getSpeciality());
preparedStatement.setString(6, specialityPost.getOriginalPost());
preparedStatement.setString(7, specialityPost.getPost());
preparedStatement.setInt(8, specialityPost.getNumber());
System.out.println(preparedStatement.toString());
return preparedStatement.executeUpdate();
} catch (SQLException throwables) {
throwables.printStackTrace();
} finally {
DruidUtils.closeAll(connection, preparedStatement, null);
}
return 0;
}
private static List<SpecialityPost> select() {
String sql = "select * from speciality_post_unique_index order by number limit 50";
Connection connection = DruidUtils.getConnection();
List<SpecialityPost> list = new ArrayList<>();
PreparedStatement preparedStatement = null;
ResultSet resultSet = null;
try {
preparedStatement = connection.prepareStatement(sql);
System.out.println(preparedStatement.toString());
resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
SpecialityPost specialityPost = new SpecialityPost();
specialityPost.setId(resultSet.getInt("id"));
specialityPost.setNumber(resultSet.getInt("number"));
specialityPost.setSchool(resultSet.getString("school"));
specialityPost.setSpeciality(resultSet.getString("speciality"));
specialityPost.setPost(resultSet.getString("post"));
specialityPost.setArea(resultSet.getString("area"));
specialityPost.setOriginalSpeciality(resultSet.getString("original_speciality"));
specialityPost.setOriginalPost(resultSet.getString("original_post"));
list.add(specialityPost);
}
} catch (SQLException throwables) {
throwables.printStackTrace();
} finally {
DruidUtils.closeAll(connection, preparedStatement, resultSet);
}
return list;
}
4.1无索引情况下的update语句
4.2无索引情况下的insert ... on duplicate key update语句
注意:这个之所以执行这么快,不是因为sql语句的优化的好,前边说到了,这个语句判断是否更新是判断是否与唯一索引冲突,在这里是没冲突的,所以其实执行的是插入操作
4.3normal索引情况下的update语句
4.4normal索引情况下的 insert ... on duplicate key update语句
注意:这个之所以执行这么快,不是因为sql语句的优化的好,前边说到了,这个语句判断是否更新是判断是否与唯一索引冲突,在这里是没冲突的,所以其实执行的是插入操作
4.5unique索引情况下的update语句
4.6unique索引情况下的 insert ... on duplicate key update语句
可以看到使用索引和insert ... on duplicate key update语句的效率还是极高的
在做类似数据操作的时候,可以参考一下
你有其他更好的办法吗?
文/戴先生@2020年6月20日
---end---