From 983ba2bd53ad9ecac3985cc8b9a461b371f371fc Mon Sep 17 00:00:00 2001 From: sunxiaojian <sunxiaojian926@163.com> Date: Mon, 25 Mar 2024 23:49:03 +0800 Subject: [PATCH] fixed --- .../paimon/catalog/FileSystemCatalog.java | 8 +++- .../paimon/catalog/LockContextUtils.java | 47 +++++++++++-------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 995f00ae22488..da9dc3eae01fd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -18,6 +18,7 @@ package org.apache.paimon.catalog; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; @@ -47,6 +48,8 @@ public class FileSystemCatalog extends AbstractCatalog { private final Path warehouse; + private ClientPool clientPool; + public FileSystemCatalog(FileIO fileIO, Path warehouse) { super(fileIO); this.warehouse = warehouse; @@ -159,7 +162,10 @@ private SchemaManager schemaManager(Identifier identifier) { @Override public Optional<CatalogLock.LockContext> lockContext() { - return LockContextUtils.lockContext(catalogOptions, "filesystem"); + if (clientPool == null) { + this.clientPool = LockContextUtils.tryInitializeClientPool(catalogOptions); + } + return LockContextUtils.lockContext(this.clientPool, catalogOptions, "filesystem"); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java index 99716946e9d42..a588b8c9c982b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java @@ -18,6 +18,7 @@ package org.apache.paimon.catalog; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.jdbc.JdbcCatalogFactory; import org.apache.paimon.jdbc.JdbcCatalogLock; import org.apache.paimon.jdbc.JdbcClientPool; @@ -39,15 +40,14 @@ public class LockContextUtils { private static JdbcClientPool connections; public static Optional<CatalogLock.LockContext> lockContext( - Options catalogOptions, String catalogKey) { - String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE); - if (lockType == null) { + ClientPool clientPool, Options catalogOptions, String catalogKey) { + if (clientPool == null) { return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions)); } + String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE); switch (lockType) { case JdbcCatalogFactory.IDENTIFIER: - // Try init jdbc connections. - tryInitializeJdbcConnections(catalogOptions); + JdbcClientPool connections = (JdbcClientPool) clientPool; return Optional.of( new JdbcCatalogLock.JdbcLockContext( connections, catalogKey, catalogOptions)); @@ -57,20 +57,29 @@ public static Optional<CatalogLock.LockContext> lockContext( } } - private static void tryInitializeJdbcConnections(Options catalogOptions) { - if (connections == null) { - connections = - new JdbcClientPool( - catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE), - catalogOptions.get(CatalogOptions.URI.key()), - catalogOptions.toMap()); - try { - JdbcUtils.createDistributedLockTable(connections, catalogOptions); - } catch (SQLException e) { - throw new RuntimeException("Cannot initialize JDBC distributed lock.", e); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted in call to initialize", e); - } + public static ClientPool tryInitializeClientPool(Options catalogOptions) { + String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE); + if (lockType == null) { + return null; + } + switch (lockType) { + case JdbcCatalogFactory.IDENTIFIER: + JdbcClientPool connections = + new JdbcClientPool( + catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE), + catalogOptions.get(CatalogOptions.URI.key()), + catalogOptions.toMap()); + try { + JdbcUtils.createDistributedLockTable(connections, catalogOptions); + } catch (SQLException e) { + throw new RuntimeException("Cannot initialize JDBC distributed lock.", e); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted in call to initialize", e); + } + return connections; + default: + LOG.warn("Unsupported lock type:" + lockType); + return null; } }