Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 25, 2024
1 parent eece701 commit 983ba2b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.client.ClientPool;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -47,6 +48,8 @@ public class FileSystemCatalog extends AbstractCatalog {

private final Path warehouse;

private ClientPool clientPool;

public FileSystemCatalog(FileIO fileIO, Path warehouse) {
super(fileIO);
this.warehouse = warehouse;
Expand Down Expand Up @@ -159,7 +162,10 @@ private SchemaManager schemaManager(Identifier identifier) {

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return LockContextUtils.lockContext(catalogOptions, "filesystem");
if (clientPool == null) {
this.clientPool = LockContextUtils.tryInitializeClientPool(catalogOptions);
}
return LockContextUtils.lockContext(this.clientPool, catalogOptions, "filesystem");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.client.ClientPool;
import org.apache.paimon.jdbc.JdbcCatalogFactory;
import org.apache.paimon.jdbc.JdbcCatalogLock;
import org.apache.paimon.jdbc.JdbcClientPool;
Expand All @@ -39,15 +40,14 @@ public class LockContextUtils {
private static JdbcClientPool connections;

public static Optional<CatalogLock.LockContext> lockContext(
Options catalogOptions, String catalogKey) {
String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
if (lockType == null) {
ClientPool clientPool, Options catalogOptions, String catalogKey) {
if (clientPool == null) {
return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions));
}
String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
switch (lockType) {
case JdbcCatalogFactory.IDENTIFIER:
// Try init jdbc connections.
tryInitializeJdbcConnections(catalogOptions);
JdbcClientPool connections = (JdbcClientPool) clientPool;
return Optional.of(
new JdbcCatalogLock.JdbcLockContext(
connections, catalogKey, catalogOptions));
Expand All @@ -57,20 +57,29 @@ public static Optional<CatalogLock.LockContext> lockContext(
}
}

private static void tryInitializeJdbcConnections(Options catalogOptions) {
if (connections == null) {
connections =
new JdbcClientPool(
catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE),
catalogOptions.get(CatalogOptions.URI.key()),
catalogOptions.toMap());
try {
JdbcUtils.createDistributedLockTable(connections, catalogOptions);
} catch (SQLException e) {
throw new RuntimeException("Cannot initialize JDBC distributed lock.", e);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted in call to initialize", e);
}
public static ClientPool tryInitializeClientPool(Options catalogOptions) {
String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
if (lockType == null) {
return null;
}
switch (lockType) {
case JdbcCatalogFactory.IDENTIFIER:
JdbcClientPool connections =
new JdbcClientPool(
catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE),
catalogOptions.get(CatalogOptions.URI.key()),
catalogOptions.toMap());
try {
JdbcUtils.createDistributedLockTable(connections, catalogOptions);
} catch (SQLException e) {
throw new RuntimeException("Cannot initialize JDBC distributed lock.", e);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted in call to initialize", e);
}
return connections;
default:
LOG.warn("Unsupported lock type:" + lockType);
return null;
}
}

Expand Down

0 comments on commit 983ba2b

Please sign in to comment.