Skip to content

Commit

Permalink
Fix auto commit in jdbc adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
FlyingZC committed Oct 7, 2023
1 parent 576e75b commit 0fee295
Show file tree
Hide file tree
Showing 35 changed files with 248 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat
private boolean closed;

protected final boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final ExecutionContext executionContext) {
return isInDistributedTransaction(connection) && isModifiedSQL(executionContext) && executionContext.getExecutionUnits().size() > 1;
return connection.getAutoCommit() && isNotInDistributedTransaction(connection) && isModifiedSQL(executionContext) && executionContext.getExecutionUnits().size() > 1;
}

private boolean isInDistributedTransaction(final ShardingSphereConnection connection) {
private boolean isNotInDistributedTransaction(final ShardingSphereConnection connection) {
ConnectionTransaction connectionTransaction = connection.getDatabaseConnectionManager().getConnectionTransaction();
boolean isInTransaction = connection.getDatabaseConnectionManager().getConnectionContext().getTransactionContext().isInTransaction();
return TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType()) && !isInTransaction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.transaction.api.TransactionType;

import java.sql.Array;
import java.sql.CallableStatement;
Expand Down Expand Up @@ -152,7 +153,7 @@ public void setAutoCommit(final boolean autoCommit) throws SQLException {
if (databaseConnectionManager.getConnectionTransaction().isLocalTransaction()) {
processLocalTransaction();
} else {
processDistributeTransaction();
processDistributedTransaction();
}
}

Expand All @@ -163,12 +164,10 @@ private void processLocalTransaction() throws SQLException {
}
}

private void processDistributeTransaction() throws SQLException {
private void processDistributedTransaction() throws SQLException {
switch (databaseConnectionManager.getConnectionTransaction().getDistributedTransactionOperationType(autoCommit)) {
case BEGIN:
databaseConnectionManager.close();
databaseConnectionManager.getConnectionTransaction().begin();
getConnectionContext().getTransactionContext().setInTransaction(true);
beginDistributedTransaction();
break;
case COMMIT:
databaseConnectionManager.getConnectionTransaction().commit();
Expand All @@ -178,6 +177,24 @@ private void processDistributeTransaction() throws SQLException {
}
}

private void beginDistributedTransaction() throws SQLException {
databaseConnectionManager.close();
databaseConnectionManager.getConnectionTransaction().begin();
getConnectionContext().getTransactionContext().setInTransaction(true);
}

/**
* Handle auto commit.
*
* @throws SQLException SQL exception
*/
public void handleAutoCommit() throws SQLException {
if (!autoCommit && TransactionType.isDistributedTransaction(databaseConnectionManager.getConnectionTransaction().getTransactionType())
&& !databaseConnectionManager.getConnectionTransaction().isInTransaction()) {
beginDistributedTransaction();
}
}

@Override
public void commit() throws SQLException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;

import java.sql.Connection;
import java.sql.ParameterMetaData;
Expand Down Expand Up @@ -267,6 +268,12 @@ private boolean decide(final QueryContext queryContext, final ShardingSphereData
return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData);
}

private void handleAutoCommit(final QueryContext queryContext) throws SQLException {
if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) {
connection.handleAutoCommit();
}
}

private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
Expand Down Expand Up @@ -338,6 +345,7 @@ public int executeUpdate() throws SQLException {
}
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
Expand Down Expand Up @@ -400,6 +408,7 @@ public boolean execute() throws SQLException {
}
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
Expand Down Expand Up @@ -454,6 +463,8 @@ private boolean executeWithImplicitCommitTransaction() throws SQLException {
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
Expand All @@ -469,6 +480,8 @@ private int executeUpdateWithImplicitCommitTransaction() throws SQLException {
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;

import java.sql.Connection;
import java.sql.ResultSet;
Expand Down Expand Up @@ -310,6 +311,7 @@ private int executeUpdate(final ExecuteUpdateCallback updateCallback, final SQLS

private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws SQLException {
QueryContext queryContext = createQueryContext(sql);
handleAutoCommit(queryContext);
databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
Expand All @@ -335,6 +337,8 @@ private int executeUpdateWithImplicitCommitTransaction(final ExecuteUpdateCallba
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
Expand Down Expand Up @@ -428,6 +432,7 @@ public boolean execute(final String sql, final String[] columnNames) throws SQLE
private boolean execute0(final String sql, final ExecuteCallback executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws SQLException {
try {
QueryContext queryContext = createQueryContext(sql);
handleAutoCommit(queryContext);
databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
Expand All @@ -452,6 +457,12 @@ private boolean execute0(final String sql, final ExecuteCallback executeCallback
}
}

private void handleAutoCommit(final QueryContext queryContext) throws SQLException {
if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) {
connection.handleAutoCommit();
}
}

private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
Expand Down Expand Up @@ -527,6 +538,8 @@ private boolean executeWithImplicitCommitTransaction(final ExecuteCallback callb
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ private Collection<ExecutionContext> generateExecutionContexts() {
}

private boolean isNeedImplicitCommitTransaction(final Collection<ExecutionContext> executionContexts) {
if (!databaseConnectionManager.getConnectionSession().isAutoCommit()) {
return false;
}
TransactionStatus transactionStatus = databaseConnectionManager.getConnectionSession().getTransactionStatus();
if (!TransactionType.isDistributedTransaction(transactionStatus.getTransactionType()) || transactionStatus.isInTransaction()) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
package org.apache.shardingsphere.test.e2e.transaction.cases.alterresource;

import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionBaseE2EIT;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionContainerComposer;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.constants.TransactionTestConstants;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

Expand All @@ -36,8 +34,8 @@
@TransactionTestCase(adapters = TransactionTestConstants.PROXY, scenario = "addResource")
public final class AddResourceTestCase extends BaseTransactionTestCase {

public AddResourceTestCase(final TransactionBaseE2EIT baseTransactionITCase, final DataSource dataSource) {
super(baseTransactionITCase, dataSource);
public AddResourceTestCase(final TransactionTestCaseParameter parameters) {
super(parameters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionBaseE2EIT;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionContainerComposer;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.constants.TransactionTestConstants;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

Expand All @@ -35,8 +33,8 @@
@Slf4j
public final class CloseResourceTestCase extends BaseTransactionTestCase {

public CloseResourceTestCase(final TransactionBaseE2EIT baseTransactionITCase, final DataSource dataSource) {
super(baseTransactionITCase, dataSource);
public CloseResourceTestCase(final TransactionTestCaseParameter parameters) {
super(parameters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.e2e.transaction.cases.autocommit;

import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Auto commit transaction integration test.
*/
public abstract class AutoCommitTestCase extends BaseTransactionTestCase {

protected AutoCommitTestCase(final TransactionTestCaseParameter parameters) {
super(parameters);
}

protected void assertAutoCommitWithStatement() throws SQLException {
try (Connection connection = getDataSource().getConnection()) {
connection.setAutoCommit(false);
executeWithLog(connection, "DELETE FROM account");
assertFalse(connection.getAutoCommit());
executeWithLog(connection, "INSERT INTO account VALUES (1, 1, 1)");
connection.commit();
assertFalse(connection.getAutoCommit());
executeUpdateWithLog(connection, "INSERT INTO account VALUES (2, 2, 2)");
connection.commit();
assertFalse(connection.getAutoCommit());
executeWithLog(connection, "INSERT INTO account VALUES (3, 3, 3)");
connection.rollback();
assertFalse(connection.getAutoCommit());
assertAccountRowCount(connection, 2);
connection.setAutoCommit(true);
assertTrue(connection.getAutoCommit());
executeWithLog(connection, "INSERT INTO account VALUES (4, 4, 4)");
assertAccountRowCount(connection, 3);
}
}

protected void assertAutoCommitWithPrepareStatement() throws SQLException {
try (Connection connection = getDataSource().getConnection()) {
PreparedStatement prepareStatement = connection.prepareStatement("INSERT INTO account VALUES(?, ?, ?)");
connection.setAutoCommit(false);
executeWithLog(connection, "DELETE FROM account");
assertFalse(connection.getAutoCommit());
setPrepareStatementParameters(prepareStatement, 1);
prepareStatement.execute();
connection.commit();
assertFalse(connection.getAutoCommit());
setPrepareStatementParameters(prepareStatement, 2);
prepareStatement.executeUpdate();
connection.commit();
assertFalse(connection.getAutoCommit());
setPrepareStatementParameters(prepareStatement, 3);
prepareStatement.execute();
connection.rollback();
assertFalse(connection.getAutoCommit());
assertAccountRowCount(connection, 2);
connection.setAutoCommit(true);
assertTrue(connection.getAutoCommit());
setPrepareStatementParameters(prepareStatement, 4);
prepareStatement.execute();
assertAccountRowCount(connection, 3);
}
}

private void setPrepareStatementParameters(final PreparedStatement prepareStatement, final int value) throws SQLException {
prepareStatement.setInt(1, value);
prepareStatement.setInt(2, value);
prepareStatement.setInt(3, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@

package org.apache.shardingsphere.test.e2e.transaction.cases.autocommit;

import org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionBaseE2EIT;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionContainerComposer;
import org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionTestCase;
import org.apache.shardingsphere.test.e2e.transaction.engine.constants.TransactionTestConstants;
import org.apache.shardingsphere.transaction.api.TransactionType;
import org.awaitility.Awaitility;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,16 +33,20 @@
/**
* MySQL auto commit transaction integration test.
*/
@TransactionTestCase(dbTypes = TransactionTestConstants.MYSQL, transactionTypes = TransactionType.LOCAL)
public final class MySQLAutoCommitTestCase extends BaseTransactionTestCase {
@TransactionTestCase(dbTypes = TransactionTestConstants.MYSQL)
public final class MySQLAutoCommitTestCase extends AutoCommitTestCase {

public MySQLAutoCommitTestCase(final TransactionBaseE2EIT baseTransactionITCase, final DataSource dataSource) {
super(baseTransactionITCase, dataSource);
public MySQLAutoCommitTestCase(final TransactionTestCaseParameter parameters) {
super(parameters);
}

@Override
public void executeTest(final TransactionContainerComposer containerComposer) throws SQLException {
assertAutoCommit();
if (TransactionType.LOCAL == getTransactionType()) {
assertAutoCommit();
}
assertAutoCommitWithStatement();
assertAutoCommitWithPrepareStatement();
}

private void assertAutoCommit() throws SQLException {
Expand Down
Loading

0 comments on commit 0fee295

Please sign in to comment.