Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Feb 29, 2024
1 parent d718b78 commit a4fd960
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public void createTable(JdbcClientPool connections) throws SQLException, Interru
public abstract String getCreateTableSql();

@Override
public boolean lockAcquire(JdbcClientPool connections, String lockId)
public boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds)
throws SQLException, InterruptedException {
return connections.run(
connection -> {
try (PreparedStatement preparedStatement =
connection.prepareStatement(getLockAcquireSql())) {
preparedStatement.setString(1, lockId);
preparedStatement.setLong(2, timeoutMillSeconds / 1000);
return preparedStatement.executeUpdate() > 0;
} catch (SQLException ex) {
return false;
Expand All @@ -76,15 +77,13 @@ public boolean releaseLock(JdbcClientPool connections, String lockId)
public abstract String getReleaseLockSql();

@Override
public int tryReleaseTimedOutLock(
JdbcClientPool connections, String lockId, long timeoutMillSeconds)
public int tryReleaseTimedOutLock(JdbcClientPool connections, String lockId)
throws SQLException, InterruptedException {
return connections.run(
connection -> {
try (PreparedStatement preparedStatement =
connection.prepareStatement(getTryReleaseTimedOutLock())) {
preparedStatement.setLong(1, timeoutMillSeconds / 1000);
preparedStatement.setString(2, lockId);
preparedStatement.setString(1, lockId);
return preparedStatement.executeUpdate();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
public interface JdbcDistributedLockTable {
void createTable(JdbcClientPool connections) throws SQLException, InterruptedException;

boolean lockAcquire(JdbcClientPool connections, String lockId)
boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds)
throws SQLException, InterruptedException;

boolean releaseLock(JdbcClientPool connections, String lockId)
throws SQLException, InterruptedException;

int tryReleaseTimedOutLock(JdbcClientPool connections, String lockId, long timeoutSeconds)
int tryReleaseTimedOutLock(JdbcClientPool connections, String lockId)
throws SQLException, InterruptedException;
}
63 changes: 5 additions & 58 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,6 @@ public class JdbcUtils {
+ " = ? AND "
+ DATABASE_NAME
+ " = ? ";
static final String DELETE_DATABASE_PROPERTIES_SQL =
"DELETE FROM "
+ DATABASE_PROPERTIES_TABLE_NAME
+ " WHERE "
+ CATALOG_NAME
+ " = ? AND "
+ DATABASE_NAME
+ " = ? AND "
+ DATABASE_PROPERTY_KEY
+ " IN ";
static final String DELETE_ALL_DATABASE_PROPERTIES_SQL =
"DELETE FROM "
+ DATABASE_PROPERTIES_TABLE_NAME
Expand All @@ -219,16 +209,6 @@ public class JdbcUtils {
+ " = ? AND "
+ DATABASE_NAME
+ " = ?";
static final String LIST_PROPERTY_DATABASES_SQL =
"SELECT DISTINCT "
+ DATABASE_NAME
+ " FROM "
+ DATABASE_PROPERTIES_TABLE_NAME
+ " WHERE "
+ CATALOG_NAME
+ " = ? AND "
+ DATABASE_NAME
+ " LIKE ?";
static final String LIST_ALL_PROPERTY_DATABASES_SQL =
"SELECT DISTINCT "
+ DATABASE_NAME
Expand All @@ -242,40 +222,7 @@ public class JdbcUtils {
static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks";
static final String LOCK_ID = "lock_id";
static final String ACQUIRED_AT = "acquired_at";

static final String CREATE_DISTRIBUTED_LOCK_TABLE_SQL =
"CREATE TABLE "
+ DISTRIBUTED_LOCKS_TABLE_NAME
+ "("
+ LOCK_ID
+ " VARCHAR(1000) NOT NULL,"
+ ACQUIRED_AT
+ " TIMESTAMP NULL DEFAULT NULL,"
+ "PRIMARY KEY ("
+ LOCK_ID
+ ")"
+ ")";

static final String DISTRIBUTED_LOCK_ACQUIRE_SQL =
"INSERT INTO "
+ DISTRIBUTED_LOCKS_TABLE_NAME
+ " ("
+ LOCK_ID
+ ", "
+ ACQUIRED_AT
+ ") VALUES (?, ?)";

static final String DISTRIBUTED_LOCK_RELEASE_SQL =
"DELETE FROM " + DISTRIBUTED_LOCKS_TABLE_NAME + " WHERE " + LOCK_ID + " = ?";

static final String DISTRIBUTED_LOCK_EXPIRE_CLEAR_SQL =
"DELETE FROM "
+ DISTRIBUTED_LOCKS_TABLE_NAME
+ " WHERE "
+ LOCK_ID
+ " = ? AND "
+ ACQUIRED_AT
+ " < ?";
static final String EXPIRE_TIME = "expire_time_seconds";

public static Properties extractJdbcConfiguration(
Map<String, String> properties, String prefix) {
Expand Down Expand Up @@ -463,17 +410,17 @@ public static void createDistributedLockTable(JdbcClientPool connections)
DistributedLockFactory.create(connections.getProtocol()).createTable(connections);
}

public static boolean acquire(JdbcClientPool connections, String lockId, long timeout)
public static boolean acquire(
JdbcClientPool connections, String lockId, long timeoutMillSeconds)
throws SQLException, InterruptedException {
JdbcDistributedLockTable jdbcDistributedLockTable =
DistributedLockFactory.create(connections.getProtocol());
// Check and clear expire lock.
int affectedRows =
jdbcDistributedLockTable.tryReleaseTimedOutLock(connections, lockId, timeout);
int affectedRows = jdbcDistributedLockTable.tryReleaseTimedOutLock(connections, lockId);
if (affectedRows > 0) {
LOG.debug("Successfully cleared " + affectedRows + " lock records");
}
return jdbcDistributedLockTable.lockAcquire(connections, lockId);
return jdbcDistributedLockTable.lockAcquire(connections, lockId, timeoutMillSeconds);
}

public static void release(JdbcClientPool connections, String lockId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public String getCreateTableSql() {
+ " VARCHAR(1000) NOT NULL,"
+ JdbcUtils.ACQUIRED_AT
+ " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+ JdbcUtils.EXPIRE_TIME
+ " BIGINT DEFAULT 0 NOT NULL,"
+ "PRIMARY KEY ("
+ JdbcUtils.LOCK_ID
+ ")"
Expand All @@ -42,7 +44,9 @@ public String getLockAcquireSql() {
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ " ("
+ JdbcUtils.LOCK_ID
+ ") VALUES (?)";
+ ","
+ JdbcUtils.EXPIRE_TIME
+ ") VALUES (?,?)";
}

@Override
Expand All @@ -60,7 +64,9 @@ public String getTryReleaseTimedOutLock() {
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ " WHERE TIMESTAMPDIFF(SECOND, "
+ JdbcUtils.ACQUIRED_AT
+ ", NOW()) > ? and "
+ ", NOW()) >"
+ JdbcUtils.EXPIRE_TIME
+ " and "
+ JdbcUtils.LOCK_ID
+ " = ?";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public String getCreateTableSql() {
+ " VARCHAR(1000) NOT NULL,"
+ JdbcUtils.ACQUIRED_AT
+ " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+ JdbcUtils.EXPIRE_TIME
+ " BIGINT DEFAULT 0 NOT NULL,"
+ "PRIMARY KEY ("
+ JdbcUtils.LOCK_ID
+ ")"
Expand All @@ -42,7 +44,9 @@ public String getLockAcquireSql() {
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ " ("
+ JdbcUtils.LOCK_ID
+ ") VALUES (?)";
+ ","
+ JdbcUtils.EXPIRE_TIME
+ ") VALUES (?,?)";
}

@Override
Expand All @@ -60,7 +64,9 @@ public String getTryReleaseTimedOutLock() {
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ " WHERE strftime('%s', 'now') - strftime('%s', "
+ JdbcUtils.ACQUIRED_AT
+ ") > ? and "
+ ") > "
+ JdbcUtils.EXPIRE_TIME
+ " and "
+ JdbcUtils.LOCK_ID
+ " = ?";
}
Expand Down

0 comments on commit a4fd960

Please sign in to comment.