【死磕Sharding-jdbc】---group by结果合并(2)

在sharding-jdbc源码之group by结果合并(1)中主要分析了sharding-jdbc如何在GroupByStreamResultSetMergerGroupByMemoryResultSetMerger中选择,并分析了GroupByStreamResultSetMerger的实现;接下来分析GroupByMemoryResultSetMerger的实现原理;

通过sharding-jdbc源码之group by结果合并(1)的分析可知,如果要走GroupByMemoryResultSetMerger,那么需要这样的SQL: SELECT o.status,count(o.user_id)count_user_id FROM t_order o whereo.user_id=10groupbyo.status order bycount_user_id asc,即group by和order by的字段不一样;接下来的分析都是基于这条SQL;

ExecutorEngine.build()方法中通过 returnnewGroupByMemoryResultSetMerger(columnLabelIndexMap,resultSets,selectStatement);调用GroupByMemoryResultSetMerger,GroupByMemoryResultSetMerger的构造方法源码如下:

public GroupByMemoryResultSetMerger(        final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {    // labelAndIndexMap就是select结果列与位置索引的map,例如{count_user_id:2, status:1}    super(labelAndIndexMap);    // select查询语句    this.selectStatement = selectStatement;    // resultSets就是并发在多个实际表执行返回的结果集合,在多少个实际表上执行,resultSets的size就有多大;    memoryResultSetRows = init(resultSets);}

在实际表torder0和torder1上执行SQL返回的结果如下:

torder0和torder1结果.png

知道实际表的返回结果后,后面的分析更容易理解;假定这些返回结果用json表示为:{[{"status":"NEW", "countuserid":1},{"status":"VALID", "countuserid":1},{"status":INIT, "countuserid":2}],[{"status":"VALID", "countuserid":1},{"status":"INIT", "countuserid":1},{"status":""NEW, "countuserid":3}]}

init()方法源码如下:

private Iterator<MemoryResultSetRow> init(final List<ResultSet> resultSets) throws SQLException {    Map<GroupByValue, MemoryResultSetRow> dataMap = new HashMap<>(1024);    Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap = new HashMap<>(1024);    // 遍历多个实际表执行返回的结果集合中所有的结果,即2个实际表每个实际表3条结果,总计6条结果    for (ResultSet each : resultSets) {        while (each.next()) {            // each就是遍历过程中的一条结果,selectStatement.getGroupByItems()即group by项,即status,将结果和group by项组成一个GroupByValue对象--实际是从ResultSet中取出group by项的值,例如NEW,VALID,INIT等            GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems());            // initForFirstGroupByValue()分析如下            initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap);            aggregate(each, groupByValue, aggregationMap);        }    }    // 将aggregationMap中的聚合计算结果封装到dataMap中    setAggregationValueToMemoryRow(dataMap, aggregationMap);    // 将结果转换成List<MemoryResultSetRow>形式    List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap);    if (!result.isEmpty()) {        // 如果有结果,再将currentResultSetRow置为List<MemoryResultSetRow>的第一个元素        setCurrentResultSetRow(result.get(0));    }    // 返回List<MemoryResultSetRow>的迭代器,后面的取结果,实际上就是迭代这个集合;    return result.iterator();}

initForFirstGroupByValue()源码如下:

private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, MemoryResultSetRow> dataMap,                                       final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {    // groupByValue如果是第一次出现,那么在dataMap中初始化一条数据,key就是groupByValue,例如NEW;value就是new MemoryResultSetRow(resultSet),即将ResultSet中的结果取出来封装到MemoryResultSetRow中,MemoryResultSetRow实际就一个属性Object[] data,那么data值就是这样的["NEW", 1]     if (!dataMap.containsKey(groupByValue)) {        dataMap.put(groupByValue, new MemoryResultSetRow(resultSet));    }    // groupByValue如果是第一次出现,那么在aggregationMap中初始化一条数据,key就是groupByValue,例如NEW;value又是一个map,这个map的key就是select中有聚合计算的列,例如count(user_id),即count_user_id;value就是AggregationUnit的实现,count聚合计算的实现是AccumulationAggregationUnit    if (!aggregationMap.containsKey(groupByValue)) {        Map<AggregationSelectItem, AggregationUnit> map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {            @Override            public AggregationUnit apply(final AggregationSelectItem input) {                // 根据聚合计算类型得到AggregationUnit的实现                return AggregationUnitFactory.create(input.getType());            }        });        aggregationMap.put(groupByValue, map);    }}

该方法都是为了接下来的聚合计算做准备工作;

aggregate()源码如下--即在内存中将多个实际表中返回的结果进行聚合:

private void aggregate(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {    // 遍历select中所有的聚合类型,例如COUNT(o.user_id)    for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) {        List<Comparable<?>> values = new ArrayList<>(2);        if (each.getDerivedAggregationSelectItems().isEmpty()) {            values.add(getAggregationValue(resultSet, each));        } else {            for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {                values.add(getAggregationValue(resultSet, derived));            }        }        // 通过AggregationUnit实现类即AccumulationAggregationUnit进行聚合,实际上就是聚合本次遍历到的ResultSet,聚合的临时结果就在AccumulationAggregationUnit的属性result中(AccumulationAggregationUnit聚合的本质就是累加)        aggregationMap.get(groupByValue).get(each).merge(values);    }}

经过 for(ResultSeteach :resultSets){while(each.next()){...遍历所有结果并聚合计算后,aggregationMap这个map中已经聚合计算完后的结果,如下所示:

{    "VALID": {        "COUNT(user_id)": 2    },    "INIT": {        "COUNT(user_id)": 5    },    "NEW": {        "COUNT(user_id)": 3    }}

再将aggregationMap中的结果封装到 Map<GroupByValue,MemoryResultSetRow>dataMap这个map中,结果形式如下所示:

{    "VALID": ["VALID", 2],    "INIT": ["INIT", 5],    "NEW": ["NEW", 3]}

MemoryResultSetRow的本质就是一个 Object[]data,所以其值是["VALID", 2],["INIT", 5]这种形式

将结果转成 List<MemoryResultSetRow>,并且排序--如果有order by,那么根据order by的值进行排序,否则根据group by的值排序:

private List<MemoryResultSetRow> getMemoryResultSetRows(final Map<GroupByValue, MemoryResultSetRow> dataMap) {    List<MemoryResultSetRow> result = new ArrayList<>(dataMap.values());    Collections.sort(result, new GroupByRowComparator(selectStatement));    return result;}@RequiredArgsConstructorpublic final class GroupByRowComparator implements Comparator<MemoryResultSetRow> {    private final SelectStatement selectStatement;    @Override    public int compare(final MemoryResultSetRow o1, final MemoryResultSetRow o2) {        if (!selectStatement.getOrderByItems().isEmpty()) {            return compare(o1, o2, selectStatement.getOrderByItems());        }        return compare(o1, o2, selectStatement.getGroupByItems());    }    ...}

到这里,GroupByMemoryResultSetMerger即内存GROUP聚合计算已经分析完成,依旧通过运行过程图解加深对GroupByMemoryResultSetMerger的理解,运行过程图如下图所示:

总结

正如GroupByMemoryResultSetMerger的名字一样,其实现原理是把所有结果加载到内存中,在内存中进行计算,而GroupByMemoryResultSetMerger是流式计算方法,并不需要加载所有实际表返回的结果到内存中。这样的话,如果SQL返回的总结果数比较多,GroupByMemoryResultSetMerger的处理方式就可能会撑爆内存;这个是使用sharding-jdbc一个非常需要注意的地方;

END

原文发布于微信公众号 - Java技术驿站(chenssy89)

原文发表时间:2018-05-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java成神之路

HQL语句大全

Hibernate配备了一种非常强大的查询语言,这种语言看上去很像SQL。但是不要被语法结构 上的相似所迷惑,HQL是非常有意识的被设计为完全面向对象的查询,它...

1805
来自专栏GreenLeaves

SQL学习之高级数据过滤

一、高级数据过滤之IN操作符 IN 操作符用来指定条件范围,范围中的每个条件都可以进行匹配。IN取一组由逗号分隔、括在圆括号中的合法值。代码如下: select...

2185
来自专栏三丰SanFeng

扩展mysql - 手把手教你写udf

使用过MySQL的人都知道,MySQL有很多内置函数提供给使用者,包括字符串函数、数值函数、日期和时间函数等,给开发人员和使用者带来了很多方便。

6615
来自专栏钟绍威的专栏

递归思想解决输出目录下的全部文件

刚刚了解了下递归思想 递归就是在方法内调用本方法 下面说一个实际的应用 输出目录下的全部文件,当目录中还有目录时,则进入目录输出里面的文件 import j...

1987
来自专栏应兆康的专栏

Python Web - Flask笔记5

MySQL Workbench是一款专为MySQL设计的ER/数据库建模工具。它是著名的数据库设计工具DBDesigner4的继任者。你可以用MySQL Wor...

1831
来自专栏程序员的SOD蜜

使用操作符重载,生成ORM实体类的SQL条件语句

ORM框架的一个不可或缺的功能就是根据实体类,生成操作数据库的SQL语句,这其中,最难处理的就是那些复杂的SQL条件比较语句。比如,有下面这样一个SQL语句: ...

23310
来自专栏JAVA后端开发

给mybatis添加自动建表,自动加字段的功能

以前项目用惯了hibernate,jpa,它有个自动建表功能,只要在PO里加上配置就可以了,感觉很爽. 但现在用mybatis,发现没有该功能,每次都加个字段...

5193
来自专栏简书专栏

Python程序结构2

上面一段代码的运行结果如下,从下面的结果可以看出速度差距为2、3倍左右,当数据量更大时,可能差距更大。:

1123
来自专栏spring源码深度学习

java基础io流——File的告白(重温经典)

创建成功返回true,如果存在就不创建返回false,创建一个文件时需要确保当前文件夹存在,所有要异常处理。

1683
来自专栏风口上的猪的文章

.NET面试题系列[13] - LINQ to Object

"C# 3.0所有特性的提出都是更好地为LINQ服务的" - Learning Hard

1202

扫码关注云+社区

领取腾讯云代金券