Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 21, 2024
1 parent 532cd8f commit d06d0a7
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.jdbc;

import org.apache.paimon.options.Options;

import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -27,7 +29,8 @@
public abstract class AbstractDistributedLockDialect implements JdbcDistributedLockDialect {

@Override
public void createTable(JdbcClientPool connections) throws SQLException, InterruptedException {
public void createTable(JdbcClientPool connections, Options options)
throws SQLException, InterruptedException {
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
Expand Down
41 changes: 26 additions & 15 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,20 @@ public class JdbcCatalog extends AbstractCatalog {

private final JdbcClientPool connections;
private final String catalogKey;
private final Map<String, String> options;
private final Options options;
private final String warehouse;

protected JdbcCatalog(
FileIO fileIO, String catalogKey, Map<String, String> config, String warehouse) {
super(fileIO, Options.fromMap(config));
protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options, String warehouse) {
super(fileIO, options);
this.catalogKey = catalogKey;
this.options = config;
this.options = options;
this.warehouse = warehouse;
Preconditions.checkNotNull(options, "Invalid catalog properties: null");
this.connections =
new JdbcClientPool(
Integer.parseInt(
config.getOrDefault(
CatalogOptions.CLIENT_POOL_SIZE.key(),
CatalogOptions.CLIENT_POOL_SIZE.defaultValue().toString())),
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options);
options.toMap());
try {
initializeCatalogTablesIfNeed();
} catch (SQLException e) {
Expand Down Expand Up @@ -116,7 +112,13 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc
if (tableExists.next()) {
return true;
}
return conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE).execute();
String createCatalogTableSql =
String.format(
JdbcUtils.CREATE_CATALOG_TABLE,
JdbcCatalogOptions.catalogKeyMaxLength(options),
JdbcCatalogOptions.databaseNameMaxLength(options),
JdbcCatalogOptions.tableNameMaxLength(options));
return conn.prepareStatement(createCatalogTableSql).execute();
});

// Check and create database properties table.
Expand All @@ -129,13 +131,19 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc
if (tableExists.next()) {
return true;
}
return conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE)
.execute();
String createDatabasePropertiesTableSql =
String.format(
JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE,
JdbcCatalogOptions.catalogKeyMaxLength(options),
JdbcCatalogOptions.databaseNameMaxLength(options),
JdbcCatalogOptions.databasePropertiesKeyMaxLength(options),
JdbcCatalogOptions.databasePropertiesValueMaxLength(options));
return conn.prepareStatement(createDatabasePropertiesTableSql).execute();
});

// if lock enabled, Check and create distributed lock table.
if (lockEnabled()) {
JdbcUtils.createDistributedLockTable(connections);
JdbcUtils.createDistributedLockTable(connections, options);
}
}

Expand Down Expand Up @@ -357,7 +365,10 @@ private Lock lock(Identifier identifier) {
}
JdbcCatalogLock lock =
new JdbcCatalogLock(
connections, catalogKey, checkMaxSleep(options), acquireTimeout(options));
connections,
catalogKey,
checkMaxSleep(options.toMap()),
acquireTimeout(options.toMap()));
return Lock.fromCatalog(lock, identifier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.Preconditions;

import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
Expand All @@ -42,11 +43,15 @@ public String identifier() {
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
Options options = context.options();
String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
Integer catalogKeyMaxLength = JdbcCatalogOptions.catalogKeyMaxLength(options);
Preconditions.checkArgument(
catalogKey.length() > catalogKeyMaxLength,
"Catalog key exceeds the maximum length set, please adjust the 'catalog-key-max-length' configuration.");
if (options.get(LOCK_ENABLED)) {
if (!options.getOptional(LOCK_TYPE).isPresent()) {
options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
}
}
return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouse.toString());
return new JdbcCatalog(fileIO, catalogKey, context.options(), warehouse.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.TimeUtils;

import java.io.IOException;
Expand Down Expand Up @@ -103,18 +104,17 @@ public CatalogLock create(LockContext context) {
return new JdbcCatalogLock(
lockContext.connections,
lockContext.catalogKey,
checkMaxSleep(lockContext.conf),
acquireTimeout(lockContext.conf));
checkMaxSleep(lockContext.conf.toMap()),
acquireTimeout(lockContext.conf.toMap()));
}
}

static class JdbcLockContext implements LockContext {
private final JdbcClientPool connections;
private final String catalogKey;
private final Map<String, String> conf;
private final Options conf;

public JdbcLockContext(
JdbcClientPool connections, String catalogKey, Map<String, String> conf) {
public JdbcLockContext(JdbcClientPool connections, String catalogKey, Options conf) {
this.connections = connections;
this.catalogKey = catalogKey;
this.conf = conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;

/** Options for jdbc catalog. */
public final class JdbcCatalogOptions {
Expand All @@ -30,5 +31,66 @@ public final class JdbcCatalogOptions {
.defaultValue("jdbc")
.withDescription("Custom jdbc catalog store key.");

public static final ConfigOption<Integer> CATALOG_KEY_MAX_LENGTH =
ConfigOptions.key("catalog-key-max-length")
.intType()
.defaultValue(255)
.withDescription("Set the maximum length of the catalog key.");

public static final ConfigOption<Integer> DATABASE_NAME_MAX_LENGTH =
ConfigOptions.key("database-name-max-length")
.intType()
.defaultValue(255)
.withDescription("Set the maximum length of the database name.");

public static final ConfigOption<Integer> TABLE_NAME_MAX_LENGTH =
ConfigOptions.key("table-name-max-length")
.intType()
.defaultValue(255)
.withDescription("Set the maximum length of the table name.");

public static final ConfigOption<Integer> DATABASE_PROPS_KEY_MAX_LENGTH =
ConfigOptions.key("database-properties-key-max-length")
.intType()
.defaultValue(255)
.withDescription("Set the maximum length of the database properties key.");

public static final ConfigOption<Integer> DATABASE_PROPS_VALUE_MAX_LENGTH =
ConfigOptions.key("database-properties-value-max-length")
.intType()
.defaultValue(1000)
.withDescription("Set the maximum length of the database properties value.");

public static final ConfigOption<Integer> LOCK_KEY_MAX_LENGTH =
ConfigOptions.key("lock-key-max-length")
.intType()
.defaultValue(1000)
.withDescription(
"Set the maximum length of the lock key. The 'lock-key' is composed of concatenating three fields : 'catalog-key', 'database', and 'table'.");

private JdbcCatalogOptions() {}

static Integer catalogKeyMaxLength(Options options) {
return options.get(CATALOG_KEY_MAX_LENGTH);
}

static Integer databaseNameMaxLength(Options options) {
return options.get(DATABASE_NAME_MAX_LENGTH);
}

static Integer tableNameMaxLength(Options options) {
return options.get(TABLE_NAME_MAX_LENGTH);
}

static Integer databasePropertiesKeyMaxLength(Options options) {
return options.get(DATABASE_PROPS_KEY_MAX_LENGTH);
}

static Integer databasePropertiesValueMaxLength(Options options) {
return options.get(DATABASE_PROPS_VALUE_MAX_LENGTH);
}

static Integer lockKeyMaxLength(Options options) {
return options.get(LOCK_KEY_MAX_LENGTH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package org.apache.paimon.jdbc;

import org.apache.paimon.options.Options;

import java.sql.SQLException;

/** Jdbc distributed lock interface. */
public interface JdbcDistributedLockDialect {
void createTable(JdbcClientPool connections) throws SQLException, InterruptedException;
void createTable(JdbcClientPool connections, Options options)
throws SQLException, InterruptedException;

boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds)
throws SQLException, InterruptedException;
Expand Down
20 changes: 11 additions & 9 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

Expand Down Expand Up @@ -47,11 +48,11 @@ public class JdbcUtils {
+ CATALOG_TABLE_NAME
+ "("
+ CATALOG_KEY
+ " VARCHAR(255) NOT NULL,"
+ " VARCHAR(%s) NOT NULL,"
+ TABLE_DATABASE
+ " VARCHAR(255) NOT NULL,"
+ " VARCHAR(%s) NOT NULL,"
+ TABLE_NAME
+ " VARCHAR(255) NOT NULL,"
+ " VARCHAR(%s) NOT NULL,"
+ " PRIMARY KEY ("
+ CATALOG_KEY
+ ", "
Expand Down Expand Up @@ -154,13 +155,13 @@ public class JdbcUtils {
+ DATABASE_PROPERTIES_TABLE_NAME
+ "("
+ CATALOG_KEY
+ " VARCHAR(255) NOT NULL,"
+ " VARCHAR(%s) NOT NULL,"
+ DATABASE_NAME
+ " VARCHAR(255) NOT NULL,"
+ " VARCHAR(%s) NOT NULL,"
+ DATABASE_PROPERTY_KEY
+ " VARCHAR(255),"
+ " VARCHAR(%s),"
+ DATABASE_PROPERTY_VALUE
+ " VARCHAR(1000),"
+ " VARCHAR(%s),"
+ "PRIMARY KEY ("
+ CATALOG_KEY
+ ", "
Expand Down Expand Up @@ -402,9 +403,10 @@ private static String insertPropertiesStatement(int size) {
return sqlStatement.toString();
}

public static void createDistributedLockTable(JdbcClientPool connections)
public static void createDistributedLockTable(JdbcClientPool connections, Options options)
throws SQLException, InterruptedException {
DistributedLockDialectFactory.create(connections.getProtocol()).createTable(connections);
DistributedLockDialectFactory.create(connections.getProtocol())
.createTable(connections, options);
}

public static boolean acquire(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public String getCreateTableSql() {
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ "("
+ JdbcUtils.LOCK_ID
+ " VARCHAR(1000) NOT NULL,"
+ " VARCHAR(%s) NOT NULL,"
+ JdbcUtils.ACQUIRED_AT
+ " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+ JdbcUtils.EXPIRE_TIME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@

/** Distributed lock implementation based on sqlite table. */
public class SqlLiteDistributedLockDialect extends AbstractDistributedLockDialect {

@Override
public String getCreateTableSql() {
return "CREATE TABLE "
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ "("
+ JdbcUtils.LOCK_ID
+ " VARCHAR(1000) NOT NULL,"
+ " VARCHAR(%s) NOT NULL,"
+ JdbcUtils.ACQUIRED_AT
+ " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+ JdbcUtils.EXPIRE_TIME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

Expand Down Expand Up @@ -57,7 +58,9 @@ private JdbcCatalog initCatalog(Map<String, String> props) {
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
properties.putAll(props);
JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog", properties, warehouse);
JdbcCatalog catalog =
new JdbcCatalog(
fileIO, "test-jdbc-catalog", Options.fromMap(properties), warehouse);
assertThat(catalog.warehouse()).isEqualTo(warehouse);
return catalog;
}
Expand Down

0 comments on commit d06d0a7

Please sign in to comment.