From 3adbc3a1f57a7fceb2984d7bd9456c5bea546ecc Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 16:02:23 +0800 Subject: [PATCH 01/12] Remove ShardingSpherePreparedStatement --- .../ShardingSpherePreparedStatement.java | 46 +++++++------------ 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 5f93209c5ba4b..1063cd329bade 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -159,10 +159,6 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState private ResultSet currentResultSet; - private String trafficInstanceId; - - private boolean useFederation; - private final HintValueContext hintValueContext; private ResultSet currentBatchGeneratedKeysResultSet; @@ -224,7 +220,6 @@ private boolean isStatementsCacheable(final RuleMetaData databaseRuleMetaData) { @Override public ResultSet executeQuery() throws SQLException { - ResultSet result; try { if (statementsCacheable && !statements.isEmpty()) { resetParameters(); @@ -233,18 +228,19 @@ public ResultSet executeQuery() throws SQLException { clearPrevious(); QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext); - trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); + String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); - return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeQuery()); + currentResultSet = executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeQuery()); + return currentResultSet; } - useFederation = decide(queryContext, - metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData()); - if (useFederation) { - return executeFederationQuery(queryContext); + if (decide(queryContext, metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData())) { + currentResultSet = executeFederationQuery(queryContext); + return currentResultSet; } executionContext = createExecutionContext(queryContext); - result = doExecuteQuery(executionContext); + currentResultSet = doExecuteQuery(executionContext); + return currentResultSet; // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON @@ -253,8 +249,6 @@ public ResultSet executeQuery() throws SQLException { } finally { clearBatch(); } - currentResultSet = result; - return result; } private ShardingSphereResultSet doExecuteQuery(final ExecutionContext executionContext) throws SQLException { @@ -347,7 +341,7 @@ public int executeUpdate() throws SQLException { clearPrevious(); QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext); - trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); + String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate()); @@ -411,15 +405,16 @@ public boolean execute() throws SQLException { clearPrevious(); QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext); - trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); + String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); - return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).execute()); + boolean result = executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).execute()); + currentResultSet = executor.getTrafficExecutor().getResultSet(); + return result; } - useFederation = decide(queryContext, - metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData()); - if (useFederation) { + if (decide(queryContext, metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData())) { ResultSet resultSet = executeFederationQuery(queryContext); + currentResultSet = resultSet; return null != resultSet; } executionContext = createExecutionContext(queryContext); @@ -496,14 +491,7 @@ public ResultSet getResultSet() throws SQLException { if (null != currentResultSet) { return currentResultSet; } - if (null != trafficInstanceId) { - return executor.getTrafficExecutor().getResultSet(); - } - if (useFederation) { - return executor.getSqlFederationEngine().getResultSet(); - } - if (executionContext.getSqlStatementContext() instanceof SelectStatementContext - || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) { + if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) { List resultSets = getResultSets(); if (resultSets.isEmpty()) { return currentResultSet; @@ -629,7 +617,7 @@ private String getGeneratedKeysColumnName(final String columnName) { public void addBatch() { try { QueryContext queryContext = createQueryContext(); - trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); + String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); executionContext = null == trafficInstanceId ? createExecutionContext(queryContext) : createExecutionContext(queryContext, trafficInstanceId); batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits()); } finally { From 98b2b4ebd5cadbad18702b0fe08e5c19ec2cd74d Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 17:08:27 +0800 Subject: [PATCH 02/12] Remove ShardingSpherePreparedStatement --- .../jdbc/core/statement/ShardingSpherePreparedStatement.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 1063cd329bade..d798003c08fee 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -430,7 +430,8 @@ public boolean execute() throws SQLException { handleExceptionInTransaction(connection, metaDataContexts); throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType()); } finally { - clearBatch(); + batchPreparedStatementExecutor.clear(); + clearParameters(); } } From b182ebded226208ff074915fc1e0986689dc9109 Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 17:09:59 +0800 Subject: [PATCH 03/12] Remove ShardingSpherePreparedStatement --- .../jdbc/core/statement/ShardingSpherePreparedStatement.java | 1 + 1 file changed, 1 insertion(+) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index d798003c08fee..699c45e7b6269 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -417,6 +417,7 @@ public boolean execute() throws SQLException { currentResultSet = resultSet; return null != resultSet; } + currentResultSet = null; executionContext = createExecutionContext(queryContext); if (hasRawExecutionRule()) { Collection results = From 09f88a1f1bf26f3cf8d878521d2547d01f409be5 Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 17:15:17 +0800 Subject: [PATCH 04/12] Remove ShardingSpherePreparedStatement --- .../proxy/backend/connector/DatabaseConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java index cd6c51bcb2c72..32dc87430d75d 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java @@ -120,7 +120,7 @@ public DatabaseConnector(final String driverType, final ShardingSphereDatabase d this.driverType = driverType; this.database = database; this.queryContext = queryContext; - this.selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable(); + selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable(); this.databaseConnectionManager = databaseConnectionManager; if (sqlStatementContext instanceof CursorAvailable) { prepareCursorStatementContext((CursorAvailable) sqlStatementContext, databaseConnectionManager.getConnectionSession()); From 0f528fc6244016ddadff49933c7fa75cfff72c6f Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 17:21:05 +0800 Subject: [PATCH 05/12] Remove ShardingSpherePreparedStatement --- .../jdbc/core/statement/ShardingSpherePreparedStatement.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 699c45e7b6269..53b0ff42c7da4 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -247,7 +247,8 @@ public ResultSet executeQuery() throws SQLException { handleExceptionInTransaction(connection, metaDataContexts); throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType()); } finally { - clearBatch(); + batchPreparedStatementExecutor.clear(); + clearParameters(); } } From cd14681e0a0135397997d25291d44d02756869c7 Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 17:23:45 +0800 Subject: [PATCH 06/12] Remove ShardingSpherePreparedStatement --- .../jdbc/core/statement/ShardingSpherePreparedStatement.java | 1 + 1 file changed, 1 insertion(+) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 53b0ff42c7da4..227cfc2fbe959 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -582,6 +582,7 @@ private void replaySetParameter() throws SQLException { } private void clearPrevious() { + currentResultSet = null; statements.clear(); parameterSets.clear(); generatedValues.clear(); From fe18f6058d26cea21e6164dba00fada642b8ddea Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 21:21:13 +0800 Subject: [PATCH 07/12] Refactor ShardingSpherePreparedStatement --- .../jdbc/core/statement/ShardingSpherePreparedStatement.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 227cfc2fbe959..46a03a78afcbb 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -230,8 +230,8 @@ public ResultSet executeQuery() throws SQLException { handleAutoCommit(queryContext); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); - currentResultSet = executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeQuery()); + currentResultSet = executor.getTrafficExecutor().execute( + createTrafficExecutionUnit(trafficInstanceId, queryContext), (statement, sql) -> ((PreparedStatement) statement).executeQuery()); return currentResultSet; } if (decide(queryContext, metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData())) { From d0a5ed103c553bfd0919419a772fa89fe3c53cef Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 21:29:57 +0800 Subject: [PATCH 08/12] Refactor ShardingSpherePreparedStatement --- .../statement/ShardingSpherePreparedStatement.java | 10 +++++----- .../jdbc/core/statement/ShardingSphereStatement.java | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 46a03a78afcbb..2ad38cd815d9f 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -227,7 +227,7 @@ public ResultSet executeQuery() throws SQLException { } clearPrevious(); QueryContext queryContext = createQueryContext(); - handleAutoCommit(queryContext); + handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { currentResultSet = executor.getTrafficExecutor().execute( @@ -266,8 +266,8 @@ private boolean decide(final QueryContext queryContext, final ShardingSphereData return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData); } - private void handleAutoCommit(final QueryContext queryContext) throws SQLException { - if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) { + private void handleAutoCommit(final SQLStatement sqlStatement) throws SQLException { + if (AutoCommitUtils.needOpenTransaction(sqlStatement)) { connection.handleAutoCommit(); } } @@ -341,7 +341,7 @@ public int executeUpdate() throws SQLException { } clearPrevious(); QueryContext queryContext = createQueryContext(); - handleAutoCommit(queryContext); + handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); @@ -405,7 +405,7 @@ public boolean execute() throws SQLException { } clearPrevious(); QueryContext queryContext = createQueryContext(); - handleAutoCommit(queryContext); + handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index abe0ed5e0a559..70b99d3dd07b3 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -158,7 +158,7 @@ public ResultSet executeQuery(final String sql) throws SQLException { ShardingSpherePreconditions.checkNotEmpty(sql, () -> new EmptySQLException().toSQLException()); try { QueryContext queryContext = createQueryContext(sql); - handleAutoCommit(queryContext); + handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); @@ -311,7 +311,7 @@ private int executeUpdate(final ExecuteUpdateCallback updateCallback, final SQLS private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateCallback, final TrafficExecutorCallback trafficCallback) throws SQLException { QueryContext queryContext = createQueryContext(sql); - handleAutoCommit(queryContext); + handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); @@ -416,7 +416,7 @@ public boolean execute(final String sql, final String[] columnNames) throws SQLE private boolean execute0(final String sql, final ExecuteCallback executeCallback, final TrafficExecutorCallback trafficCallback) throws SQLException { QueryContext queryContext = createQueryContext(sql); - handleAutoCommit(queryContext); + handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); @@ -440,8 +440,8 @@ private boolean execute0(final String sql, final ExecuteCallback executeCallback return executeWithExecutionContext(executeCallback, executionContext); } - private void handleAutoCommit(final QueryContext queryContext) throws SQLException { - if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) { + private void handleAutoCommit(final SQLStatement sqlStatement) throws SQLException { + if (AutoCommitUtils.needOpenTransaction(sqlStatement)) { connection.handleAutoCommit(); } } From f11b7802fd839fcc0ae084be0399efe14d366ff6 Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 21:32:10 +0800 Subject: [PATCH 09/12] Refactor ShardingSpherePreparedStatement --- .../core/statement/ShardingSpherePreparedStatement.java | 6 ++---- .../driver/jdbc/core/statement/ShardingSphereStatement.java | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 2ad38cd815d9f..70186b05498df 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -344,8 +344,7 @@ public int executeUpdate() throws SQLException { handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); - return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate()); + return executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId, queryContext), (statement, sql) -> ((PreparedStatement) statement).executeUpdate()); } executionContext = createExecutionContext(queryContext); if (hasRawExecutionRule()) { @@ -408,8 +407,7 @@ public boolean execute() throws SQLException { handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); - boolean result = executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).execute()); + boolean result = executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId, queryContext), (statement, sql) -> ((PreparedStatement) statement).execute()); currentResultSet = executor.getTrafficExecutor().getResultSet(); return result; } diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index 70b99d3dd07b3..218714c23a8d4 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -316,8 +316,7 @@ private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateC connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); - return executor.getTrafficExecutor().execute(executionUnit, trafficCallback); + return executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId, queryContext), trafficCallback); } executionContext = createExecutionContext(queryContext); if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { @@ -421,8 +420,7 @@ private boolean execute0(final String sql, final ExecuteCallback executeCallback connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); - boolean result = executor.getTrafficExecutor().execute(executionUnit, trafficCallback); + boolean result = executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId, queryContext), trafficCallback); currentResultSet = executor.getTrafficExecutor().getResultSet(); return result; } From 39cca6e1440241396ef98ccf8ac6b46212e96d9b Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 21:36:00 +0800 Subject: [PATCH 10/12] Refactor ShardingSpherePreparedStatement --- .../jdbc/core/statement/ShardingSpherePreparedStatement.java | 4 +--- .../driver/jdbc/core/statement/ShardingSphereStatement.java | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 70186b05498df..0a0b9644236e6 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -41,7 +41,6 @@ import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; -import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine; import org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException; import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine; @@ -277,8 +276,7 @@ private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanc ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters())); ExecutionGroupContext context = prepareEngine.prepare(new RouteContext(), Collections.singleton(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); - ShardingSpherePreconditions.checkState(!context.getInputGroups().isEmpty() && !context.getInputGroups().iterator().next().getInputs().isEmpty(), EmptyTrafficExecutionUnitException::new); - return context.getInputGroups().iterator().next().getInputs().iterator().next(); + return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new); } private Optional getInstanceIdAndSet(final QueryContext queryContext) { diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index 218714c23a8d4..5455b80c4813f 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -448,7 +448,7 @@ private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanc DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(); ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters())); ExecutionGroupContext context = - prepareEngine.prepare(new RouteContext(), Collections.singletonList(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); + prepareEngine.prepare(new RouteContext(), Collections.singleton(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new); } From 14154b0dcd358482d45ae4fbff9c7aa66e66c840 Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 21:42:26 +0800 Subject: [PATCH 11/12] Refactor ShardingSpherePreparedStatement --- .../ShardingSpherePreparedStatement.java | 20 ++++++++++--------- .../statement/ShardingSphereStatement.java | 19 +++++++++--------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index 0a0b9644236e6..c0b69932ecc20 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -272,7 +272,8 @@ private void handleAutoCommit(final SQLStatement sqlStatement) throws SQLExcepti } private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException { - DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(database); ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters())); ExecutionGroupContext context = prepareEngine.prepare(new RouteContext(), Collections.singleton(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); @@ -317,17 +318,17 @@ private List executeQuery0(final ExecutionContext executionContext) } private ResultSet executeFederationQuery(final QueryContext queryContext) { - PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(), - metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown()); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(database.getProtocolType(), + database.getResourceMetaData(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown()); SQLFederationContext context = new SQLFederationContext(false, queryContext, metaDataContexts.getMetaData(), connection.getProcessId()); - return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(), callback, context); + return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database), callback, context); } - private DriverExecutionPrepareEngine createDriverExecutionPrepareEngine() { + private DriverExecutionPrepareEngine createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) { int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY); - return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), statementManager, - statementOption, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(), - metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits()); + return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), statementManager, statementOption, + database.getRuleMetaData().getRules(), database.getResourceMetaData().getStorageUnits()); } @Override @@ -480,7 +481,8 @@ protected Optional getSaneResult(final SQLStatement sqlStatement, final } private ExecutionGroupContext createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException { - DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(database); return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); } diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index 5455b80c4813f..00e2bb9bfe797 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -226,18 +226,17 @@ private List executeQuery0(final ExecutionContext executionContext) } private ResultSet executeFederationQuery(final QueryContext queryContext) { - StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(), - metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), queryContext.getSqlStatementContext().getSqlStatement(), - SQLExecutorExceptionHandler.isExceptionThrown()); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(database.getProtocolType(), + database.getResourceMetaData(), queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown()); SQLFederationContext context = new SQLFederationContext(false, queryContext, metaDataContexts.getMetaData(), connection.getProcessId()); - return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(), callback, context); + return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database), callback, context); } - private DriverExecutionPrepareEngine createDriverExecutionPrepareEngine() { + private DriverExecutionPrepareEngine createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) { int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY); return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), statementManager, statementOption, - metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(), - metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits()); + database.getRuleMetaData().getRules(), database.getResourceMetaData().getStorageUnits()); } @Override @@ -445,7 +444,8 @@ private void handleAutoCommit(final SQLStatement sqlStatement) throws SQLExcepti } private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException { - DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(database); ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters())); ExecutionGroupContext context = prepareEngine.prepare(new RouteContext(), Collections.singleton(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); @@ -493,7 +493,8 @@ private ExecutionContext createExecutionContext(final QueryContext queryContext) } private ExecutionGroupContext createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException { - DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(database); return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); } From e59fadda277c159cf4e1ae0ff7d25b9c7b8af8ea Mon Sep 17 00:00:00 2001 From: terrymanu Date: Sun, 26 May 2024 22:33:07 +0800 Subject: [PATCH 12/12] Refactor ShardingSpherePreparedStatement --- .../ShardingSpherePreparedStatement.java | 15 ++++-- .../statement/ShardingSphereStatement.java | 27 ++++------ .../traffic/executor/TrafficExecutor.java | 41 ++++++++++++++++ .../traffic/executor/TrafficExecutorTest.java | 49 ------------------- 4 files changed, 61 insertions(+), 71 deletions(-) delete mode 100644 kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorTest.java diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java index c0b69932ecc20..b7dc4c334e54e 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java @@ -227,13 +227,14 @@ public ResultSet executeQuery() throws SQLException { clearPrevious(); QueryContext queryContext = createQueryContext(); handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - currentResultSet = executor.getTrafficExecutor().execute( - createTrafficExecutionUnit(trafficInstanceId, queryContext), (statement, sql) -> ((PreparedStatement) statement).executeQuery()); + currentResultSet = executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName, + trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).executeQuery()); return currentResultSet; } - if (decide(queryContext, metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData())) { + if (decide(queryContext, database, metaDataContexts.getMetaData().getGlobalRuleMetaData())) { currentResultSet = executeFederationQuery(queryContext); return currentResultSet; } @@ -343,7 +344,9 @@ public int executeUpdate() throws SQLException { handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - return executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId, queryContext), (statement, sql) -> ((PreparedStatement) statement).executeUpdate()); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + return executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName, + trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).executeUpdate()); } executionContext = createExecutionContext(queryContext); if (hasRawExecutionRule()) { @@ -406,7 +409,9 @@ public boolean execute() throws SQLException { handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - boolean result = executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId, queryContext), (statement, sql) -> ((PreparedStatement) statement).execute()); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + boolean result = executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName, + trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) -> ((PreparedStatement) statement).execute()); currentResultSet = executor.getTrafficExecutor().getResultSet(); return result; } diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index 00e2bb9bfe797..d2b2e12dfe2c0 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -46,8 +46,6 @@ import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext; import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext; -import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit; -import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit; import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode; import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; @@ -70,7 +68,6 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData; import org.apache.shardingsphere.infra.metadata.user.Grantee; -import org.apache.shardingsphere.infra.route.context.RouteContext; import org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute; import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute; import org.apache.shardingsphere.infra.session.query.QueryContext; @@ -80,7 +77,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; import org.apache.shardingsphere.traffic.engine.TrafficEngine; -import org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException; import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback; import org.apache.shardingsphere.traffic.rule.TrafficRule; import org.apache.shardingsphere.transaction.util.AutoCommitUtils; @@ -161,12 +157,14 @@ public ResultSet executeQuery(final String sql) throws SQLException { handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement()); databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName()); connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - currentResultSet = executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId, queryContext), Statement::executeQuery); + currentResultSet = executor.getTrafficExecutor().execute( + connection.getProcessId(), databaseName, trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), Statement::executeQuery); return currentResultSet; } - if (decide(queryContext, metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData())) { + if (decide(queryContext, database, metaDataContexts.getMetaData().getGlobalRuleMetaData())) { currentResultSet = executeFederationQuery(queryContext); return currentResultSet; } @@ -315,7 +313,9 @@ private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateC connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - return executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId, queryContext), trafficCallback); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + return executor.getTrafficExecutor().execute( + connection.getProcessId(), databaseName, trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); } executionContext = createExecutionContext(queryContext); if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) { @@ -419,7 +419,9 @@ private boolean execute0(final String sql, final ExecuteCallback executeCallback connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName); String trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { - boolean result = executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId, queryContext), trafficCallback); + ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); + boolean result = executor.getTrafficExecutor().execute( + connection.getProcessId(), databaseName, trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), trafficCallback); currentResultSet = executor.getTrafficExecutor().getResultSet(); return result; } @@ -443,15 +445,6 @@ private void handleAutoCommit(final SQLStatement sqlStatement) throws SQLExcepti } } - private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException { - ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName); - DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(database); - ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters())); - ExecutionGroupContext context = - prepareEngine.prepare(new RouteContext(), Collections.singleton(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))); - return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new); - } - private void clearStatements() throws SQLException { for (Statement each : statements) { each.close(); diff --git a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java index b349a280cb14f..fddefd4740f5d 100644 --- a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java +++ b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java @@ -18,13 +18,23 @@ package org.apache.shardingsphere.traffic.executor; import lombok.Getter; +import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; +import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext; +import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit; import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; +import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; +import org.apache.shardingsphere.infra.metadata.user.Grantee; +import org.apache.shardingsphere.infra.route.context.RouteContext; +import org.apache.shardingsphere.infra.session.query.QueryContext; +import org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; import java.util.List; /** @@ -54,6 +64,37 @@ public T execute(final JDBCExecutionUnit executionUnit, final TrafficExecuto return result; } + /** + * Execute. + * + * @param processId process ID + * @param databaseName database name + * @param trafficInstanceId traffic instance ID + * @param queryContext query context + * @param prepareEngine prepare engine + * @param callback callback + * @param return type + * @return execute result + * @throws SQLException SQL exception + */ + public T execute(final String processId, final String databaseName, final String trafficInstanceId, final QueryContext queryContext, + final DriverExecutionPrepareEngine prepareEngine, final TrafficExecutorCallback callback) throws SQLException { + JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(processId, databaseName, trafficInstanceId, queryContext, prepareEngine); + SQLUnit sqlUnit = executionUnit.getExecutionUnit().getSqlUnit(); + cacheStatement(sqlUnit.getParameters(), executionUnit.getStorageResource()); + T result = callback.execute(statement, sqlUnit.getSql()); + resultSet = statement.getResultSet(); + return result; + } + + private JDBCExecutionUnit createTrafficExecutionUnit(final String processId, final String databaseName, final String trafficInstanceId, final QueryContext queryContext, + final DriverExecutionPrepareEngine prepareEngine) throws SQLException { + ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters())); + ExecutionGroupContext context = + prepareEngine.prepare(new RouteContext(), Collections.singleton(executionUnit), new ExecutionGroupReportContext(processId, databaseName, new Grantee("", ""))); + return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new); + } + private void cacheStatement(final List params, final Statement statement) throws SQLException { this.statement = statement; setParameters(statement, params); diff --git a/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorTest.java b/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorTest.java deleted file mode 100644 index 7074ad28b8588..0000000000000 --- a/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.traffic.executor; - -import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit; -import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; -import org.junit.jupiter.api.Test; - -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collections; - -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -class TrafficExecutorTest { - - @Test - void assertClose() throws SQLException { - Statement statement = mock(Statement.class, RETURNS_DEEP_STUBS); - try (TrafficExecutor trafficExecutor = new TrafficExecutor()) { - JDBCExecutionUnit executionUnit = mock(JDBCExecutionUnit.class); - when(executionUnit.getExecutionUnit()).thenReturn(new ExecutionUnit("oltp_proxy_instance_id", new SQLUnit("SELECT 1", Collections.emptyList()))); - when(executionUnit.getStorageResource()).thenReturn(statement); - trafficExecutor.execute(executionUnit, Statement::executeQuery); - } - verify(statement).close(); - verify(statement, times(0)).getConnection(); - } -}