Skip to content

Commit

Permalink
HiveCatalog supports client pool
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 27, 2024
1 parent 3237e1a commit 1a527f3
Show file tree
Hide file tree
Showing 17 changed files with 532 additions and 130 deletions.
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>catalog-key</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Custom catalog key.</td>
</tr>
<tr>
<td><h5>client-pool-cache-eviction-interval-ms</h5></td>
<td style="word-wrap: break-word;">300000</td>
<td>Long</td>
<td>Client pool cache eviction interval ms.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>catalog-key</h5></td>
<td style="word-wrap: break-word;">"jdbc"</td>
<td>String</td>
<td>Custom jdbc catalog store key.</td>
</tr>
<tr>
<td><h5>lock-key-max-length</h5></td>
<td style="word-wrap: break-word;">255</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,21 @@

package org.apache.paimon.client;

import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

import static org.apache.paimon.utils.Preconditions.checkState;

Expand All @@ -43,7 +52,8 @@ interface Action<R, C, E extends Exception> {
<R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException;

/** Default implementation for {@link ClientPool}. */
abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
abstract class ClientPoolImpl<C, E extends Exception>
implements Closeable, Serializable, ClientPool<C, E> {
private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);

private final int poolSize;
Expand Down Expand Up @@ -169,4 +179,60 @@ public boolean isClosed() {
return closed;
}
}

/**
 * Base class for a {@link ClientPool} that forwards every call to a pool supplied by {@link
 * #clientPool()}.
 *
 * <p>Subclasses supply the set-up hook ({@link #init()}), the pool lookup ({@link #clientPool()})
 * and any additional parts of the lookup key ({@link #extractKeyElement()}).
 */
abstract class CachedClientPool<C, E extends Exception, CP extends ClientPoolImpl>
        implements Closeable, Serializable, ClientPool<C, E> {

    protected static final String CONF_KEY_PREFIX = "confKey:";
    protected final long evictionInterval;
    protected final String key;
    protected final String metadata;
    private final Options options;

    /**
     * Reads the eviction interval and metastore name from {@code options}, computes the key and
     * then runs {@link #init()}.
     *
     * <p>Both {@link #extractKeyElement()} and {@link #init()} are invoked from this constructor,
     * i.e. before any subclass field has been assigned; implementations can only rely on the
     * fields of this class.
     *
     * @param options catalog options this pool is configured from
     */
    public CachedClientPool(Options options) {
        // Assigned first: extractKeyElement() implementations may read it through options().
        this.options = options;
        this.metadata = options.get(CatalogOptions.METASTORE);
        this.evictionInterval =
                options.get(CatalogOptions.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS);
        this.key = extractKey(options);
        init();
    }

    /** Returns the options this pool was created with. */
    protected Options options() {
        return options;
    }

    /** Set-up hook, called once at the end of the constructor. */
    protected abstract void init();

    /** Returns the pool that {@code run} calls are forwarded to. */
    protected abstract ClientPool<C, E> clientPool();

    @Override
    public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
        ClientPool<C, E> delegate = clientPool();
        return delegate.run(action);
    }

    @Override
    public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException {
        ClientPool<C, E> delegate = clientPool();
        return delegate.run(action, retry);
    }

    /**
     * Builds the key as {@link #CONF_KEY_PREFIX} followed by the dot-joined URI, metastore,
     * catalog key (the metastore value when no catalog key is configured) and the
     * subclass-provided elements.
     */
    private String extractKey(Options options) {
        List<Object> parts = Lists.newArrayList();
        parts.add(options.get(CatalogOptions.URI));

        String metastoreName = options.get(CatalogOptions.METASTORE);
        parts.add(metastoreName);
        parts.add(options.getOptional(CatalogOptions.CATALOG_KEY).orElse(metastoreName));

        parts.addAll(extractKeyElement());

        String joined = StringUtils.join(parts, ".");
        return CONF_KEY_PREFIX.concat(joined);
    }

    /** Returns implementation-specific elements appended to the key. */
    protected abstract List<String> extractKeyElement();

    @Override
    public void close() throws IOException {
        // Do nothing, will automatically clean up
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,16 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

/**
 * String option stored under the key {@code catalog-key}. It is declared without a default
 * value, so readers must handle the option being absent.
 */
public static final ConfigOption<String> CATALOG_KEY =
ConfigOptions.key("catalog-key")
.stringType()
.noDefaultValue()
.withDescription("Custom catalog key.");

/**
 * Long option stored under the key {@code client-pool-cache-eviction-interval-ms}, expressed in
 * milliseconds.
 */
public static final ConfigOption<Long> CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
ConfigOptions.key("client-pool-cache-eviction-interval-ms")
.longType()
// 5 minutes, i.e. 300000 ms.
.defaultValue(5 * 60 * 1000L)
.withDescription("Client pool cache eviction interval ms.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class JdbcCatalog extends AbstractCatalog {

protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options, String warehouse) {
super(fileIO, options);
this.catalogKey = catalogKey;
this.catalogKey = StringUtils.isBlank(catalogKey) ? "jdbc" : catalogKey;
this.options = options;
this.warehouse = warehouse;
Preconditions.checkNotNull(options, "Invalid catalog properties: null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

/** Factory to create {@link JdbcCatalog}. */
Expand All @@ -38,7 +39,7 @@ public String identifier() {
/**
 * Creates a {@link JdbcCatalog} for the given warehouse.
 *
 * @param fileIO file IO handed to the catalog
 * @param warehouse warehouse location; passed to the catalog as a string
 * @param context catalog context whose options configure the catalog
 * @return the new catalog
 */
@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
    Options options = context.options();
    // Read the key from the shared CatalogOptions definition. That option declares no default
    // value, so the key may be unset here.
    String catalogKey = options.get(CatalogOptions.CATALOG_KEY);
    return new JdbcCatalog(fileIO, catalogKey, options, warehouse.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@
/** Options for jdbc catalog. */
public final class JdbcCatalogOptions {

/**
 * String option stored under the key {@code catalog-key}, defaulting to {@code "jdbc"}.
 *
 * <p>NOTE(review): the surrounding hunk header (12 lines before, 6 after) suggests this
 * declaration is being removed by the commit shown here - confirm before relying on it.
 */
public static final ConfigOption<String> CATALOG_KEY =
ConfigOptions.key("catalog-key")
.stringType()
.defaultValue("jdbc")
.withDescription("Custom jdbc catalog store key.");

public static final ConfigOption<Integer> LOCK_KEY_MAX_LENGTH =
ConfigOptions.key("lock-key-max-length")
.intType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.hive;

import org.apache.paimon.client.ClientPool;
import org.apache.paimon.options.Options;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.thrift.TException;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * A {@link ClientPool} for {@link IMetaStoreClient} that obtains its {@link HiveClientPool} from a
 * static cache, so that multiple tasks share one connection pool instead of issuing excessive
 * metastore client requests.
 *
 * <p>Pools are looked up by the {@code key} computed by the superclass. A cache entry that has not
 * been accessed for the configured eviction interval expires, and the removal listener closes the
 * evicted pool.
 */
public class HiveCachedClientPool
        extends ClientPool.CachedClientPool<IMetaStoreClient, TException, HiveClientPool> {

    /**
     * Cache shared by all instances. It is built lazily by {@link #init()}; volatile so that the
     * unlocked null check in {@link #init()} observes a fully built cache.
     */
    protected static volatile Cache<String, ClientPoolImpl> clientPoolCache;

    private final SerializableHiveConf hiveConf;
    private final String clientClassName;
    private final int poolSize;

    /**
     * @param poolSize size passed to each {@link HiveClientPool} this class creates
     * @param hiveConf Hive configuration passed to each created pool
     * @param clientClassName metastore client class name passed to each created pool
     * @param options catalog options; the superclass derives the cache key and eviction interval
     *     from them
     */
    public HiveCachedClientPool(
            int poolSize, SerializableHiveConf hiveConf, String clientClassName, Options options) {
        super(options);
        this.poolSize = poolSize;
        this.hiveConf = hiveConf;
        this.clientClassName = clientClassName;
    }

    /**
     * Builds the shared cache if it does not exist yet.
     *
     * <p>The cache is static, so the lock must be class-wide: an instance-level {@code
     * synchronized} would let two instances build and publish two different caches. The cache is
     * built only once, so the eviction interval of the instance that builds it applies to every
     * later instance as well.
     *
     * <p>This method is called from the superclass constructor, before the fields of this class
     * are assigned; it therefore only reads {@code evictionInterval}.
     */
    @Override
    protected void init() {
        if (clientPoolCache != null) {
            return;
        }
        synchronized (HiveCachedClientPool.class) {
            if (clientPoolCache == null) {
                clientPoolCache = newCache(evictionInterval);
            }
        }
    }

    /**
     * Returns the cached pool for this instance's key, creating it on first use.
     *
     * @return the shared {@link HiveClientPool}
     */
    @Override
    public HiveClientPool clientPool() {
        // This class is Serializable and deserialization does not run constructors, so the static
        // cache may still be unset in a JVM where no instance has been constructed.
        init();
        return (HiveClientPool)
                clientPoolCache.get(
                        key, k -> new HiveClientPool(poolSize, hiveConf, clientClassName));
    }

    /**
     * Adds the configured metastore client class to the cache key.
     *
     * <p>Read from {@link #options()} rather than from {@code clientClassName}: the superclass
     * constructor calls this method before that field is assigned.
     */
    @Override
    protected List<String> extractKeyElement() {
        List<String> elements = Lists.newArrayList();
        elements.add(options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS));
        return elements;
    }

    /** Creates the cache: expire-after-access entries whose pools are closed on removal. */
    private static Cache<String, ClientPoolImpl> newCache(long evictionIntervalMs) {
        return Caffeine.newBuilder()
                .expireAfterAccess(evictionIntervalMs, TimeUnit.MILLISECONDS)
                .removalListener((ignored, value, cause) -> ((ClientPoolImpl) value).close())
                .scheduler(Scheduler.forScheduledExecutorService(newDaemonScheduler()))
                .build();
    }

    /** Creates the single-thread executor, backed by a daemon thread, used as cache scheduler. */
    private static ScheduledThreadPoolExecutor newDaemonScheduler() {
        ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        return new ScheduledThreadPoolExecutor(
                1,
                runnable -> {
                    Thread thread = defaultFactory.newThread(runnable);
                    thread.setDaemon(true);
                    return thread;
                });
    }
}
Loading

0 comments on commit 1a527f3

Please sign in to comment.