数据库中间件 MyCAT源码分析 —— PreparedStatement 重新入门

1. 概述

  • 2. JDBC Client 实现
  • 3. MyCAT Server 实现
    • 3.1 创建 PreparedStatement
    • 3.2 执行 SQL
  • 4. 彩蛋

1. 概述

相信很多同学在学习 JDBC 时,都碰到 PreparedStatementStatement。究竟该使用哪个呢?最终很可能是懵里懵懂的看了各种总结,使用 PreparedStatement。那么本文,通过 MyCAT 对 PreparedStatement 的实现对大家能够重新理解下。

本文主要分成两部分:

  1. JDBC Client 如何实现 PreparedStatement
  2. MyCAT Server 如何处理 PreparedStatement

😈 Let's Go,

2. JDBC Client 实现

首先,我们来看一段大家最喜欢复制粘贴之一的代码,JDBC PreparedStatement 查询 MySQL 数据库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class PreparedStatementDemo {
 
   public static void main(String[] args) throws ClassNotFoundException, SQLException {
 
       // 1. 获得数据库连接
 
       Class.forName("com.mysql.jdbc.Driver");
 
       Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8066/dbtest?useServerPrepStmts=true", "root", "123456");
 
       // PreparedStatement
 
       PreparedStatement ps = conn.prepareStatement("SELECT id, username, password FROM t_user WHERE id = ?");
 
       ps.setLong(1, Math.abs(new Random().nextLong()));
 
       // execute
 
       ps.executeQuery();
 
   }
 
}

获取 MySQL 连接时, useServerPrepStmts=true非常非常非常重要的参数。如果不配置, PreparedStatement 实际是个PreparedStatement(新版本默认为 FALSE,据说部分老版本默认为 TRUE),未开启服务端级别的 SQL 预编译。

WHY ?来看下 JDBC 里面是怎么实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// com.mysql.jdbc.ConnectionImpl.java
 
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
 
  synchronized (getConnectionMutex()) {
 
      checkClosed();
 
      PreparedStatement pStmt = null;
 
      boolean canServerPrepare = true;
 
      String nativeSql = getProcessEscapeCodesForPrepStmts() ? nativeSQL(sql) : sql;
 
      if (this.useServerPreparedStmts && getEmulateUnsupportedPstmts()) {
 
          canServerPrepare = canHandleAsServerPreparedStatement(nativeSql);
 
      }
 
      if (this.useServerPreparedStmts && canServerPrepare) {
 
          if (this.getCachePreparedStatements()) { // 从缓存中获取 pStmt
 
              synchronized (this.serverSideStatementCache) {
 
                  pStmt = (com.mysql.jdbc.ServerPreparedStatement) this.serverSideStatementCache
 
                          .remove(makePreparedStatementCacheKey(this.database, sql));
 
                  if (pStmt != null) {
 
                      ((com.mysql.jdbc.ServerPreparedStatement) pStmt).setClosed(false);
 
                      pStmt.clearParameters(); // 清理上次留下的参数
 
                  }
 
                  if (pStmt == null) {
 
                       // .... 省略代码 :向 Server 提交 SQL 预编译。
 
                  }
 
              }
 
          } else {
 
              try {
 
                  // 向 Server 提交 SQL 预编译。
 
                  pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);
 
                  pStmt.setResultSetType(resultSetType);
 
                  pStmt.setResultSetConcurrency(resultSetConcurrency);
 
              } catch (SQLException sqlEx) {
 
                  // Punt, if necessary
 
                  if (getEmulateUnsupportedPstmts()) {
 
                      pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
 
                  } else {
 
                      throw sqlEx;
 
                  }
 
              }
 
          }
 
      } else {
 
          pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
 
      }
 
      return pStmt;
 
  }
 
}
  • 【前者】当 Client 开启 useServerPreparedStmts 并且 Server 支持 ServerPrepareClient 会向 Server 提交 SQL 预编译请求
  1. 1
    2
    3
    4
    5
    
    if (this.useServerPreparedStmts && canServerPrepare) {
     
          pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);
     
    }

     

  • 【后者】当 Client 未开启 useServerPreparedStmts 或者 Server 不支持 ServerPrepare,Client 创建 PreparedStatement不会向 Server 提交 SQL 预编译请求
1
pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);

即使这样,究竟为什么性能会更好呢?

  • 【前者】返回的 PreparedStatement 对象类是 JDBC42ServerPreparedStatement.java,后续每次执行 SQL 只需将对应占位符?对应的值提交给 Server即可,减少网络传输和 SQL 解析开销。
  • 【后者】返回的 PreparedStatement 对象类是 JDBC42PreparedStatement.java,后续每次执行 SQL 需要将完整的 SQL 提交给 Server,增加了网络传输和 SQL 解析开销。

🌚:【前者】性能一定比【后者】好吗?相信你已经有了正确的答案。

3. MyCAT Server 实现

3.1 创建 PreparedStatement

该操作对应 Client conn.prepareStatement(....)

数据库中间件 MyCAT源码分析 —— PreparedStatement 重新入门

MyCAT 接收到请求后,创建 PreparedStatement,并返回 statementId 等信息。Client 发起 SQL 执行时,需要将 statementId 带给 MyCAT。核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// ServerPrepareHandler.java
@Override
 
public void prepare(String sql) {
 
    LOGGER.debug("use server prepare, sql: " + sql);
 
     PreparedStatement pstmt = pstmtForSql.get(sql);
 
     
    if (pstmt == null) {
        // 缓存中获取
           
        // 解析获取字段个数和参数个数
           int columnCount = getColumnCount(sql);
 
           int paramCount = getParamCount(sql);
 
           pstmt = new PreparedStatement(++pstmtId, sql, columnCount, paramCount);
 
           pstmtForSql.put(pstmt.getStatement(), pstmt);
 
           pstmtForId.put(pstmt.getId(), pstmt);
 
         
    }
 
     PreparedStmtResponse.response(pstmt, source);
 
}
 
// PreparedStmtResponse.java
public static void response(PreparedStatement pstmt, FrontendConnection c) {
 
     byte packetId = 0;
 
     
    // write preparedOk packet
     PreparedOkPacket preparedOk = new PreparedOkPacket();
 
     preparedOk.packetId = ++packetId;
 
     preparedOk.statementId = pstmt.getId();
 
     preparedOk.columnsNumber = pstmt.getColumnsNumber();
 
     preparedOk.parametersNumber = pstmt.getParametersNumber();
 
     ByteBuffer buffer = preparedOk.write(c.allocate(), c, true);
 
     
    // write parameter field packet
     int parametersNumber = preparedOk.parametersNumber;
 
     
    if (parametersNumber > 0) {
 
           
        for (int i = 0; i < parametersNumber; i++) {
 
                 FieldPacket field = new FieldPacket();
 
                 field.packetId = ++packetId;
 
                 buffer = field.write(buffer, c, true);
 
               
        }
 
           EOFPacket eof = new EOFPacket();
 
           eof.packetId = ++packetId;
 
           buffer = eof.write(buffer, c, true);
 
         
    }
 
     
    // write column field packet
     int columnsNumber = preparedOk.columnsNumber;
 
     
    if (columnsNumber > 0) {
 
           
        for (int i = 0; i < columnsNumber; i++) {
 
                 FieldPacket field = new FieldPacket();
 
                 field.packetId = ++packetId;
 
                 buffer = field.write(buffer, c, true);
 
               
        }
 
           EOFPacket eof = new EOFPacket();
 
           eof.packetId = ++packetId;
 
           buffer = eof.write(buffer, c, true);
 
         
    }
 
     
    // send buffer
     c.write(buffer);
 
}

每个连接之间,PreparedStatement 不共享,即不同连接,即使 SQL相同,对应的 PreparedStatement 不同。

3.2 执行 SQL

该操作对应 Client conn.execute(....)

数据库中间件 MyCAT源码分析 —— PreparedStatement 重新入门

MyCAT 接收到请求后,将 PreparedStatement 使用请求的参数格式化成可执行的 SQL 进行执行。伪代码如下:

1
2
3
String sql = pstmt.sql.format(request.params);
 
execute(sql);

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// ServerPrepareHandler.java
@Override
 
public void execute(byte[] data) {
 
     long pstmtId = ByteUtil.readUB4(data, 5);
 
     PreparedStatement pstmt = null;
 
     
    if ((pstmt = pstmtForId.get(pstmtId)) == null) {
 
           source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, "Unknown pstmtId when executing.");
 
         
    } else {
 
           
        // 参数读取
           ExecutePacket packet = new ExecutePacket(pstmt);
 
           
        try {
 
                 packet.read(data, source.getCharset());
 
               
        } catch(UnsupportedEncodingException e) {
 
                 source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, e.getMessage());
 
                 
            return;
 
               
        }
 
           BindValue[] bindValues = packet.values;
 
           
        // 还原sql中的动态参数为实际参数值
           String sql = prepareStmtBindValue(pstmt, bindValues);
 
           
        // 执行sql
           source.getSession2().setPrepared(true);
 
           source.query(sql);
 
         
    }
 
}
 
private String prepareStmtBindValue(PreparedStatement pstmt, BindValue[] bindValues) {
 
     String sql = pstmt.getStatement();
 
     int[] paramTypes = pstmt.getParametersType();
 
     StringBuilder sb = new StringBuilder();
 
     int idx = 0;
 
     
    for (int i = 0, len = sql.length(); i < len; i++) {
 
           char c = sql.charAt(i);
 
           
        if (c != '?') {
 
                 sb.append(c);
 
                 
            continue;
 
               
        }
 
           
        // 处理占位符?
           int paramType = paramTypes[idx];
 
           BindValue bindValue = bindValues[idx];
 
           idx++;
 
           
        // 处理字段为空的情况
           
        if (bindValue.isNull) {
 
                 sb.append("NULL");
 
                 
            continue;
 
               
        }
 
           
        // 非空情况, 根据字段类型获取值
           
        switch (paramType & 0xff) {
 
                 
        case Fields.FIELD_TYPE_TINY:
 
                   sb.append(String.valueOf(bindValue.byteBinding));
 
                   
            break;
 
                 
        case Fields.FIELD_TYPE_SHORT:
 
                   sb.append(String.valueOf(bindValue.shortBinding));
 
                   
            break;
 
                 
        case Fields.FIELD_TYPE_LONG:
 
                   sb.append(String.valueOf(bindValue.intBinding));
 
                   
            break;
 
                 
            // .... 省略非核心代码
                
        }
 
         
    }
 
     
    return sb.toString();
 
}

4. 彩蛋

💯 看到此处是不是真爱?!反正我信了。
给老铁们额外加个🍗。

细心的同学们可能已经注意到 JDBC Client 是支持缓存 PreparedStatement,无需每次都让 Server 进行创建。

当配置 MySQL 数据连接 cachePrepStmts=true 时开启 Client 级别的缓存。But,此处的缓存又和一般的缓存不一样,是使用 remove 的方式获得的,并且创建好 PreparedStatement 时也不添加到缓存。那什么时候添加缓存呢?在 pstmt.close() 时,并且 pstmt 是通过缓存获取时,添加到缓存。核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// ServerPreparedStatement.java
 
public void close() throws SQLException {
 
  MySQLConnection locallyScopedConn = this.connection;
 
  if (locallyScopedConn == null) {
 
      return; // already closed
 
  }
 
  synchronized (locallyScopedConn.getConnectionMutex()) {
 
      if (this.isCached && isPoolable() && !this.isClosed) {
 
          clearParameters();
 
          this.isClosed = true;
 
          this.connection.recachePreparedStatement(this);
 
          return;
 
      }
 
      realClose(true, true);
 
  }
 
}
 
// ConnectionImpl.java
 
public void recachePreparedStatement(ServerPreparedStatement pstmt) throws SQLException {
 
  synchronized (getConnectionMutex()) {
 
      if (getCachePreparedStatements() && pstmt.isPoolable()) {
 
          synchronized (this.serverSideStatementCache) {
 
              this.serverSideStatementCache.put(makePreparedStatementCacheKey(pstmt.currentCatalog, pstmt.originalSql), pstmt);
 
          }
 
      }
 
  }
 
}

为什么要这么实现? PreparedStatement 是有状态的变量,我们会去 setXXX(pos,value),一旦多线程共享,会导致错乱。

原文地址:http://www.yunai.me/MyCAT/what-is-PreparedStatement/
MyCat-Server 带注释代码地址 :https://github.com/YunaiV/Mycat-Server
😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号
QQ :7685413

数据库中间件 MyCAT源码分析 —— PreparedStatement 重新入门

另外推荐一篇文章:《JDBC PreparedStatement》。

 

发表评论