diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java index 747d38ecbbad4..0679d1121cf9a 100644 --- a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java +++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java @@ -19,9 +19,11 @@ import com.google.common.base.Strings; import org.apache.shardingsphere.globalclock.type.tso.provider.TSOProvider; +import org.apache.shardingsphere.infra.exception.dialect.exception.transaction.InTransactionException; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.params.SetParams; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,8 +35,12 @@ public final class RedisTSOProvider implements TSOProvider { private static final String CSN_KEY = "csn"; + private static final String CSN_LOCK_KEY = "csn_lock"; + private static final long ERROR_CSN = 0L; + private static final int LOCK_EXPIRE_TIME = 200; + private static final long INIT_CSN = Integer.MAX_VALUE; private final AtomicBoolean initialized = new AtomicBoolean(false); @@ -79,8 +85,13 @@ private void initCSN() { @Override public long getCurrentTimestamp() { + String lockValue = String.valueOf(System.nanoTime()); + try (Jedis jedis = jedisPool.getResource()) { - // TODO use redis lock to instead of reg center's lock. lock here #35041 + String result = jedis.set(CSN_LOCK_KEY, lockValue, SetParams.setParams().nx().px(LOCK_EXPIRE_TIME)); + if (!"OK".equals(result)) { + throw new InTransactionException(); + } return Long.parseLong(jedis.get(CSN_KEY)); } } @@ -88,8 +99,10 @@ public long getCurrentTimestamp() { @Override public long getNextTimestamp() { try (Jedis jedis = jedisPool.getResource()) { - return jedis.incr(CSN_KEY); - // TODO use redis lock to instead of reg center's lock. unlock here #35041 + long next = jedis.incr(CSN_KEY); + + jedis.del(CSN_LOCK_KEY); + return next; } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java index ee62cb4fb6d83..353e6bf12ffe5 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java @@ -18,12 +18,9 @@ package org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.mode.lock.LockContext; -import org.apache.shardingsphere.mode.lock.LockDefinition; import org.apache.shardingsphere.infra.rule.ShardingSphereRule; import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext; import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader; -import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition; import org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager; import org.apache.shardingsphere.proxy.backend.connector.TransactionManager; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; @@ -102,17 +99,7 @@ public void commit() throws SQLException { return; } DatabaseType databaseType = ProxyContext.getInstance().getDatabaseType(); - LockContext lockContext = ProxyContext.getInstance().getContextManager().getLockContext(); - boolean isNeedLock = isNeedLockWhenCommit(); - LockDefinition lockDefinition = null; try { - // FIXME if timeout when lock required, TSO not assigned, but commit will continue, solution is use redis lock in impl to instead of reg center's lock. #35041 - if (isNeedLock) { - lockDefinition = new GlobalLockDefinition(new TransactionCommitLock()); - if (!lockContext.tryLock(lockDefinition, 200L)) { - return; - } - } for (Entry entry : transactionHooks.entrySet()) { entry.getValue().beforeCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext()); } @@ -125,9 +112,6 @@ public void commit() throws SQLException { for (Entry entry : transactionHooks.entrySet()) { entry.getValue().afterCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext()); } - if (isNeedLock) { - lockContext.unlock(lockDefinition); - } for (Connection each : connection.getCachedConnections().values()) { ConnectionSavepointManager.getInstance().transactionFinished(each); }