From d06d0a771df8c4759266bad715dace873fad5a05 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Wed, 20 Mar 2024 18:59:21 +0800 Subject: [PATCH] fixed --- .../jdbc/AbstractDistributedLockDialect.java | 5 +- .../org/apache/paimon/jdbc/JdbcCatalog.java | 41 +++++++----- .../paimon/jdbc/JdbcCatalogFactory.java | 7 ++- .../apache/paimon/jdbc/JdbcCatalogLock.java | 10 +-- .../paimon/jdbc/JdbcCatalogOptions.java | 62 +++++++++++++++++++ .../jdbc/JdbcDistributedLockDialect.java | 5 +- .../org/apache/paimon/jdbc/JdbcUtils.java | 20 +++--- .../jdbc/MysqlDistributedLockDialect.java | 2 +- .../jdbc/SqlLiteDistributedLockDialect.java | 3 +- .../apache/paimon/jdbc/JdbcCatalogTest.java | 5 +- 10 files changed, 124 insertions(+), 36 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java index f3469d0b5ba5a..853f99aa71901 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java @@ -18,6 +18,8 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.options.Options; + import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -27,7 +29,8 @@ public abstract class AbstractDistributedLockDialect implements JdbcDistributedLockDialect { @Override - public void createTable(JdbcClientPool connections) throws SQLException, InterruptedException { + public void createTable(JdbcClientPool connections, Options options) + throws SQLException, InterruptedException { connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 689a93ee91f74..e31b7cef3efa4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -71,24 +71,20 @@ public class JdbcCatalog extends AbstractCatalog { private final JdbcClientPool connections; private final String catalogKey; - private final Map options; + private final Options options; private final String warehouse; - protected JdbcCatalog( - FileIO fileIO, String catalogKey, Map config, String warehouse) { - super(fileIO, Options.fromMap(config)); + protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options, String warehouse) { + super(fileIO, options); this.catalogKey = catalogKey; - this.options = config; + this.options = options; this.warehouse = warehouse; Preconditions.checkNotNull(options, "Invalid catalog properties: null"); this.connections = new JdbcClientPool( - Integer.parseInt( - config.getOrDefault( - CatalogOptions.CLIENT_POOL_SIZE.key(), - CatalogOptions.CLIENT_POOL_SIZE.defaultValue().toString())), + options.get(CatalogOptions.CLIENT_POOL_SIZE), options.get(CatalogOptions.URI.key()), - options); + options.toMap()); try { initializeCatalogTablesIfNeed(); } catch (SQLException e) { @@ -116,7 +112,13 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc if (tableExists.next()) { return true; } - return conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE).execute(); + String createCatalogTableSql = + String.format( + JdbcUtils.CREATE_CATALOG_TABLE, + JdbcCatalogOptions.catalogKeyMaxLength(options), + JdbcCatalogOptions.databaseNameMaxLength(options), + JdbcCatalogOptions.tableNameMaxLength(options)); + return conn.prepareStatement(createCatalogTableSql).execute(); }); // Check and create database properties table. @@ -129,13 +131,19 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc if (tableExists.next()) { return true; } - return conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE) - .execute(); + String createDatabasePropertiesTableSql = + String.format( + JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE, + JdbcCatalogOptions.catalogKeyMaxLength(options), + JdbcCatalogOptions.databaseNameMaxLength(options), + JdbcCatalogOptions.databasePropertiesKeyMaxLength(options), + JdbcCatalogOptions.databasePropertiesValueMaxLength(options)); + return conn.prepareStatement(createDatabasePropertiesTableSql).execute(); }); // if lock enabled, Check and create distributed lock table. if (lockEnabled()) { - JdbcUtils.createDistributedLockTable(connections); + JdbcUtils.createDistributedLockTable(connections, options); } } @@ -357,7 +365,10 @@ private Lock lock(Identifier identifier) { } JdbcCatalogLock lock = new JdbcCatalogLock( - connections, catalogKey, checkMaxSleep(options), acquireTimeout(options)); + connections, + catalogKey, + checkMaxSleep(options.toMap()), + acquireTimeout(options.toMap())); return Lock.fromCatalog(lock, identifier); } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java index 5e605923206b3..86fb204054280 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; +import org.apache.paimon.utils.Preconditions; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; @@ -42,11 +43,15 @@ public String identifier() { public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { Options options = context.options(); String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY); + Integer catalogKeyMaxLength = JdbcCatalogOptions.catalogKeyMaxLength(options); + Preconditions.checkArgument( + catalogKey.length() > catalogKeyMaxLength, + "Catalog key exceeds the maximum length set, please adjust the 'catalog-key-max-length' configuration."); if (options.get(LOCK_ENABLED)) { if (!options.getOptional(LOCK_TYPE).isPresent()) { options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER); } } - return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouse.toString()); + return new JdbcCatalog(fileIO, catalogKey, context.options(), warehouse.toString()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java index d135a6caf22bc..d713feb7e49fc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java @@ -19,6 +19,7 @@ package org.apache.paimon.jdbc; import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.options.Options; import org.apache.paimon.utils.TimeUtils; import java.io.IOException; @@ -103,18 +104,17 @@ public CatalogLock create(LockContext context) { return new JdbcCatalogLock( lockContext.connections, lockContext.catalogKey, - checkMaxSleep(lockContext.conf), - acquireTimeout(lockContext.conf)); + checkMaxSleep(lockContext.conf.toMap()), + acquireTimeout(lockContext.conf.toMap())); } } static class JdbcLockContext implements LockContext { private final JdbcClientPool connections; private final String catalogKey; - private final Map conf; + private final Options conf; - public JdbcLockContext( - JdbcClientPool connections, String catalogKey, Map conf) { + public JdbcLockContext(JdbcClientPool connections, String catalogKey, Options conf) { this.connections = connections; this.catalogKey = catalogKey; this.conf = conf; diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java index 97828f7c8d7d7..bb51bd360d708 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java @@ -20,6 +20,7 @@ import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; +import org.apache.paimon.options.Options; /** Options for jdbc catalog. */ public final class JdbcCatalogOptions { @@ -30,5 +31,66 @@ public final class JdbcCatalogOptions { .defaultValue("jdbc") .withDescription("Custom jdbc catalog store key."); + public static final ConfigOption CATALOG_KEY_MAX_LENGTH = + ConfigOptions.key("catalog-key-max-length") + .intType() + .defaultValue(255) + .withDescription("Set the maximum length of the catalog key."); + + public static final ConfigOption DATABASE_NAME_MAX_LENGTH = + ConfigOptions.key("database-name-max-length") + .intType() + .defaultValue(255) + .withDescription("Set the maximum length of the database name."); + + public static final ConfigOption TABLE_NAME_MAX_LENGTH = + ConfigOptions.key("table-name-max-length") + .intType() + .defaultValue(255) + .withDescription("Set the maximum length of the table name."); + + public static final ConfigOption DATABASE_PROPS_KEY_MAX_LENGTH = + ConfigOptions.key("database-properties-key-max-length") + .intType() + .defaultValue(255) + .withDescription("Set the maximum length of the database properties key."); + + public static final ConfigOption DATABASE_PROPS_VALUE_MAX_LENGTH = + ConfigOptions.key("database-properties-value-max-length") + .intType() + .defaultValue(1000) + .withDescription("Set the maximum length of the database properties value."); + + public static final ConfigOption LOCK_KEY_MAX_LENGTH = + ConfigOptions.key("lock-key-max-length") + .intType() + .defaultValue(1000) + .withDescription( + "Set the maximum length of the lock key. The 'lock-key' is composed of concatenating three fields : 'catalog-key', 'database', and 'table'."); + private JdbcCatalogOptions() {} + + static Integer catalogKeyMaxLength(Options options) { + return options.get(CATALOG_KEY_MAX_LENGTH); + } + + static Integer databaseNameMaxLength(Options options) { + return options.get(DATABASE_NAME_MAX_LENGTH); + } + + static Integer tableNameMaxLength(Options options) { + return options.get(TABLE_NAME_MAX_LENGTH); + } + + static Integer databasePropertiesKeyMaxLength(Options options) { + return options.get(DATABASE_PROPS_KEY_MAX_LENGTH); + } + + static Integer databasePropertiesValueMaxLength(Options options) { + return options.get(DATABASE_PROPS_VALUE_MAX_LENGTH); + } + + static Integer lockKeyMaxLength(Options options) { + return options.get(LOCK_KEY_MAX_LENGTH); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java index a691aac2295fd..1252dc905d6b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java @@ -18,11 +18,14 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.options.Options; + import java.sql.SQLException; /** Jdbc distributed lock interface. */ public interface JdbcDistributedLockDialect { - void createTable(JdbcClientPool connections) throws SQLException, InterruptedException; + void createTable(JdbcClientPool connections, Options options) + throws SQLException, InterruptedException; boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds) throws SQLException, InterruptedException; diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 7b9b93a5a4e23..7eb1d5fa969b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -19,6 +19,7 @@ package org.apache.paimon.jdbc; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.options.Options; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; @@ -47,11 +48,11 @@ public class JdbcUtils { + CATALOG_TABLE_NAME + "(" + CATALOG_KEY - + " VARCHAR(255) NOT NULL," + + " VARCHAR(%s) NOT NULL," + TABLE_DATABASE - + " VARCHAR(255) NOT NULL," + + " VARCHAR(%s) NOT NULL," + TABLE_NAME - + " VARCHAR(255) NOT NULL," + + " VARCHAR(%s) NOT NULL," + " PRIMARY KEY (" + CATALOG_KEY + ", " @@ -154,13 +155,13 @@ public class JdbcUtils { + DATABASE_PROPERTIES_TABLE_NAME + "(" + CATALOG_KEY - + " VARCHAR(255) NOT NULL," + + " VARCHAR(%s) NOT NULL," + DATABASE_NAME - + " VARCHAR(255) NOT NULL," + + " VARCHAR(%s) NOT NULL," + DATABASE_PROPERTY_KEY - + " VARCHAR(255)," + + " VARCHAR(%s)," + DATABASE_PROPERTY_VALUE - + " VARCHAR(1000)," + + " VARCHAR(%s)," + "PRIMARY KEY (" + CATALOG_KEY + ", " @@ -402,9 +403,10 @@ private static String insertPropertiesStatement(int size) { return sqlStatement.toString(); } - public static void createDistributedLockTable(JdbcClientPool connections) + public static void createDistributedLockTable(JdbcClientPool connections, Options options) throws SQLException, InterruptedException { - DistributedLockDialectFactory.create(connections.getProtocol()).createTable(connections); + DistributedLockDialectFactory.create(connections.getProtocol()) + .createTable(connections, options); } public static boolean acquire( diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java index 206aa8cd77ad3..bca8aac20b867 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java @@ -27,7 +27,7 @@ public String getCreateTableSql() { + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + "(" + JdbcUtils.LOCK_ID - + " VARCHAR(1000) NOT NULL," + + " VARCHAR(%s) NOT NULL," + JdbcUtils.ACQUIRED_AT + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL," + JdbcUtils.EXPIRE_TIME diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java index 602fdd1d625e2..1ffdd61bc2f2a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java @@ -20,14 +20,13 @@ /** Distributed lock implementation based on sqlite table. */ public class SqlLiteDistributedLockDialect extends AbstractDistributedLockDialect { - @Override public String getCreateTableSql() { return "CREATE TABLE " + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + "(" + JdbcUtils.LOCK_ID - + " VARCHAR(1000) NOT NULL," + + " VARCHAR(%s) NOT NULL," + JdbcUtils.ACQUIRED_AT + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL," + JdbcUtils.EXPIRE_TIME diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index d03c64bd825ee..cc1febeab023e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; @@ -57,7 +58,9 @@ private JdbcCatalog initCatalog(Map props) { properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); properties.putAll(props); - JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog", properties, warehouse); + JdbcCatalog catalog = + new JdbcCatalog( + fileIO, "test-jdbc-catalog", Options.fromMap(properties), warehouse); assertThat(catalog.warehouse()).isEqualTo(warehouse); return catalog; }