Skip to content

Commit

Permalink
Support common jdbc catalog lock for filesystem catalog.
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 25, 2024
1 parent 48aa793 commit 3bb35f9
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 2 deletions.
10 changes: 10 additions & 0 deletions docs/content/how-to/creating-catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ USE CATALOG my_catalog;

You can define any default table options with the prefix `table-default.` for tables created in the catalog.

The FileSystem catalog supports jdbc lock and can take effect through the following configuration:

> ```shell
> 'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>'
> 'jdbc.user' = '...',
> 'jdbc.password' = '...',
> ```
{{< /tab >}}
{{< tab "Spark3" >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
Expand Down Expand Up @@ -156,6 +157,11 @@ private SchemaManager schemaManager(Identifier identifier) {
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return LockContextUtils.lockContext(catalogOptions, "filesystem");
}

@Override
public void renameTableImpl(Identifier fromTable, Identifier toTable) {
Path fromPath = getDataTableLocation(fromTable);
Expand Down Expand Up @@ -187,7 +193,9 @@ private static String database(Path path) {
}

@Override
public void close() throws Exception {}
public void close() throws Exception {
LockContextUtils.close();
}

@Override
public String warehouse() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.jdbc.JdbcCatalogFactory;
import org.apache.paimon.jdbc.JdbcCatalogLock;
import org.apache.paimon.jdbc.JdbcClientPool;
import org.apache.paimon.jdbc.JdbcUtils;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.Optional;

/** Utils for {@link org.apache.paimon.catalog.CatalogLock.LockContext}. */
public class LockContextUtils {

private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalog.class);

private static JdbcClientPool connections;

public static Optional<CatalogLock.LockContext> lockContext(
Options catalogOptions, String catalogKey) {
String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
if (lockType == null) {
return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions));
}
switch (lockType) {
case JdbcCatalogFactory.IDENTIFIER:
// Try init jdbc connections.
tryInitializeJdbcConnections(catalogOptions);
return Optional.of(
new JdbcCatalogLock.JdbcLockContext(
connections, catalogKey, catalogOptions));
default:
LOG.warn("Unsupported lock type:" + lockType);
return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions));
}
}

private static void tryInitializeJdbcConnections(Options catalogOptions) {
if (connections == null) {
connections =
new JdbcClientPool(
catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE),
catalogOptions.get(CatalogOptions.URI.key()),
catalogOptions.toMap());
// Check and create distributed lock table.
try {
JdbcUtils.createDistributedLockTable(connections, catalogOptions);
} catch (SQLException e) {
throw new RuntimeException("Cannot initialize JDBC distributed lock.", e);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted in call to initialize", e);
}
}
}

public static void close() {
if (connections != null && !connections.isClosed()) {
connections.close();
connections = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public CatalogLock create(LockContext context) {
}
}

static class JdbcLockContext implements LockContext {
/** Jdbc lock context. */
public static class JdbcLockContext implements LockContext {
private final JdbcClientPool connections;
private final String catalogKey;
private final Options conf;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.fs.Path;
import org.apache.paimon.jdbc.JdbcCatalog;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import org.junit.jupiter.api.BeforeEach;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link FileSystemCatalog}. */
public class FileSystemCatalogTest extends CatalogTestBase {

@BeforeEach
public void setUp() throws Exception {
super.setUp();
catalog = initCatalog(Maps.newHashMap());
}

private FileSystemCatalog initCatalog(Map<String, String> props) {
Map<String, String> properties = Maps.newHashMap();
properties.put(
CatalogOptions.URI.key(),
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));

properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
properties.putAll(props);
FileSystemCatalog catalog =
new FileSystemCatalog(fileIO, new Path(warehouse), Options.fromMap(properties));
return catalog;
}

@Override
public void testListDatabasesWhenNoDatabases() {
List<String> databases = catalog.listDatabases();
assertThat(databases).isEqualTo(new ArrayList<>());
}
}

0 comments on commit 3bb35f9

Please sign in to comment.