数据库中间件 MyCAT 源码分析 —— 【单库单表】查询

本文主要基于 MyCAT 1.6.5 正式版

  • 1. 概述
  • 2. 接收请求,解析 SQL
  • 3. 获得路由结果
  • 4. 获得 MySQL 连接,执行 SQL
  • 5. 响应执行 SQL 结果
  • 6. 其他 :更新 / 删除
1. 概述

内容形态以 顺序图 + 核心代码 为主。 如果有地方表述不错误或者不清晰,欢迎留言。 对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。 微信号:wangwenbin-server。

本文讲解 【单库单表】查询 所涉及到的代码。

?内容和 《MyCAT 源码分析 —— 【单库单表】插入》 超级相似,一方面本身流程基本相同,另外一方面文章结构没拆分好。我们使用 ? 标记差异的逻辑。

交互如下图:

单库单表查询简图

整个过程,MyCAT Server 流程如下:

  1. 接收 MySQL Client 请求,解析 SQL。
  2. 获得路由结果,进行路由。
  3. 获得 MySQL 连接,执行 SQL。
  4. 响应执行结果,发送结果给 MySQL Client。

我们逐个步骤分析,一起来看看源码。

2. 接收请求,解析 SQL

【单库单表】查询(01主流程)

【1 - 2】

接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

【3】

  1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】
  2: public class FrontendCommandHandler implements NIOHandler {
  3: 
  4:     @Override
  5:     public void handle(byte[] data) {
  6:     
  7:         // .... 省略部分代码
  8:         switch (data[4]) // 
  9:         {
 10:             case MySQLPacket.COM_INIT_DB:
 11:                 commands.doInitDB();
 12:                 source.initDB(data);
 13:                 break;
 14:             case MySQLPacket.COM_QUERY: // 查询命令
 15:                 // 计数查询命令
 16:                 commands.doQuery();
 17:                 // 执行查询命令
 18:                 source.query(data);
 19:                 break;
 20:             case MySQLPacket.COM_PING:
 21:                 commands.doPing();
 22:                 source.ping();
 23:                 break;
 24:             // .... 省略部分case
 25:         }
 26:     }
 27: 
 28: }

INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》。

【4】

将 二进制数组 解析成 SQL。核心代码如下:

  1: // ⬇️⬇️⬇️【FrontendConnection.java】
  2: public void query(byte[] data) {
  3:     // 取得语句
  4:     String sql = null;      
  5:     try {
  6:         MySQLMessage mm = new MySQLMessage(data);
  7:         mm.position(5);
  8:         sql = mm.readString(charset);
  9:     } catch (UnsupportedEncodingException e) {
 10:         writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
 11:         return;
 12:     }       
 13:     // 执行语句
 14:     this.query( sql );
 15: }

【5】

解析 SQL 类型。核心代码如下:

  1: // ⬇️⬇️⬇️【ServerQueryHandler.java】
  2: @Override
  3: public void query(String sql) {
  4:     // 解析 SQL 类型
  5:     int rs = ServerParse.parse(sql);
  6:     int sqlType = rs & 0xff;
  7:     
  8:     switch (sqlType) {
  9:     //explain sql
 10:     case ServerParse.EXPLAIN:
 11:         ExplainHandler.handle(sql, c, rs >>> 8);
 12:         break;
 13:     // .... 省略部分case
 14:         break;
 15:     case ServerParse.SELECT:
 16:         SelectHandler.handle(sql, c, rs >>> 8);
 17:         break;
 18:     // .... 省略部分case
 19:     default:
 20:         if(readOnly){
 21:             LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
 22:             c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
 23:             break;
 24:         }
 25:         c.execute(sql, rs & 0xff);
 26:     }
 27: }
 28: 
 29:
 30: // ⬇️⬇️⬇️【ServerParse.java】
 31: public static int parse(String stmt) {
 32:     int length = stmt.length();
 33:     //FIX BUG FOR SQL SUCH AS /XXXX/SQL
 34:     int rt = -1;
 35:     for (int i = 0; i < length; ++i) {
 36:         switch (stmt.charAt(i)) {
 37:         // .... 省略部分case            case 'I':
 38:         case 'i':
 39:             rt = insertCheck(stmt, i);
 40:             if (rt != OTHER) {
 41:                 return rt;
 42:             }
 43:             continue;
 44:             // .... 省略部分case
 45:         case 'S':
 46:         case 's':
 47:             rt = sCheck(stmt, i);
 48:             if (rt != OTHER) {
 49:                 return rt;
 50:             }
 51:             continue;
 52:             // .... 省略部分case
 53:         default:
 54:             continue;
 55:         }
 56:     }
 57:     return OTHER;
 58: }

?【6】【7】

解析 Select SQL 类型,分发到对应的逻辑。核心代码如下:

  1: // ⬇️⬇️⬇️【SelectHandler.java】
  2: public static void handle(String stmt, ServerConnection c, int offs) {
  3:     int offset = offs;
  4:     switch (ServerParseSelect.parse(stmt, offs)) { // 解析 Select SQL 类型
  5:     case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT;
  6:         SelectVersionComment.response(c);
  7:         break;
  8:     case ServerParseSelect.DATABASE: // select DATABASE();
  9:         SelectDatabase.response(c);
 10:         break;
 11:     case ServerParseSelect.USER: // select CURRENT_USER();
 12:         SelectUser.response(c);
 13:         break;
 14:     case ServerParseSelect.VERSION: // select VERSION();
 15:         SelectVersion.response(c);
 16:         break;
 17:     case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment;
 18:         SessionIncrement.response(c);
 19:         break;
 20:     case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation;
 21:         SessionIsolation.response(c);
 22:         break;
 23:     case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID();
 24:         // ....省略代码
 25:         break;
 26:     case ServerParseSelect.IDENTITY: // select @@identity
 27:         // ....省略代码
 28:         break;
 29:     case ServerParseSelect.SELECT_VAR_ALL: //
 30:         SelectVariables.execute(c,stmt);
 31:             break;
 32:     case ServerParseSelect.SESSION_TX_READ_ONLY: //
 33:         SelectTxReadOnly.response(c);
 34:             break;
 35:     default: // 其他,例如 select * from table
 36:         c.execute(stmt, ServerParse.SELECT);
 37:     }
 38: }
 39: // ⬇️⬇️⬇️【ServerParseSelect.java】
 40: public static int parse(String stmt, int offset) {
 41:     int i = offset;
 42:     for (; i < stmt.length(); ++i) {
 43:         switch (stmt.charAt(i)) {
 44:         case ' ':
 45:             continue;
 46:         case '/':
 47:         case '#':
 48:             i = ParseUtil.comment(stmt, i);
 49:             continue;
 50:         case '@':
 51:             return select2Check(stmt, i);
 52:         case 'D':
 53:         case 'd':
 54:             return databaseCheck(stmt, i);
 55:         case 'L':
 56:         case 'l':
 57:             return lastInsertCheck(stmt, i);
 58:         case 'U':
 59:         case 'u':
 60:             return userCheck(stmt, i);
 61:         case 'C':
 62:         case 'c':
 63:             return currentUserCheck(stmt, i);
 64:         case 'V':
 65:         case 'v':
 66:             return versionCheck(stmt, i);
 67:         default:
 68:             return OTHER;
 69:         }
 70:     }
 71:     return OTHER;
 72: }

【8】

执行 SQL,详细解析见下文,核心代码如下:

  1: // ⬇️⬇️⬇️【ServerConnection.java】
  2: public class ServerConnection extends FrontendConnection {
  3:     public void execute(String sql, int type) {
  4:         // .... 省略代码
  5:         SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
  6:         if (schema == null) {
  7:             writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
  8:                     "Unknown MyCAT Database '" + db + "'");
  9:             return;
 10:         }
 11: 
 12:         // .... 省略代码
 13: 
 14:         // 路由到后端数据库,执行 SQL
 15:         routeEndExecuteSQL(sql, type, schema);
 16:     }
 17:     
 18:     public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
 19:         // 路由计算
 20:         RouteResultset rrs = null;
 21:         try {
 22:             rrs = MycatServer
 23:                     .getInstance()
 24:                     .getRouterservice()
 25:                     .route(MycatServer.getInstance().getConfig().getSystem(),
 26:                             schema, type, sql, this.charset, this);
 27: 
 28:         } catch (Exception e) {
 29:             StringBuilder s = new StringBuilder();
 30:             LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
 31:             String msg = e.getMessage();
 32:             writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
 33:             return;
 34:         }
 35: 
 36:         // 执行 SQL
 37:         if (rrs != null) {
 38:             // session执行
 39:             session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
 40:         }
 41:         
 42:      }
 43: 
 44: }

3. 获得路由结果

【单库单表】插入(02获取路由)

【 1 - 5 】

获得路由主流程。核心代码如下:

  1: // ⬇️⬇️⬇️【SelectHandler.java】
  2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
  3:         int sqlType, String stmt, String charset, ServerConnection sc)
  4:         throws SQLNonTransientException {
  5:     RouteResultset rrs = null;
  6: 
  7:     // SELECT 类型的SQL, 检测缓存是否存在
  8:     if (sqlType == ServerParse.SELECT) {
  9:         cacheKey = schema.getName() + stmt;         
 10:         rrs = (RouteResultset) sqlRouteCache.get(cacheKey);
 11:         if (rrs != null) {
 12:             checkMigrateRule(schema.getName(),rrs,sqlType);
 13:             return rrs;
 14:             }
 15:         }
 16:     }
 17: 
 18:     // .... 省略代码
 19:     int hintLength = RouteService.isHintSql(stmt);
 20:     if(hintLength != -1){ // TODO 待读:hint
 21:         // .... 省略代码
 22:         }
 23:     } else {
 24:         stmt = stmt.trim();
 25:         rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
 26:                 charset, sc, tableId2DataNodeCache);
 27:     }
 28: 
 29:     // 记录查询命令路由结果缓存
 30:     if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) {
 31:         sqlRouteCache.putIfAbsent(cacheKey, rrs);
 32:     }
 33:     // .... 省略代码        return rrs;
 34: }
 35: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
 36: @Override
 37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
 38:         String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
 39: 
 40:     // .... 省略代码
 41: 
 42:     // 处理一些路由之前的逻辑;全局序列号,父子表插入
 43:     if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
 44:         return null;
 45:     }
 46: 
 47:     // .... 省略代码
 48: 
 49:     // 检查是否有分片
 50:     if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
 51:         rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
 52:     } else {
 53:         RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
 54:         if (returnedSet == null) {
 55:             rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
 56:         }
 57:     }
 58: 
 59:     return rrs;
 60: }

?【3】第 7 至 16 行 :当 Select SQL 存在路由结果缓存时,直接返回缓存。 ?【6】第 29 至 32 行 :记录 Select SQL 路由结果到缓存。

路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

4. 获得 MySQL 连接,执行 SQL

【单库单表】查询(03执行 SQL)

【 1 - 8 】

获得 MySQL 连接。

  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。

【 9 - 13 】

发送 SQL 到 MySQL Server,执行 SQL。

? 5. 响应执行 SQL 结果

【单库单表】查询(04执行响应)

核心代码如下:

  1: // ⬇️⬇️⬇️【MySQLConnectionHandler.java】
  2: @Override
  3: protected void handleData(byte[] data) {
  4:     switch (resultStatus) {
  5:     case RESULT_STATUS_INIT:
  6:         switch (data[4]) {
  7:         case OkPacket.FIELD_COUNT:
  8:             handleOkPacket(data);
  9:             break;
 10:         case ErrorPacket.FIELD_COUNT:
 11:             handleErrorPacket(data);
 12:             break;
 13:         case RequestFilePacket.FIELD_COUNT:
 14:             handleRequestPacket(data);
 15:             break;
 16:         default: // 初始化 header fields
 17:             resultStatus = RESULT_STATUS_HEADER;
 18:             header = data;
 19:             fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data,
 20:                     4));
 21:         }
 22:         break;
 23:     case RESULT_STATUS_HEADER:
 24:         switch (data[4]) {
 25:         case ErrorPacket.FIELD_COUNT:
 26:             resultStatus = RESULT_STATUS_INIT;
 27:             handleErrorPacket(data);
 28:             break;
 29:         case EOFPacket.FIELD_COUNT: // 解析 fields 结束
 30:             resultStatus = RESULT_STATUS_FIELD_EOF;
 31:             handleFieldEofPacket(data);
 32:             break;
 33:         default: // 解析 fields
 34:             fields.add(data);
 35:         }
 36:         break;
 37:     case RESULT_STATUS_FIELD_EOF:
 38:         switch (data[4]) {
 39:         case ErrorPacket.FIELD_COUNT:
 40:             resultStatus = RESULT_STATUS_INIT;
 41:             handleErrorPacket(data);
 42:             break;
 43:         case EOFPacket.FIELD_COUNT: // 解析 每行记录 结束
 44:             resultStatus = RESULT_STATUS_INIT;
 45:             handleRowEofPacket(data);
 46:             break;
 47:         default: // 每行记录
 48:             handleRowPacket(data);
 49:         }
 50:         break;
 51:     default:
 52:         throw new RuntimeException("unknown status!");
 53:     }
 54: }

6. 其他 :更新 / 删除

流程基本和 《MyCAT源码分析:【单库单表】插入》 相同。我们就不另外文章解析。

原文发布于微信公众号 - 芋道源码(YunaiV)

原文发表时间:2018-02-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏杨建荣的学习笔记

基于时间点的不完全恢复的例子(r6笔记第9天)

说到不完全恢复,一般有三种场景,基于时间点的不完全恢复,基于scn的不完全恢复,基于cancel的不完全恢复。 三种情况都是不完全恢复采用的方式,而不完全恢复都...

2695
来自专栏社区的朋友们

MySQL 入门常用命令大全(上)

作为一个 MySQL 的初学者,在短短的几个月中接触了一下,记录了一下工作中用到的 SQL 语句以及未来可能会用到的 MySQL 知识点,作为日后的参考手册。因...

9411
来自专栏杨建荣的学习笔记

一个SQL性能问题的优化探索(一)(r11笔记第33天)

今天同事问我一个问题,看起来比较常规,但是仔细分析了一圈,发现实在是有些晕,我隐隐感觉这是一个bug,但是有感觉问题还有很多需要确认和理解的细节。 同事...

3519
来自专栏沃趣科技

MVCC原理探究及MySQL源码实现分析

目录预览 数据库多版本读场景 MVCC实现原理 1、通过DB_ROLL_PT 回溯查找数据历史版本 2、通过read view判断行...

5848
来自专栏C/C++基础

MySQL入门常用命令大全

SQL(Structured Query Language)是结构化查询语言,也是一种高级的非过程化编程语言。SQL语句可用于增删查改数据以及管理关系型数据库,...

1142
来自专栏芋道源码1024

数据库中间件 MyCAT源码分析:【单库单表】插入

本文主要基于 MyCAT 1.6.5 正式版 1. 概述 2. 接收请求,解析 SQL 3. 获得路由结果 4. 获得 MySQL 连接,执行 SQL 5. 响...

52212
来自专栏数据和云

举一反三-分区裁剪作用的“新”发现

作者介绍 ? 赵勇 云和恩墨北区技术工程师 专注于SQL审核和优化相关工作。曾经服务的客户涉及金融保险、电信运营商、政府、生产制造等行业。 分区裁剪的定义 分区...

28110
来自专栏逸鹏说道

程序猿是如何解决SQLServer占CPU100%的

文章目录 遇到的问题 使用SQLServer Profiler监控数据库 SQL1:查找最新的30条告警事件 SQL2:获取当前的总报警记录数 有哪些SQL语句...

3748
来自专栏乐沙弥的世界

批量迁移Oracle数据文件,日志文件及控制文件

   有些时候需要将Oracle的多个数据文件以及日志文件重定位或者迁移到新的分区或新的位置,比如磁盘空间不足,或因为特殊需求。对于这种情形可以采取批量迁移的方...

1122
来自专栏程序猿

mysql 命令大全

1、连接到本机上的MYSQL。 首先打开DOS窗口,然后进入目录mysql\bin,再键入命令mysql -u root -p,回车后提示你输密码.注意用户名前...

36111

扫码关注云+社区

领取腾讯云代金券