数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/sql-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Sharding-JDBC 1.5.0 正式版

  • 1. 概述

  • 2. ExecutorEngine

    • 2.1 ListeningExecutorService

    • 2.2 关闭

    • 2.3 执行 SQL 任务

  • 3. Executor

    • 3.1 StatementExecutor

    • 3.2 PreparedStatementExecutor

    • 3.3 BatchPreparedStatementExecutor

  • 4. ExecutionEvent

    • 4.1 EventBus

    • 4.2 BestEffortsDeliveryListener

  • 666. 彩蛋



数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行

🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表

  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址

  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢

  4. 新的源码解析文章实时收到通知。每周更新一篇左右

  5. 认真的源码交流微信群。


1. 概述

越过千山万水(SQL 解析、SQL 路由、SQL 改写),我们终于来到了 SQL 执行。开森不开森?!

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行

本文主要分享SQL 执行的过程,不包括结果聚合。《结果聚合》 东半球第二良心笔者会更新,关注微信公众号【芋道源码】完稿后第一时间通知您哟。

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行

绿框部分 SQL 执行主流程。


Sharding-JDBC 正在收集使用公司名单:传送门。
🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门
登记吧,骚年!传送门

2. ExecutorEngine

ExecutorEngine,SQL执行引擎。

分表分库,需要执行的 SQL 数量从单条变成了多条,此时有两种方式执行:

  • 串行执行 SQL

  • 并行执行 SQL

前者,编码容易,性能较差,总耗时是多条 SQL 执行时间累加。
后者,编码复杂,性能较好,总耗时约等于执行时间最长的 SQL。

👼 ExecutorEngine 当然采用的是后者,并行执行 SQL。

2.1 ListeningExecutorService

Guava( Java 工具库 ) 提供的继承自 ExecutorService 的线程服务接口,提供创建 ListenableFuture 功能。ListenableFuture 接口,继承 Future 接口,有如下好处:

我们强烈地建议你在代码中多使用ListenableFuture来代替JDK的 Future, 因为:

  • 大多数Futures 方法中需要它。

  • 转到ListenableFuture 编程比较容易。

  • Guava提供的通用公共类封装了公共的操作方方法,不需要提供Future和ListenableFuture的扩展方法。

传统JDK中的Future通过异步的方式计算返回结果:在多线程运算中可能或者可能在没有结束返回结果,Future是运行中的多线程的一个引用句柄,确保在服务执行返回一个Result。

ListenableFuture可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在JDK concurrent中的Future是不支持的。

如上内容来自《Google Guava包的ListenableFuture解析 》,文章写的很棒。下文你会看到 Sharding-JDBC 是如何通过 ListenableFuture 简化并发编程的

下面看看 ExecutorEngine 如何初始化 ListeningExecutorService

  1. // ShardingDataSource.java

  2. public ShardingDataSource(final ShardingRule shardingRule, final Properties props) {

  3.    // .... 省略部分代码

  4.   shardingProperties = new ShardingProperties(props);

  5.   int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);

  6.   executorEngine = new ExecutorEngine(executorSize);

  7.   // .... 省略部分代码

  8. }

  9. // ExecutorEngine

  10. public ExecutorEngine(final int executorSize) {

  11.   executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(

  12.           executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),

  13.           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));

  14.   MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);

  15. }

  • 一个分片数据源( ShardingDataSource ) 独占 一个 SQL执行引擎( ExecutorEngine )。

  • MoreExecutors#listeningDecorator() 创建 ListeningExecutorService,这样 #submit(), #invokeAll() 可以返回 ListenableFuture。

  • 默认情况下,线程池大小为 8。可以根据实际业务需要,设置 ShardingProperties 进行调整。

  • #setNameFormat() 并发编程时,一定要对线程名字做下定义,这样排查问题会方便很多。

  • MoreExecutors#addDelayedShutdownHook()应用关闭时,等待所有任务全部完成再关闭。默认配置等待时间为 60 秒,建议将等待时间做成可配的。

2.2 关闭

数据源关闭时,会调用 ExecutorEngine 也进行关闭。

  1. // ShardingDataSource.java

  2. @Override

  3. public void close() {

  4.   executorEngine.close();

  5. }

  6. // ExecutorEngine

  7. @Override

  8. public void close() {

  9.   executorService.shutdownNow();

  10.   try {

  11.       executorService.awaitTermination(5, TimeUnit.SECONDS);

  12.   } catch (final InterruptedException ignored) {

  13.   }

  14.   if (!executorService.isTerminated()) {

  15.       throw new ShardingJdbcException("ExecutorEngine can not been terminated");

  16.   }

  17. }

  • #shutdownNow() 尝试使用 Thread.interrupt() 打断正在执行中的任务,未执行的任务不再执行。建议打印下哪些任务未执行,因为 SQL 未执行,可能数据未能持久化。

  • #awaitTermination() 因为 #shutdownNow() 打断不是立即结束,需要一个过程,因此这里等待了 5 秒。

  • 等待 5 秒后,线程池不一定已经关闭,此时抛出异常给上层。建议打印下日志,记录出现这个情况。

2.3 执行 SQL 任务

ExecutorEngine 对外暴露 #executeStatement()#executePreparedStatement()#executeBatch()

三个方法分别提供给 StatementExecutor、PreparedStatementExecutor、BatchPreparedStatementExecutor 调用。而这三个方法,内部调用的都是 #execute() 私有方法。

  1. // ExecutorEngine.java

  2. /**

  3. * 执行Statement.

  4. * @param sqlType SQL类型

  5. * @param statementUnits 语句对象执行单元集合

  6. * @param executeCallback 执行回调函数

  7. * @param <T> 返回值类型

  8. * @return 执行结果

  9. */

  10. public <T> List<T> executeStatement(final SQLType sqlType, final Collection<StatementUnit> statementUnits, final ExecuteCallback<T> executeCallback) {

  11.   return execute(sqlType, statementUnits, Collections.<List<Object>>emptyList(), executeCallback);

  12. }

  13. /**

  14. * 执行PreparedStatement.

  15. * @param sqlType SQL类型

  16. * @param preparedStatementUnits 语句对象执行单元集合

  17. * @param parameters 参数列表

  18. * @param executeCallback 执行回调函数

  19. * @param <T> 返回值类型

  20. * @return 执行结果

  21. */

  22. public <T> List<T> executePreparedStatement(

  23.       final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {

  24.   return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);

  25. }

  26. /**

  27. * 执行Batch.

  28. * @param sqlType SQL类型

  29. * @param batchPreparedStatementUnits 语句对象执行单元集合

  30. * @param parameterSets 参数列表集

  31. * @param executeCallback 执行回调函数

  32. * @return 执行结果

  33. */

  34. public List<int[]> executeBatch(

  35.       final SQLType sqlType, final Collection<BatchPreparedStatementUnit> batchPreparedStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<int[]> executeCallback) {

  36.   return execute(sqlType, batchPreparedStatementUnits, parameterSets, executeCallback);

  37. }

#execute() 执行过程大体流程如下图:

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行

  1. /**

  2. * 执行

  3. *

  4. * @param sqlType SQL 类型

  5. * @param baseStatementUnits 语句对象执行单元集合

  6. * @param parameterSets 参数列表集

  7. * @param executeCallback 执行回调函数

  8. * @param <T> 返回值类型

  9. * @return 执行结果

  10. */

  11. private  <T> List<T> execute(

  12.       final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {

  13.   if (baseStatementUnits.isEmpty()) {

  14.       return Collections.emptyList();

  15.   }

  16.   Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();

  17.   BaseStatementUnit firstInput = iterator.next();

  18.   // 第二个任务开始所有 SQL任务 提交线程池【异步】执行任务

  19.   ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);

  20.   T firstOutput;

  21.   List<T> restOutputs;

  22.   try {

  23.       // 第一个任务【同步】执行任务

  24.       firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);

  25.       // 等待第二个任务开始所有 SQL任务完成

  26.       restOutputs = restFutures.get();

  27.       //CHECKSTYLE:OFF

  28.   } catch (final Exception ex) {

  29.       //CHECKSTYLE:ON

  30.       ExecutorExceptionHandler.handleException(ex);

  31.       return null;

  32.   }

  33.   // 返回结果

  34.   List<T> result = Lists.newLinkedList(restOutputs);

  35.   result.add(0, firstOutput);

  36.   return result;

  37. }

  • 第一个任务【同步】调用 #executeInternal() 执行任务。

  1. private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {

  2.   // 【同步】执行任务

  3.   return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());

  4. }

  • 第二个开始的任务提交线程池异步调用 #executeInternal() 执行任务。

  1. private <T> ListenableFuture<List<T>> asyncExecute(

  2.       final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {

  3.   List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());

  4.   final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();

  5.   final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();

  6.   for (final BaseStatementUnit each : baseStatementUnits) {

  7.       // 提交线程池【异步】执行任务

  8.       result.add(executorService.submit(new Callable<T>() {

  9.           @Override

  10.           public T call() throws Exception {

  11.               return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);

  12.           }

  13.       }));

  14.   }

  15.   // 返回 ListenableFuture

  16.   return Futures.allAsList(result);

  17. }

  • 我们注意下 Futures.allAsList(result); 和 restOutputs=restFutures.get();。神器 Guava 简化并发编程 的好处就提现出来了。 ListenableFuture#get() 当所有任务都成功时,返回所有任务执行结果;当任何一个任务失败时,马上抛出异常,无需等待其他任务执行完成。

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行

😮 Guava 真她喵神器,公众号:【芋道源码】会更新 Guava 源码分享的一个系列哟!老司机还不赶紧上车?

  • 为什么会分同步执行和异步执行呢?猜测,当SQL 执行是单表时,只要进行第一个任务的同步调用,性能更加优秀。等跟张亮大神请教确认原因后,咱会进行更新。

  1. // ExecutorEngine.java

  2. private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback,

  3.                     final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {

  4.   synchronized (baseStatementUnit.getStatement().getConnection()) {

  5.       T result;

  6.       ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);

  7.       ExecutorDataMap.setDataMap(dataMap);

  8.       List<AbstractExecutionEvent> events = new LinkedList<>();

  9.       // 生成 Event

  10.       if (parameterSets.isEmpty()) {

  11.           events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));

  12.       } else {

  13.           for (List<Object> each : parameterSets) {

  14.               events.add(getExecutionEvent(sqlType, baseStatementUnit, each));

  15.           }

  16.       }

  17.       // EventBus 发布 EventExecutionType.BEFORE_EXECUTE

  18.       for (AbstractExecutionEvent event : events) {

  19.           EventBusInstance.getInstance().post(event);

  20.       }

  21.       try {

  22.           // 执行回调函数

  23.           result = executeCallback.execute(baseStatementUnit);

  24.       } catch (final SQLException ex) {

  25.           // EventBus 发布 EventExecutionType.EXECUTE_FAILURE

  26.           for (AbstractExecutionEvent each : events) {

  27.               each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);

  28.               each.setException(Optional.of(ex));

  29.               EventBusInstance.getInstance().post(each);

  30.               ExecutorExceptionHandler.handleException(ex);

  31.           }

  32.           return null;

  33.       }

  34.       // EventBus 发布 EventExecutionType.EXECUTE_SUCCESS

  35.       for (AbstractExecutionEvent each : events) {

  36.           each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);

  37.           EventBusInstance.getInstance().post(each);

  38.       }

  39.       return result;

  40.   }

  41. }

  • result=executeCallback.execute(baseStatementUnit); 执行回调函数。StatementExecutor,PreparedStatementExecutor,BatchPreparedStatementExecutor 通过传递执行回调函数( ExecuteCallback )实现给 ExecutorEngine 实现并行执行。

  1. public interface ExecuteCallback<T> {

  2.    /**

  3.     * 执行任务.

  4.     *

  5.     * @param baseStatementUnit 语句对象执行单元

  6.     * @return 处理结果

  7.     * @throws Exception 执行期异常

  8.     */

  9.    T execute(BaseStatementUnit baseStatementUnit) throws Exception;

  10. }

  • synchronized(baseStatementUnit.getStatement().getConnection()) 原以为 Connection 非线程安全,因此需要用同步,后翻查资料《数据库连接池为什么要建立多个连接》,Connection 是线程安全的。等跟张亮大神请教确认原因后,咱会进行更新。

    FROM https://github.com/dangdangdotcom/sharding-jdbc/issues/166
    druid的数据源的stat这种filter在并发使用同一个connection链接时没有考虑线程安全的问题,故造成多个线程修改filter中的状态异常。 改造这个问题时,考虑到mysql驱动在执行statement时对同一个connection是线程安全的。也就是说同一个数据库链接的会话是串行执行的。故在sjdbc的executor对于多线程执行的情况也进行了针对数据库链接级别的同步。故该方案不会降低sjdbc的性能。 同时jdk1.7版本的同步采用了锁升级技术,在碰撞较低的情况下开销也是很小的。


    • 解答:MySQL、Oracle 的 Connection 实现是线程安全的。数据库连接池实现的 Connection 不一定是线程安全,例如 Druid 的线程池 Connection 非线程安全

  • ExecutionEvent 这里先不解释,在本文第四节【EventBus】分享。


  • ExecutorExceptionHandler、ExecutorDataMap 和 柔性事务 ( AbstractSoftTransaction ),放在《柔性事务》分享。


3. Executor

Executor,执行器,目前一共有三个执行器。不同的执行器对应不同的执行单元 (BaseStatementUnit)。

执行器类 执行器名 执行单元
StatementExecutor 静态语句对象执行单元 StatementUnit
PreparedStatementExecutor 预编译语句对象请求的执行器 PreparedStatementUnit
BatchPreparedStatementExecutor 批量预编译语句对象请求的执行器 BatchPreparedStatementUnit


    • 执行器提供的方法不同,因此不存在公用接口或者抽象类。



    • 执行单元继承自 BaseStatementUnit






    数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行

    3.1 StatementExecutor

    StatementExecutor,多线程执行静态语句对象请求的执行器,一共有三类方法:

    • #executeQuery()

    1. // StatementExecutor.java

    2. /**

    3. * 执行SQL查询.

    4. * @return 结果集列表

    5. */

    6. public List<ResultSet> executeQuery() {

    7.   Context context = MetricsContext.start("ShardingStatement-executeQuery");

    8.   List<ResultSet> result;

    9.   try {

    10.       result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<ResultSet>() {

    11.           @Override

    12.           public ResultSet execute(final BaseStatementUnit baseStatementUnit) throws Exception {

    13.               return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());

    14.           }

    15.       });

    16.   } finally {

    17.       MetricsContext.stop(context);

    18.   }

    19.   return result;

    20. }

    • #executeUpdate() 因为有四个不同情况的 #executeUpdate(),所以抽象了 Updater 接口,从而达到逻辑重用。

    1. // StatementExecutor.java

    2. /**

    3. * 执行SQL更新.

    4. * @return 更新数量

    5. */

    6. public int executeUpdate() {

    7.   return executeUpdate(new Updater() {

    8.       @Override

    9.       public int executeUpdate(final Statement statement, final String sql) throws SQLException {

    10.           return statement.executeUpdate(sql);

    11.       }

    12.   });

    13. }

    14. private int executeUpdate(final Updater updater) {

    15.   Context context = MetricsContext.start("ShardingStatement-executeUpdate");

    16.   try {

    17.       List<Integer> results = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Integer>() {

    18.           @Override

    19.           public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {

    20.               return updater.executeUpdate(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());

    21.           }

    22.       });

    23.       return accumulate(results);

    24.   } finally {

    25.       MetricsContext.stop(context);

    26.   }

    27. }

    28. /**

    29. * 计算总的更新数量

    30. * @param results 更新数量数组

    31. * @return 更新数量

    32. */

    33. private int accumulate(final List<Integer> results) {

    34.   int result = 0;

    35.   for (Integer each : results) {

    36.       result += null == each ? 0 : each;

    37.   }

    38.   return result;

    39. }

    • #execute() 因为有四个不同情况的 #execute(),所以抽象了 Executor 接口,从而达到逻辑重用。

    1. /**

    2. * 执行SQL请求.

    3. * @return true表示执行DQL语句, false表示执行的DML语句

    4. */

    5. public boolean execute() {

    6.   return execute(new Executor() {

    7.       @Override

    8.       public boolean execute(final Statement statement, final String sql) throws SQLException {

    9.           return statement.execute(sql);

    10.       }

    11.   });

    12. }

    13. private boolean execute(final Executor executor) {

    14.   Context context = MetricsContext.start("ShardingStatement-execute");

    15.   try {

    16.       List<Boolean> result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Boolean>() {

    17.           @Override

    18.           public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {

    19.               return executor.execute(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());

    20.           }

    21.       });

    22.       if (null == result || result.isEmpty() || null == result.get(0)) {

    23.           return false;

    24.       }

    25.       return result.get(0);

    26.   } finally {

    27.       MetricsContext.stop(context);

    28.   }

    29. }

    3.2 PreparedStatementExecutor

    PreparedStatementExecutor,多线程执行预编译语句对象请求的执行器。比 StatementExecutor 多了 parameters 参数,方法逻辑上基本一致,就不重复分享啦。

    3.3 BatchPreparedStatementExecutor

    BatchPreparedStatementExecutor,多线程执行批量预编译语句对象请求的执行器。

    1. // BatchPreparedStatementExecutor.java

    2. /**

    3. * 执行批量SQL.

    4. *

    5. * @return 执行结果

    6. */

    7. public int[] executeBatch() {

    8.   Context context = MetricsContext.start("ShardingPreparedStatement-executeBatch");

    9.   try {

    10.       return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets, new ExecuteCallback<int[]>() {

    11.           @Override

    12.           public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {

    13.               return baseStatementUnit.getStatement().executeBatch();

    14.           }

    15.       }));

    16.   } finally {

    17.       MetricsContext.stop(context);

    18.   }

    19. }

    20. /**

    21. * 计算每个语句的更新数量

    22. *

    23. * @param results 每条 SQL 更新数量

    24. * @return 每个语句的更新数量

    25. */

    26. private int[] accumulate(final List<int[]> results) {

    27.   int[] result = new int[parameterSets.size()];

    28.   int count = 0;

    29.   // 每个语句按照顺序,读取到其对应的每个分片SQL影响的行数进行累加

    30.   for (BatchPreparedStatementUnit each : batchPreparedStatementUnits) {

    31.       for (Map.Entry<Integer, Integer> entry : each.getJdbcAndActualAddBatchCallTimesMap().entrySet()) {

    32.           result[entry.getKey()] += null == results.get(count) ? 0 : results.get(count)[entry.getValue()];

    33.       }

    34.       count++;

    35.   }

    36.   return result;

    37. }

    眼尖的同学会发现,为什么有 BatchPreparedStatementExecutor,而没有 BatchStatementExecutor 呢?目前 Sharding-JDBC 不支持 Statement 批量操作,只能进行 PreparedStatement 的批操作。

    1. // PreparedStatement 批量操作,不会报错

    2. PreparedStatement ps = conn.prepareStatement(sql)

    3. ps.addBatch();

    4. ps.addBatch();

    5. // Statement 批量操作,会报错

    6. ps.addBatch(sql); // 报错:at com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationStatement.addBatch

    4. ExecutionEvent

    AbstractExecutionEvent,SQL 执行事件抽象接口。

    1. public abstract class AbstractExecutionEvent {

    2.    /**

    3.     * 事件编号

    4.     */

    5.    private final String id;

    6.    /**

    7.     * 数据源

    8.     */

    9.    private final String dataSource;

    10.    /**

    11.     * SQL

    12.     */

    13.    private final String sql;

    14.    /**

    15.     * 参数

    16.     */

    17.    private final List<Object> parameters;

    18.    /**

    19.     * 事件类型

    20.     */

    21.    private EventExecutionType eventExecutionType;

    22.    /**

    23.     * 异常

    24.     */

    25.    private Optional<SQLException> exception;

    26. }

    AbstractExecutionEvent 有两个实现子类:

    • DMLExecutionEvent:DML类SQL执行时事件

    • DQLExecutionEvent:DQL类SQL执行时事件

    EventExecutionType,事件触发类型。

    • BEFORE_EXECUTE:执行前

    • EXECUTE_SUCCESS:执行成功

    • EXECUTE_FAILURE:执行失败

    4.1 EventBus

    那究竟有什么用途呢? Sharding-JDBC 使用 Guava(没错,又是它)的 EventBus 实现了事件的发布和订阅。从上文 ExecutorEngine#executeInternal() 我们可以看到每个分片 SQL 执行的过程中会发布相应事件:

    • 执行 SQL 前:发布类型类型为 BEFORE_EXECUTE 的事件

    • 执行 SQL 成功:发布类型类型为 EXECUTE_SUCCESS 的事件

    • 执行 SQL 失败:发布类型类型为 EXECUTE_FAILURE 的事件

    怎么订阅事件呢?非常简单,例子如下:

    1. EventBusInstance.getInstance().register(new Runnable() {

    2.  @Override

    3.  public void run() {

    4.  }

    5.  @Subscribe // 订阅

    6.  @AllowConcurrentEvents // 是否允许并发执行,即线程安全

    7.  public void listen(final DMLExecutionEvent event) { // DMLExecutionEvent

    8.      System.out.println("DMLExecutionEvent:" + event.getSql() + "t" + event.getEventExecutionType());

    9.  }

    10.  @Subscribe // 订阅

    11.  @AllowConcurrentEvents // 是否允许并发执行,即线程安全

    12.  public void listen2(final DQLExecutionEvent event) { //DQLExecutionEvent

    13.      System.out.println("DQLExecutionEvent:" + event.getSql() + "t" + event.getEventExecutionType());

    14.  }

    15. });

    • #register() 任何类都可以,并非一定需要使用 Runnable 类。此处例子单纯因为方便

    • @Subscribe 注解在方法上,实现对事件的订阅

    • @AllowConcurrentEvents 注解在方法上,表示线程安全,允许并发执行

    • 方法上的参数对应的类即是订阅的事件。例如, #listen() 订阅了 DMLExecutionEvent 事件

    • EventBus#post() 发布事件,同步调用订阅逻辑

    数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行

    • 推荐阅读文章:《Guava学习笔记:EventBus》

    Sharding-JDBC 正在收集使用公司名单:传送门。
    🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
    Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门
    登记吧,骚年!传送门

    4.2 BestEffortsDeliveryListener

    BestEffortsDeliveryListener,最大努力送达型事务监听器。

    本文暂时暂时不分析其实现,仅仅作为另外一个订阅者的例子。我们会在《柔性事务》进行分享。

    1. public final class BestEffortsDeliveryListener {

    2.    @Subscribe

    3.    @AllowConcurrentEvents

    4.    public void listen(final DMLExecutionEvent event) {

    5.        if (!isProcessContinuously()) {

    6.            return;

    7.        }

    8.        SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();

    9.        TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());

    10.        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();

    11.        switch (event.getEventExecutionType()) {

    12.            case BEFORE_EXECUTE:

    13.                //TODO 对于批量执行的SQL需要解析成两层列表

    14.                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),

    15.                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));

    16.                return;

    17.            case EXECUTE_SUCCESS:

    18.                transactionLogStorage.remove(event.getId());

    19.                return;

    20.            case EXECUTE_FAILURE:

    21.                boolean deliverySuccess = false;

    22.                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {

    23.                    if (deliverySuccess) {

    24.                        return;

    25.                    }

    26.                    boolean isNewConnection = false;

    27.                    Connection conn = null;

    28.                    PreparedStatement preparedStatement = null;

    29.                    try {

    30.                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);

    31.                        if (!isValidConnection(conn)) {

    32.                            bedSoftTransaction.getConnection().release(conn);

    33.                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);

    34.                            isNewConnection = true;

    35.                        }

    36.                        preparedStatement = conn.prepareStatement(event.getSql());

    37.                        //TODO 对于批量事件需要解析成两层列表

    38.                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {

    39.                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));

    40.                        }

    41.                        preparedStatement.executeUpdate();

    42.                        deliverySuccess = true;

    43.                        transactionLogStorage.remove(event.getId());

    44.                    } catch (final SQLException ex) {

    45.                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);

    46.                    } finally {

    47.                        close(isNewConnection, conn, preparedStatement);

    48.                    }

    49.                }

    50.                return;

    51.            default:

    52.                throw new UnsupportedOperationException(event.getEventExecutionType().toString());

    53.        }

    54.    }

    55. }

    666. 彩蛋

    本文完,但也未完。

    跨分片事务问题。例如:

    1. UPDATE t_order SET nickname = ? WHERE user_id = ?

    A 节点 connection.commit() 时,应用突然挂了!B节点 connection.commit() 还来不及执行。
    我们一起去《柔性事务》寻找答案。

    道友,分享一波朋友圈可好?