数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

本文主要基于 Sharding-JDBC 1.5.0 正式版

  • 1. 概述

  • 2. SQLToken

  • 3.SQL 改写

    • 3.4.1 分页补充

    • 3.1 TableToken

    • 3.2 ItemsToken

    • 3.3 OffsetToken

    • 3.4 RowCountToken

    • 3.5 OrderByToken

    • 3.6 GeneratedKeyToken

  • 4. SQL 生成

  • 666. 彩蛋

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表

  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址

  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢

  4. 新的源码解析文章实时收到通知。每周更新一篇左右

  5. 认真的源码交流微信群。


1. 概述

前置阅读:《SQL 解析(三)之查询SQL》

本文分享SQL 改写的源码实现。主要涉及两方面:

  1. SQL 改写:改写 SQL,解决分库分表后,查询结果需要聚合,需要对 SQL 进行调整,例如分页

  2. SQL 生成:生成分表分库的执行 SQL

SQLRewriteEngine,SQL重写引擎,实现 SQL 改写、生成功能。从 Sharding-JDBC 1.5.0 版本,SQL 改写进行了调整和大量优化。

1.4.x及之前版本,SQL改写是在SQL路由之前完成的,在1.5.x中调整为SQL路由之后,因为SQL改写可以根据路由至单库表还是多库表而进行进一步优化。

😆 很多同学看完《SQL 解析-系列》 可能是一脸懵逼,特别对“SQL 半理解”数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写希望本文能给你一些启发。

Sharding-JDBC 正在收集使用公司名单:传送门。
🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门
登记吧,骚年!传送门

2. SQLToken

😁 SQLToken 在本文中很重要,所以即使在《SQL 解析-系列》已经分享过,我们也换个姿势,再来一次。

SQLToken,SQL标记对象接口。SQLRewriteEngine 基于 SQLToken 实现 SQL改写。SQL解析器在 SQL解析过程中,很重要的一个目的是标记需要SQL改写的部分,也就是 SQLToken。

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

各 SQLToken 生成条件如下(悲伤,做成表格形式排版是乱的):

  1. GeneratedKeyToken 自增主键标记对象

    • 插入SQL自增列不存在: INSERT INTO t_order(nickname)VALUES... 中没有自增列 order_id

  2. TableToken 表标记对象

    • 查询列的表别名: SELECT o.order_id 的 o

    • 查询的表名: SELECT*FROM t_order 的 t_order

  3. ItemsToken 选择项标记对象

    • AVG查询列: SELECT AVG(price)FROM t_order 的 AVG(price)

    • ORDER BY 字段不在查询列: SELECT order_id FROM t_order ORDER BY create_time 的 create_time

    • GROUP BY 字段不在查询列: SELECT COUNT(order_id)FROM t_order GROUP BY user_id 的 user_id

    • 自增主键未在插入列中: INSERT INTO t_order(nickname)VALUES... 中没有自增列 order_id

  4. OffsetToken 分页偏移量标记对象

    • 分页有偏移量,但不是占位符 ?

  5. RowCountToken 分页长度标记对象

    • 分页有长度,但不是占位符 ?

  6. OrderByToken 排序标记对象

    • 有 GROUP BY 条件,无 ORDER BY 条件: SELECT COUNT(*)FROM t_order GROUP BY order_id 的 order_id

3.SQL 改写

SQLRewriteEngine#rewrite() 实现了 SQL改写 功能。

  1. // SQLRewriteEngine.java

  2. /**

  3. * SQL改写.

  4. * @param isRewriteLimit 是否重写Limit

  5. * @return SQL构建器

  6. */

  7. public SQLBuilder rewrite(final boolean isRewriteLimit) {

  8.   SQLBuilder result = new SQLBuilder();

  9.   if (sqlTokens.isEmpty()) {

  10.       result.appendLiterals(originalSQL);

  11.       return result;

  12.   }

  13.   int count = 0;

  14.   // 排序SQLToken,按照 beginPosition 递增

  15.   sortByBeginPosition();

  16.   for (SQLToken each : sqlTokens) {

  17.       if (0 == count) { // 拼接第一个 SQLToken 前的字符串

  18.           result.appendLiterals(originalSQL.substring(0, each.getBeginPosition()));

  19.       }

  20.       // 拼接每个SQLToken

  21.       if (each instanceof TableToken) {

  22.           appendTableToken(result, (TableToken) each, count, sqlTokens);

  23.       } else if (each instanceof ItemsToken) {

  24.           appendItemsToken(result, (ItemsToken) each, count, sqlTokens);

  25.       } else if (each instanceof RowCountToken) {

  26.           appendLimitRowCount(result, (RowCountToken) each, count, sqlTokens, isRewriteLimit);

  27.       } else if (each instanceof OffsetToken) {

  28.           appendLimitOffsetToken(result, (OffsetToken) each, count, sqlTokens, isRewriteLimit);

  29.       } else if (each instanceof OrderByToken) {

  30.           appendOrderByToken(result);

  31.       }

  32.       count++;

  33.   }

  34.   return result;

  35. }

  • SQL改写以 SQLToken 为间隔顺序改写。

    • 顺序:调用 #sortByBeginPosition() 将 SQLToken 按照 beginPosition 升序

    • 间隔:遍历 SQLToken,逐个拼接。

例如:数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写


SQLBuilder,SQL构建器。下文会大量用到,我们看下实现代码。

  1. public final class SQLBuilder {

  2.    /**

  3.     * 段集合

  4.     */

  5.    private final List<Object> segments;

  6.    /**

  7.     * 当前段

  8.     */

  9.    private StringBuilder currentSegment;

  10.    public SQLBuilder() {

  11.        segments = new LinkedList<>();

  12.        currentSegment = new StringBuilder();

  13.        segments.add(currentSegment);

  14.    }

  15.    /**

  16.     * 追加字面量.

  17.     *

  18.     * @param literals 字面量

  19.     */

  20.    public void appendLiterals(final String literals) {

  21.        currentSegment.append(literals);

  22.    }

  23.    /**

  24.     * 追加表占位符.

  25.     *

  26.     * @param tableName 表名称

  27.     */

  28.    public void appendTable(final String tableName) {

  29.        // 添加 TableToken

  30.        segments.add(new TableToken(tableName));

  31.        // 新建当前段

  32.        currentSegment = new StringBuilder();

  33.        segments.add(currentSegment);

  34.    }

  35.    public String toSQL(final Map<String, String> tableTokens) {

  36.        // ... 省略代码,【SQL生成】处分享

  37.    }

  38.    @RequiredArgsConstructor

  39.    private class TableToken {

  40.        /**

  41.         * 表名

  42.         */

  43.        private final String tableName;

  44.    }

  45. }


现在我们来逐个分析每种 SQLToken 的拼接实现。

3.1 TableToken

调用 #appendTableToken() 方法拼接。

  1. // SQLRewriteEngine.java

  2. /**

  3. * 拼接 TableToken

  4. *

  5. * @param sqlBuilder SQL构建器

  6. * @param tableToken tableToken

  7. * @param count tableToken 在 sqlTokens 的顺序

  8. * @param sqlTokens sqlTokens

  9. */

  10. private void appendTableToken(final SQLBuilder sqlBuilder, final TableToken tableToken, final int count, final List<SQLToken> sqlTokens) {

  11.   // 拼接 TableToken

  12.   String tableName = sqlStatement.getTables().getTableNames().contains(tableToken.getTableName()) ? tableToken.getTableName() : tableToken.getOriginalLiterals();

  13.   sqlBuilder.appendTable(tableName);

  14.   // 拼接 SQLToken 后面的字符串

  15.   int beginPosition = tableToken.getBeginPosition() + tableToken.getOriginalLiterals().length();

  16.   int endPosition = sqlTokens.size() - 1 == count ? originalSQL.length() : sqlTokens.get(count + 1).getBeginPosition();

  17.   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));

  18. }

  • 调用 SQLBuilder#appendTable() 拼接 TableToken。

  • sqlStatement.getTables().getTableNames().contains(tableToken.getTableName()) 目的是处理掉表名前后有的特殊字符,例如 SELECT*FROM't_order' 中 t_order 前后有 ' 符号。

  1. // TableToken.java

  2. /**

  3. * 获取表名称.

  4. */

  5. public String getTableName() {

  6.   return SQLUtil.getExactlyValue(originalLiterals);

  7. }

  8. // SQLUtil.java

  9. public static String getExactlyValue(final String value) {

  10.   return null == value ? null : CharMatcher.anyOf("[]`'"").removeFrom(value);

  11. }

  • 当 SQL 为 SELECT o.*FROM t_order o

    • TableToken 为查询列前的表别名 o 时返回结果: 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

    • TableToken 为表名 t_order 时返回结果: 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

3.2 ItemsToken

调用 #appendItemsToken() 方法拼接。

  1. // SQLRewriteEngine.java

  2. /**

  3. * 拼接 TableToken

  4. *

  5. * @param sqlBuilder SQL构建器

  6. * @param itemsToken itemsToken

  7. * @param count itemsToken 在 sqlTokens 的顺序

  8. * @param sqlTokens sqlTokens

  9. */

  10. private void appendItemsToken(final SQLBuilder sqlBuilder, final ItemsToken itemsToken, final int count, final List<SQLToken> sqlTokens) {

  11.   // 拼接 ItemsToken

  12.   for (String item : itemsToken.getItems()) {

  13.       sqlBuilder.appendLiterals(", ");

  14.       sqlBuilder.appendLiterals(item);

  15.   }

  16.   // SQLToken 后面的字符串

  17.   int beginPosition = itemsToken.getBeginPosition();

  18.   int endPosition = sqlTokens.size() - 1 == count ? originalSQL.length() : sqlTokens.get(count + 1).getBeginPosition();

  19.   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));

  20. }

  • 第一种情况,AVG查询列,SQL 为 SELECT AVG(order_id)FROM t_order o 时返回结果: 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

  • 第二种情况,ORDER BY 字段不在查询列,SQL 为 SELECT userId FROM t_order o ORDER BY order_id 时返回结果: 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

  • 第三种情况,GROUP BY 字段不在查询列,类似第二种情况,就不举例子列。

3.3 OffsetToken

调用 #appendLimitOffsetToken() 方法拼接。

  1. // SQLRewriteEngine.java

  2. /**

  3. * 拼接 OffsetToken

  4. *

  5. * @param sqlBuilder SQL构建器

  6. * @param offsetToken offsetToken

  7. * @param count offsetToken 在 sqlTokens 的顺序

  8. * @param sqlTokens sqlTokens

  9. * @param isRewrite 是否重写。当路由结果为单分片时无需重写

  10. */

  11. private void appendLimitOffsetToken(final SQLBuilder sqlBuilder, final OffsetToken offsetToken, final int count, final List<SQLToken> sqlTokens, final boolean isRewrite) {

  12.   // 拼接 OffsetToken

  13.   sqlBuilder.appendLiterals(isRewrite ? "0" : String.valueOf(offsetToken.getOffset()));

  14.   // SQLToken 后面的字符串

  15.   int beginPosition = offsetToken.getBeginPosition() + String.valueOf(offsetToken.getOffset()).length();

  16.   int endPosition = sqlTokens.size() - 1 == count ? originalSQL.length() : sqlTokens.get(count + 1).getBeginPosition();

  17.   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));

  18. }

  • 当分页跨分片时,需要每个分片都查询后在内存中进行聚合。此时 isRewrite=true。为什么是 "0" 开始呢?每个分片在 [0, offset) 的记录可能属于实际分页结果,因而查询每个分片需要从 0 开始。

  • 当分页单分片时,则无需重写,该分片执行的结果即是最终结果。SQL改写在SQL路由之后就有这个好处。如果先改写,因为没办法知道最终是单分片还是跨分片,考虑正确性,只能统一使用跨分片。

3.4 RowCountToken

调用 #appendLimitRowCount() 方法拼接。

  1. // SQLRewriteEngine.java

  2. private void appendLimitRowCount(final SQLBuilder sqlBuilder, final RowCountToken rowCountToken, final int count, final List<SQLToken> sqlTokens, final boolean isRewrite) {

  3.   SelectStatement selectStatement = (SelectStatement) sqlStatement;

  4.   Limit limit = selectStatement.getLimit();

  5.   if (!isRewrite) { // 路由结果为单分片

  6.       sqlBuilder.appendLiterals(String.valueOf(rowCountToken.getRowCount()));

  7.   } else if ((!selectStatement.getGroupByItems().isEmpty() || // [1.1] 跨分片分组需要在内存计算,可能需要全部加载

  8.           !selectStatement.getAggregationSelectItems().isEmpty()) // [1.2] 跨分片聚合列需要在内存计算,可能需要全部加载

  9.           && !selectStatement.isSameGroupByAndOrderByItems()) { // [2] 如果排序一致,即各分片已经排序好结果,就不需要全部加载

  10.       sqlBuilder.appendLiterals(String.valueOf(Integer.MAX_VALUE));

  11.   } else { // 路由结果为多分片

  12.       sqlBuilder.appendLiterals(String.valueOf(limit.isRowCountRewriteFlag() ? rowCountToken.getRowCount() + limit.getOffsetValue() : rowCountToken.getRowCount()));

  13.   }

  14.   // SQLToken 后面的字符串

  15.   int beginPosition = rowCountToken.getBeginPosition() + String.valueOf(rowCountToken.getRowCount()).length();

  16.   int endPosition = sqlTokens.size() - 1 == count ? originalSQL.length() : sqlTokens.get(count + 1).getBeginPosition();

  17.   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));

  18. }

  • [1.1] !selectStatement.getGroupByItems().isEmpty() 跨分片分组需要在内存计算,可能需要全部加载。如果不全部加载,部分结果被分页条件错误结果,会导致结果不正确。

  • [1.2] !selectStatement.getAggregationSelectItems().isEmpty()) 跨分片聚合列需要在内存计算,可能需要全部加载。如果不全部加载,部分结果被分页条件错误结果,会导致结果不正确。

  • [1.1][1.2],可能变成必须的前提是 GROUP BY 和 ORDER BY 排序不一致。如果一致,各分片已经排序完成,无需内存中排序。

3.4.1 分页补充

OffsetToken、RowCountToken 只有在分页对应位置非占位符 ? 才存在。当对应位置是占位符时,会对分页条件对应的预编译 SQL 占位符参数进行重写,整体逻辑和 OffsetToken、RowCountToken 是一致的

  1. // 👼 ParsingSQLRouter#route() 调用 #processLimit()

  2. // ParsingSQLRouter.java

  3. /**

  4. * 处理分页条件

  5. *

  6. * @see SQLRewriteEngine#appendLimitRowCount(SQLBuilder, RowCountToken, int, List, boolean)

  7. * @param parameters 占位符对应参数列表

  8. * @param selectStatement Select SQL语句对象

  9. * @param isSingleRouting 是否单表路由

  10. */

  11. private void processLimit(final List<Object> parameters, final SelectStatement selectStatement, final boolean isSingleRouting) {

  12.   boolean isNeedFetchAll = (!selectStatement.getGroupByItems().isEmpty() // // [1.1] 跨分片分组需要在内存计算,可能需要全部加载

  13.                               || !selectStatement.getAggregationSelectItems().isEmpty()) // [1.2] 跨分片聚合列需要在内存计算,可能需要全部加载

  14.                           && !selectStatement.isSameGroupByAndOrderByItems(); // [2] 如果排序一致,即各分片已经排序好结果,就不需要全部加载

  15.   selectStatement.getLimit().processParameters(parameters, !isSingleRouting, isNeedFetchAll);

  16. }

  17. // Limit.java

  18. /**

  19. * 填充改写分页参数.

  20. * @param parameters 参数

  21. * @param isRewrite 是否重写参数

  22. * @param isFetchAll 是否获取所有数据

  23. */

  24. public void processParameters(final List<Object> parameters, final boolean isRewrite, final boolean isFetchAll) {

  25.   fill(parameters);

  26.   if (isRewrite) {

  27.       rewrite(parameters, isFetchAll);

  28.   }

  29. }

  30. /**

  31. * 将占位符参数里是分页的参数赋值给 offset 、rowCount

  32. * 赋值的前提条件是 offset、rowCount 是 占位符

  33. * @param parameters 占位符参数

  34. */

  35. private void fill(final List<Object> parameters) {

  36.   int offset = 0;

  37.   if (null != this.offset) {

  38.       offset = -1 == this.offset.getIndex() ? getOffsetValue() : NumberUtil.roundHalfUp(parameters.get(this.offset.getIndex()));

  39.       this.offset.setValue(offset);

  40.   }

  41.   int rowCount = 0;

  42.   if (null != this.rowCount) {

  43.       rowCount = -1 == this.rowCount.getIndex() ? getRowCountValue() : NumberUtil.roundHalfUp(parameters.get(this.rowCount.getIndex()));

  44.       this.rowCount.setValue(rowCount);

  45.   }

  46.   if (offset < 0 || rowCount < 0) {

  47.       throw new SQLParsingException("LIMIT offset and row count can not be a negative value.");

  48.   }

  49. }

  50. /**

  51. * 重写分页条件对应的参数

  52. * @param parameters 参数

  53. * @param isFetchAll 是否拉取所有

  54. */

  55. private void rewrite(final List<Object> parameters, final boolean isFetchAll) {

  56.   int rewriteOffset = 0;

  57.   int rewriteRowCount;

  58.   // 重写

  59.   if (isFetchAll) {

  60.       rewriteRowCount = Integer.MAX_VALUE;

  61.   } else if (rowCountRewriteFlag) {

  62.       rewriteRowCount = null == rowCount ? -1 : getOffsetValue() + rowCount.getValue();

  63.   } else {

  64.       rewriteRowCount = rowCount.getValue();

  65.   }

  66.   // 参数设置

  67.   if (null != offset && offset.getIndex() > -1) {

  68.       parameters.set(offset.getIndex(), rewriteOffset);

  69.   }

  70.   if (null != rowCount && rowCount.getIndex() > -1) {

  71.       parameters.set(rowCount.getIndex(), rewriteRowCount);

  72.   }

  73. }

3.5 OrderByToken

调用 #appendOrderByToken() 方法拼接。数据库里,当无 ORDER BY条件 而有 GROUP BY 条件时候,会使用 GROUP BY条件将结果升序排序:

  • SELECT order_id FROM t_order GROUP BY order_id 等价于 SELECT order_id FROM t_order GROUP BY order_id ORDER BY order_id ASC

  • SELECT order_id FROM t_order GROUP BY order_id DESC 等价于 SELECT order_id FROM t_order GROUP BY order_id ORDER BY order_id DESC

  1. // ParsingSQLRouter.java

  2. /**

  3. * 拼接 OrderByToken

  4. *

  5. * @param sqlBuilder SQL构建器

  6. */

  7. private void appendOrderByToken(final SQLBuilder sqlBuilder) {

  8.   SelectStatement selectStatement = (SelectStatement) sqlStatement;

  9.   // 拼接 OrderByToken

  10.   StringBuilder orderByLiterals = new StringBuilder(" ORDER BY ");

  11.   int i = 0;

  12.   for (OrderItem each : selectStatement.getOrderByItems()) {

  13.       if (0 == i) {

  14.           orderByLiterals.append(each.getColumnLabel()).append(" ").append(each.getType().name());

  15.       } else {

  16.           orderByLiterals.append(",").append(each.getColumnLabel()).append(" ").append(each.getType().name());

  17.       }

  18.       i++;

  19.   }

  20.   orderByLiterals.append(" ");

  21.   sqlBuilder.appendLiterals(orderByLiterals.toString());

  22. }

  • 当 SQL 为 SELECT order_id FROM t_order o GROUP BY order_id 返回结果: 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

3.6 GeneratedKeyToken

前置阅读:《SQL 解析(四)之插入SQL》

GeneratedKeyToken,和其它 SQLToken 不同,在 SQL解析 完进行处理。

  1. // ParsingSQLRouter.java

  2. @Override

  3. public SQLStatement parse(final String logicSQL, final int parametersSize) {

  4.   SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule);

  5.   Context context = MetricsContext.start("Parse SQL");

  6.   SQLStatement result = parsingEngine.parse();

  7.   if (result instanceof InsertStatement) { // 处理 GenerateKeyToken

  8.       ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);

  9.   }

  10.   MetricsContext.stop(context);

  11.   return result;

  12. }

  13. // InsertStatement.java

  14. /**

  15. * 追加自增主键标记对象.

  16. *

  17. * @param shardingRule 分片规则

  18. * @param parametersSize 参数个数

  19. */

  20. public void appendGenerateKeyToken(final ShardingRule shardingRule, final int parametersSize) {

  21.   // SQL 里有主键列

  22.   if (null != generatedKey) {

  23.       return;

  24.   }

  25.   // TableRule 存在

  26.   Optional<TableRule> tableRule = shardingRule.tryFindTableRule(getTables().getSingleTableName());

  27.   if (!tableRule.isPresent()) {

  28.       return;

  29.   }

  30.   // GeneratedKeyToken 存在

  31.   Optional<GeneratedKeyToken> generatedKeysToken = findGeneratedKeyToken();

  32.   if (!generatedKeysToken.isPresent()) {

  33.       return;

  34.   }

  35.   // 处理 GenerateKeyToken

  36.   ItemsToken valuesToken = new ItemsToken(generatedKeysToken.get().getBeginPosition());

  37.   if (0 == parametersSize) {

  38.       appendGenerateKeyToken(shardingRule, tableRule.get(), valuesToken);

  39.   } else {

  40.       appendGenerateKeyToken(shardingRule, tableRule.get(), valuesToken, parametersSize);

  41.   }

  42.   // 移除 generatedKeysToken

  43.   getSqlTokens().remove(generatedKeysToken.get());

  44.   // 新增 ItemsToken

  45.   getSqlTokens().add(valuesToken);

  46. }

  • 根据占位符参数数量不同,调用的 #appendGenerateKeyToken() 是不同的:

  • 占位符参数数量 = 0 时,直接生成分布式主键,保持无占位符的做法。

  1. // InsertStatement.java

  2. private void appendGenerateKeyToken(final ShardingRule shardingRule, final TableRule tableRule, final ItemsToken valuesToken) {

  3.   // 生成分布式主键

  4.   Number generatedKey = shardingRule.generateKey(tableRule.getLogicTable());

  5.   // 添加到 ItemsToken

  6.   valuesToken.getItems().add(generatedKey.toString());

  7.   // 增加 Condition,用于路由

  8.   getConditions().add(new Condition(new Column(tableRule.getGenerateKeyColumn(), tableRule.getLogicTable()), new SQLNumberExpression(generatedKey)), shardingRule);

  9.   // 生成 GeneratedKey

  10.   this.generatedKey = new GeneratedKey(tableRule.getLogicTable(), -1, generatedKey);

  11. }

  • 占位符参数数量 > 0 时,生成自增列的占位符,保持有占位符的做法。

  1. private void appendGenerateKeyToken(final ShardingRule shardingRule, final TableRule tableRule, final ItemsToken valuesToken, final int parametersSize) {

  2.   // 生成占位符

  3.   valuesToken.getItems().add("?");

  4.   // 增加 Condition,用于路由

  5.   getConditions().add(new Condition(new Column(tableRule.getGenerateKeyColumn(), tableRule.getLogicTable()), new SQLPlaceholderExpression(parametersSize)), shardingRule);

  6.   // 生成 GeneratedKey

  7.   generatedKey = new GeneratedKey(tableRule.getGenerateKeyColumn(), parametersSize, null);

  8. }

  • 因为 GenerateKeyToken 已经处理完,所以移除,避免 SQLRewriteEngine#rewrite() 二次改写。另外,通过 ItemsToken 补充自增列。

  • 生成 GeneratedKey 会在 ParsingSQLRouter 进一步处理。

  1. // ParsingSQLRouter.java

  2. public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {

  3.   final Context context = MetricsContext.start("Route SQL");

  4.   SQLRouteResult result = new SQLRouteResult(sqlStatement);

  5.   // 处理 插入SQL 主键字段

  6.   if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) {

  7.       processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);

  8.   }

  9.   // ... 省略部分代码

  10. }  

  11. /**

  12. * 处理 插入SQL 主键字段

  13. * 当 主键编号 未生成时,{@link ShardingRule#generateKey(String)} 进行生成

  14. * @param parameters 占位符参数

  15. * @param insertStatement Insert SQL语句对象

  16. * @param sqlRouteResult SQL路由结果

  17. */

  18. private void processGeneratedKey(final List<Object> parameters, final InsertStatement insertStatement, final SQLRouteResult sqlRouteResult) {

  19.   GeneratedKey generatedKey = insertStatement.getGeneratedKey();

  20.   if (parameters.isEmpty()) { // 已有主键,无占位符,INSERT INTO t_order(order_id, user_id) VALUES (1, 100);

  21.       sqlRouteResult.getGeneratedKeys().add(generatedKey.getValue());

  22.   } else if (parameters.size() == generatedKey.getIndex()) { // 主键字段不存在存在,INSERT INTO t_order(user_id) VALUES(?);

  23.       Number key = shardingRule.generateKey(insertStatement.getTables().getSingleTableName()); // 生成主键编号

  24.       parameters.add(key);

  25.       setGeneratedKeys(sqlRouteResult, key);

  26.   } else if (-1 != generatedKey.getIndex()) { // 主键字段存在,INSERT INTO t_order(order_id, user_id) VALUES(?, ?);

  27.       setGeneratedKeys(sqlRouteResult, (Number) parameters.get(generatedKey.getIndex()));

  28.   }

  29. }

  30. /**

  31. * 设置 主键编号 到 SQL路由结果

  32. * @param sqlRouteResult SQL路由结果

  33. * @param generatedKey 主键编号

  34. */

  35. private void setGeneratedKeys(final SQLRouteResult sqlRouteResult, final Number generatedKey) {

  36.   generatedKeys.add(generatedKey);

  37.   sqlRouteResult.getGeneratedKeys().clear();

  38.   sqlRouteResult.getGeneratedKeys().addAll(generatedKeys);

  39. }

  • parameters.size()==generatedKey.getIndex() 处对应 #appendGenerateKeyToken() 的 占位符参数数量 > 0 情况,此时会生成分布式主键。😈 该处是不是可以考虑把生成分布式主键挪到 #appendGenerateKeyToken(),这样更加统一一些。

4. SQL 生成

SQL路由完后,会生成各数据分片的执行SQL

  1. // ParsingSQLRouter.java

  2. @Override

  3. public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {

  4.   SQLRouteResult result = new SQLRouteResult(sqlStatement);

  5.   // 省略部分代码... 处理 插入SQL 主键字段

  6.   // 路由

  7.   RoutingResult routingResult = route(parameters, sqlStatement);

  8.   // 省略部分代码... SQL重写引擎

  9.   SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, sqlStatement);

  10.   boolean isSingleRouting = routingResult.isSingleRouting();

  11.   // 省略部分代码... 处理分页

  12.   // SQL 重写

  13.   SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);

  14.   // 生成 ExecutionUnit

  15.   if (routingResult instanceof CartesianRoutingResult) {

  16.       for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) {

  17.           for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) {

  18.               // 👼 生成 SQL

  19.               result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder)));

  20.           }

  21.       }

  22.   } else {

  23.       for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {

  24.           // 👼 生成 SQL

  25.           result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));

  26.       }

  27.   }

  28.   return result;

  29. }

  • 调用 RewriteEngine#generateSQL() 生成执行SQL。对于笛卡尔积路由结果和简单路由结果传递的参数略有不同:前者使用 CartesianDataSource ( CartesianTableReference ),后者使用路由表单元 ( TableUnit )。对路由结果不是很了解的同学,建议看下 《SQL 路由(二)之分库分表路由》。

RewriteEngine#generateSQL() 对于笛卡尔积路由结果和简单路由结果两种情况,处理上大体是一致的:1. 获得 SQL 相关逻辑表对应的真实表映射,2. 根据映射改写 SQL 相关逻辑表真实表

  1. // SQLRewriteEngine.java

  2. /**

  3. * 生成SQL语句.

  4. * @param tableUnit 路由表单元

  5. * @param sqlBuilder SQL构建器

  6. * @return SQL语句

  7. */

  8. public String generateSQL(final TableUnit tableUnit, final SQLBuilder sqlBuilder) {

  9.   return sqlBuilder.toSQL(getTableTokens(tableUnit));

  10. }  

  11. /**

  12. * 生成SQL语句.

  13. * @param cartesianTableReference 笛卡尔积路由表单元

  14. * @param sqlBuilder SQL构建器

  15. * @return SQL语句

  16. */

  17. public String generateSQL(final CartesianTableReference cartesianTableReference, final SQLBuilder sqlBuilder) {

  18.   return sqlBuilder.toSQL(getTableTokens(cartesianTableReference));

  19. }

  20. // SQLRewriteEngine.java

  21. // SQLBuilder.java

  22. /**

  23. * 生成SQL语句.

  24. * @param tableTokens 占位符集合(逻辑表与真实表映射)

  25. * @return SQL语句

  26. */

  27. public String toSQL(final Map<String, String> tableTokens) {

  28.   StringBuilder result = new StringBuilder();

  29.   for (Object each : segments) {

  30.       if (each instanceof TableToken && tableTokens.containsKey(((TableToken) each).tableName)) {

  31.           result.append(tableTokens.get(((TableToken) each).tableName));

  32.       } else {

  33.           result.append(each);

  34.       }

  35.   }

  36.   return result.toString();

  37. }

  • #toSQL() 结果如图: 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写😜 对 SQL改写 是不是清晰很多了。


下面我们以笛卡尔积路由结果获得 SQL 相关逻辑表对应的真实表映射为例子(简单路由结果基本类似而且简单)。

  1. // SQLRewriteEngine.java

  2. /**

  3. * 获得(笛卡尔积表路由组里的路由表单元逻辑表 和 与其互为BindingTable关系的逻辑表)对应的真实表映射(逻辑表需要在 SQL 中存在)

  4. * @param cartesianTableReference 笛卡尔积表路由组

  5. * @return 集合

  6. */

  7. private Map<String, String> getTableTokens(final CartesianTableReference cartesianTableReference) {

  8.   Map<String, String> tableTokens = new HashMap<>();

  9.   for (TableUnit each : cartesianTableReference.getTableUnits()) {

  10.       tableTokens.put(each.getLogicTableName(), each.getActualTableName());

  11.       // 查找 BindingTableRule

  12.       Optional<BindingTableRule> bindingTableRule = shardingRule.findBindingTableRule(each.getLogicTableName());

  13.       if (bindingTableRule.isPresent()) {

  14.           tableTokens.putAll(getBindingTableTokens(each, bindingTableRule.get()));

  15.       }

  16.   }

  17.   return tableTokens;

  18. }

  19. /**

  20. * 获得 BindingTable 关系的逻辑表对应的真实表映射(逻辑表需要在 SQL 中存在)

  21. * @param tableUnit 路由单元

  22. * @param bindingTableRule Binding表规则配置对象

  23. * @return 映射

  24. */

  25. private Map<String, String> getBindingTableTokens(final TableUnit tableUnit, final BindingTableRule bindingTableRule) {

  26.   Map<String, String> result = new HashMap<>();

  27.   for (String eachTable : sqlStatement.getTables().getTableNames()) {

  28.       if (!eachTable.equalsIgnoreCase(tableUnit.getLogicTableName()) && bindingTableRule.hasLogicTable(eachTable)) {

  29.           result.put(eachTable, bindingTableRule.getBindingActualTable(tableUnit.getDataSourceName(), eachTable, tableUnit.getActualTableName()));

  30.       }

  31.   }

  32.   return result;

  33. }

  • 笛卡尔积表路由组( CartesianTableReference )包含多个路由表单元( TableUnit )。每个路由表单元需要遍历。

  • 路由表单元本身包含逻辑表和真实表,直接添加到映射即可。

  • 互为 BindingTable 关系的表只计算一次路由分片,因此未计算的真实表需要以其对应的已计算的真实表去查找,即 bindingTableRule.getBindingActualTable(tableUnit.getDataSourceName(),eachTable,tableUnit.getActualTableName()) 处逻辑。

  1. // BindingTableRule.java

  2. /**

  3. * 根据其他Binding表真实表名称获取相应的真实Binding表名称.

  4. *

  5. * @param dataSource 数据源名称

  6. * @param logicTable 逻辑表名称

  7. * @param otherActualTable 其他真实Binding表名称

  8. * @return 真实Binding表名称

  9. */

  10. public String getBindingActualTable(final String dataSource, final String logicTable, final String otherActualTable) {

  11.   // 计算 otherActualTable 在其 TableRule 的 actualTable 是第几个

  12.   int index = -1;

  13.   for (TableRule each : tableRules) {

  14.       if (each.isDynamic()) {

  15.           throw new UnsupportedOperationException("Dynamic table cannot support Binding table.");

  16.       }

  17.       index = each.findActualTableIndex(dataSource, otherActualTable);

  18.       if (-1 != index) {

  19.           break;

  20.       }

  21.   }

  22.   Preconditions.checkState(-1 != index, String.format("Actual table [%s].[%s] is not in table config", dataSource, otherActualTable));

  23.   // 计算 logicTable 在其 TableRule 的 第index 的 真实表

  24.   for (TableRule each : tableRules) {

  25.       if (each.getLogicTable().equalsIgnoreCase(logicTable)) {

  26.           return each.getActualTables().get(index).getTableName();

  27.       }

  28.   }

  29.   throw new IllegalStateException(String.format("Cannot find binding actual table, data source: %s, logic table: %s, other actual table: %s", dataSource, logicTable, otherActualTable));

  30. }

可能看起来有些绕,我们看张图:

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

友情提示:这里不嫌啰嗦在提一句,互为 BindingTable 的表,配置 TableRule 时, actualTables 数量一定要一致,否则多出来的表,可能会无法被路由到。

666. 彩蛋

哈哈哈,看完SQL改写后,SQL解析是不是清晰多了!嘿嘿嘿,反正我现在有点嗨。恩,蛮嗨的。

当然,如果SQL解析理解上有点疑惑的你,欢迎加我的微信,咱 1对1 搞基。关注我的微信公众号:【芋道源码】 即可获得。

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写

道友,转发一波朋友圈可好?

Let's Go! 《分布式主键》、《SQL 执行》、《结果聚合》 继续。

感谢技术牛逼如你耐心的阅读本文。


发表评论