Skip to content

Commit

Permalink
JdbcCatalog supports custom configuration for the length of table fie…
Browse files Browse the repository at this point in the history
…lds.
  • Loading branch information
sunxiaojian committed Mar 21, 2024
1 parent 532cd8f commit da76e4f
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 27 deletions.
3 changes: 3 additions & 0 deletions docs/content/how-to/creating-catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,11 @@ You can configure any connection parameters that have been declared by JDBC thro
You can also perform logical isolation for databases under multiple catalogs by specifying "catalog-key".
Additionally, when creating a JdbcCatalog, you can specify the maximum length for the lock key by configuring "lock-key-max-length," which defaults to 255. Since this value is a combination of {catalog-key}.{database-name}.{table-name}, please adjust accordingly.
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
{{< /tab >}}
{{< /tabs >}}
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,11 @@
<td>String</td>
<td>Custom jdbc catalog store key.</td>
</tr>
<tr>
<td><h5>lock-key-max-length</h5></td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>Set the maximum length of the lock key. The 'lock-key' is composed of concatenating three fields : 'catalog-key', 'database', and 'table'.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.jdbc;

import org.apache.paimon.options.Options;

import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -27,7 +29,9 @@
public abstract class AbstractDistributedLockDialect implements JdbcDistributedLockDialect {

@Override
public void createTable(JdbcClientPool connections) throws SQLException, InterruptedException {
public void createTable(JdbcClientPool connections, Options options)
throws SQLException, InterruptedException {
Integer lockKeyMaxLength = JdbcCatalogOptions.lockKeyMaxLength(options);
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
Expand All @@ -37,7 +41,9 @@ public void createTable(JdbcClientPool connections) throws SQLException, Interru
if (tableExists.next()) {
return true;
}
return conn.prepareStatement(getCreateTableSql()).execute();
String createDistributedLockTableSql =
String.format(getCreateTableSql(), lockKeyMaxLength);
return conn.prepareStatement(createDistributedLockTableSql).execute();
});
}

Expand Down
23 changes: 11 additions & 12 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,20 @@ public class JdbcCatalog extends AbstractCatalog {

private final JdbcClientPool connections;
private final String catalogKey;
private final Map<String, String> options;
private final Options options;
private final String warehouse;

protected JdbcCatalog(
FileIO fileIO, String catalogKey, Map<String, String> config, String warehouse) {
super(fileIO, Options.fromMap(config));
protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options, String warehouse) {
super(fileIO, options);
this.catalogKey = catalogKey;
this.options = config;
this.options = options;
this.warehouse = warehouse;
Preconditions.checkNotNull(options, "Invalid catalog properties: null");
this.connections =
new JdbcClientPool(
Integer.parseInt(
config.getOrDefault(
CatalogOptions.CLIENT_POOL_SIZE.key(),
CatalogOptions.CLIENT_POOL_SIZE.defaultValue().toString())),
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options);
options.toMap());
try {
initializeCatalogTablesIfNeed();
} catch (SQLException e) {
Expand Down Expand Up @@ -135,7 +131,7 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc

// if lock enabled, Check and create distributed lock table.
if (lockEnabled()) {
JdbcUtils.createDistributedLockTable(connections);
JdbcUtils.createDistributedLockTable(connections, options);
}
}

Expand Down Expand Up @@ -357,7 +353,10 @@ private Lock lock(Identifier identifier) {
}
JdbcCatalogLock lock =
new JdbcCatalogLock(
connections, catalogKey, checkMaxSleep(options), acquireTimeout(options));
connections,
catalogKey,
checkMaxSleep(options.toMap()),
acquireTimeout(options.toMap()));
return Lock.fromCatalog(lock, identifier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
}
}
return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouse.toString());
return new JdbcCatalog(fileIO, catalogKey, context.options(), warehouse.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.TimeUtils;

import java.io.IOException;
Expand Down Expand Up @@ -103,18 +104,17 @@ public CatalogLock create(LockContext context) {
return new JdbcCatalogLock(
lockContext.connections,
lockContext.catalogKey,
checkMaxSleep(lockContext.conf),
acquireTimeout(lockContext.conf));
checkMaxSleep(lockContext.conf.toMap()),
acquireTimeout(lockContext.conf.toMap()));
}
}

static class JdbcLockContext implements LockContext {
private final JdbcClientPool connections;
private final String catalogKey;
private final Map<String, String> conf;
private final Options conf;

public JdbcLockContext(
JdbcClientPool connections, String catalogKey, Map<String, String> conf) {
public JdbcLockContext(JdbcClientPool connections, String catalogKey, Options conf) {
this.connections = connections;
this.catalogKey = catalogKey;
this.conf = conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;

/** Options for jdbc catalog. */
public final class JdbcCatalogOptions {
Expand All @@ -30,5 +31,16 @@ public final class JdbcCatalogOptions {
.defaultValue("jdbc")
.withDescription("Custom jdbc catalog store key.");

public static final ConfigOption<Integer> LOCK_KEY_MAX_LENGTH =
ConfigOptions.key("lock-key-max-length")
.intType()
.defaultValue(255)
.withDescription(
"Set the maximum length of the lock key. The 'lock-key' is composed of concatenating three fields : 'catalog-key', 'database', and 'table'.");

private JdbcCatalogOptions() {}

static Integer lockKeyMaxLength(Options options) {
return options.get(LOCK_KEY_MAX_LENGTH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package org.apache.paimon.jdbc;

import org.apache.paimon.options.Options;

import java.sql.SQLException;

/** Jdbc distributed lock interface. */
public interface JdbcDistributedLockDialect {
void createTable(JdbcClientPool connections) throws SQLException, InterruptedException;
void createTable(JdbcClientPool connections, Options options)
throws SQLException, InterruptedException;

boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds)
throws SQLException, InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

Expand Down Expand Up @@ -402,9 +403,10 @@ private static String insertPropertiesStatement(int size) {
return sqlStatement.toString();
}

public static void createDistributedLockTable(JdbcClientPool connections)
public static void createDistributedLockTable(JdbcClientPool connections, Options options)
throws SQLException, InterruptedException {
DistributedLockDialectFactory.create(connections.getProtocol()).createTable(connections);
DistributedLockDialectFactory.create(connections.getProtocol())
.createTable(connections, options);
}

public static boolean acquire(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public String getCreateTableSql() {
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ "("
+ JdbcUtils.LOCK_ID
+ " VARCHAR(1000) NOT NULL,"
+ " VARCHAR(%s) NOT NULL,"
+ JdbcUtils.ACQUIRED_AT
+ " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+ JdbcUtils.EXPIRE_TIME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@

/** Distributed lock implementation based on sqlite table. */
public class SqlLiteDistributedLockDialect extends AbstractDistributedLockDialect {

@Override
public String getCreateTableSql() {
return "CREATE TABLE "
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+ "("
+ JdbcUtils.LOCK_ID
+ " VARCHAR(1000) NOT NULL,"
+ " VARCHAR(%s) NOT NULL,"
+ JdbcUtils.ACQUIRED_AT
+ " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+ JdbcUtils.EXPIRE_TIME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

Expand Down Expand Up @@ -57,7 +58,9 @@ private JdbcCatalog initCatalog(Map<String, String> props) {
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
properties.putAll(props);
JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog", properties, warehouse);
JdbcCatalog catalog =
new JdbcCatalog(
fileIO, "test-jdbc-catalog", Options.fromMap(properties), warehouse);
assertThat(catalog.warehouse()).isEqualTo(warehouse);
return catalog;
}
Expand Down

0 comments on commit da76e4f

Please sign in to comment.