Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import com.google.common.base.Strings;
import org.apache.shardingsphere.globalclock.type.tso.provider.TSOProvider;
import org.apache.shardingsphere.infra.exception.dialect.exception.transaction.InTransactionException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.params.SetParams;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -33,8 +35,12 @@ public final class RedisTSOProvider implements TSOProvider {

private static final String CSN_KEY = "csn";

private static final String CSN_LOCK_KEY = "csn_lock";

private static final long ERROR_CSN = 0L;

private static final int LOCK_EXPIRE_TIME = 200;

private static final long INIT_CSN = Integer.MAX_VALUE;

private final AtomicBoolean initialized = new AtomicBoolean(false);
Expand Down Expand Up @@ -79,17 +85,24 @@ private void initCSN() {

@Override
public long getCurrentTimestamp() {
String lockValue = String.valueOf(System.nanoTime());

try (Jedis jedis = jedisPool.getResource()) {
// TODO use redis lock to instead of reg center's lock. lock here #35041
String result = jedis.set(CSN_LOCK_KEY, lockValue, SetParams.setParams().nx().px(LOCK_EXPIRE_TIME));
if (!"OK".equals(result)) {
throw new InTransactionException();
}
return Long.parseLong(jedis.get(CSN_KEY));
}
}

@Override
public long getNextTimestamp() {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.incr(CSN_KEY);
// TODO use redis lock to instead of reg center's lock. unlock here #35041
long next = jedis.incr(CSN_KEY);

jedis.del(CSN_LOCK_KEY);
return next;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
package org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction;

import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.mode.lock.LockContext;
import org.apache.shardingsphere.mode.lock.LockDefinition;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
import org.apache.shardingsphere.proxy.backend.connector.TransactionManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
Expand Down Expand Up @@ -102,17 +99,7 @@ public void commit() throws SQLException {
return;
}
DatabaseType databaseType = ProxyContext.getInstance().getDatabaseType();
LockContext lockContext = ProxyContext.getInstance().getContextManager().getLockContext();
boolean isNeedLock = isNeedLockWhenCommit();
LockDefinition lockDefinition = null;
try {
// FIXME if timeout when lock required, TSO not assigned, but commit will continue, solution is use redis lock in impl to instead of reg center's lock. #35041
if (isNeedLock) {
lockDefinition = new GlobalLockDefinition(new TransactionCommitLock());
if (!lockContext.tryLock(lockDefinition, 200L)) {
return;
}
}
for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
entry.getValue().beforeCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext());
}
Expand All @@ -125,9 +112,6 @@ public void commit() throws SQLException {
for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
entry.getValue().afterCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext());
}
if (isNeedLock) {
lockContext.unlock(lockDefinition);
}
for (Connection each : connection.getCachedConnections().values()) {
ConnectionSavepointManager.getInstance().transactionFinished(each);
}
Expand Down