diff --git a/plugin/trino-adb/pom.xml b/plugin/trino-adb/pom.xml index c91adb248c2a..72d48b81311f 100644 --- a/plugin/trino-adb/pom.xml +++ b/plugin/trino-adb/pom.xml @@ -11,6 +11,9 @@ trino-adb trino-plugin Trino - Adb connector + + 6.2.1 + @@ -34,6 +37,12 @@ + + com.zaxxer + HikariCP + ${hikaricp.version} + + io.airlift concurrent @@ -135,6 +144,10 @@ io.opentelemetry opentelemetry-context + + io.trino + trino-cache + io.trino trino-matching @@ -154,10 +167,14 @@ + + io.trino + trino-cache + + io.trino trino-matching - compile diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/AdbPluginConfig.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/AdbPluginConfig.java index 569ab27a8fc9..671d33eda305 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/AdbPluginConfig.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/AdbPluginConfig.java @@ -26,13 +26,13 @@ public class AdbPluginConfig { public static final String IDENTIFIER_QUOTE = "\""; + private final TransferDataProtocol dataProtocol = TransferDataProtocol.GPFDIST; private AdbPluginConfig.ArrayMapping arrayMapping = AdbPluginConfig.ArrayMapping.DISABLED; private int maxScanParallelism = 1; private boolean includeSystemTables; private Integer fetchSize; private DataSize writeBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE); private DataSize readBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE); - private final TransferDataProtocol dataProtocol = TransferDataProtocol.GPFDIST; private Duration gpfdistRetryTimeout; private boolean enableStringPushdownWithCollate; diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/AdbClientModule.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/AdbClientModule.java index 731d03ffe5c5..7f49a7b8c488 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/AdbClientModule.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/AdbClientModule.java @@ -20,6 +20,7 @@ import io.airlift.configuration.AbstractConfigurationAwareModule; import io.airlift.configuration.ConfigBinder; import io.trino.plugin.adb.AdbPluginConfig; +import io.trino.plugin.adb.connector.connection.AdbConnectionFactoryModule; import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapper; import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapperImpl; import io.trino.plugin.adb.connector.encode.DataFormatModule; @@ -51,6 +52,8 @@ public class AdbClientModule @Override protected void setup(Binder binder) { + install(new AdbConnectionFactoryModule()); + binder.bind(AdbMetadataDao.class).to(AdbMetadataDaoImpl.class).in(Scopes.SINGLETON); binder.bind(DataTypeMapper.class).to(DataTypeMapperImpl.class).in(Scopes.SINGLETON); binder.bind(StatisticsManager.class).to(StatisticsManagerImpl.class).in(Scopes.SINGLETON); @@ -58,6 +61,7 @@ protected void setup(Binder binder) binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(AdbSqlClient.class).in(Scopes.SINGLETON); ConfigBinder.configBinder(binder).bindConfig(AdbCreateTableStorageConfig.class); ConfigBinder.configBinder(binder).bindConfig(JdbcStatisticsConfig.class); + ConfigBinder.configBinder(binder).bindConfig(AdbPushdownConfig.class); JdbcModule.bindSessionPropertiesProvider(binder, AdbSessionProperties.class); JdbcModule.bindSessionPropertiesProvider(binder, AdbPushdownSessionProperties.class); JdbcModule.bindTablePropertiesProvider(binder, AdbTableProperties.class); diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/TypeUtil.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/TypeUtil.java similarity index 98% rename from plugin/trino-adb/src/main/java/io/trino/plugin/adb/TypeUtil.java rename to plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/TypeUtil.java index 00c55fc0c18f..72bd99f09f78 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/TypeUtil.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/TypeUtil.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.adb; +package io.trino.plugin.adb.connector; import com.google.common.primitives.Shorts; import com.google.common.primitives.SignedBytes; @@ -184,7 +184,8 @@ private static Object trinoNativeToJdbcObject(ConnectorSession session, Type tri if (trinoType instanceof DecimalType decimalType) { if (decimalType.isShort()) { BigInteger unscaledValue = BigInteger.valueOf((long) trinoNative); - return new BigDecimal(unscaledValue, decimalType.getScale(), new MathContext(decimalType.getPrecision())); + return new BigDecimal(unscaledValue, decimalType.getScale(), + new MathContext(decimalType.getPrecision())); } BigInteger unscaledValue = ((Int128) trinoNative).toBigInteger(); return new BigDecimal(unscaledValue, decimalType.getScale(), new MathContext(decimalType.getPrecision())); diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbCachedJdbcConnectionFactory.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbCachedJdbcConnectionFactory.java new file mode 100644 index 000000000000..104b9d5967ed --- /dev/null +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbCachedJdbcConnectionFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.adb.connector.connection; + +import io.airlift.log.Logger; +import io.trino.plugin.adb.connector.connection.pool.CachedDataSourceEntry; +import io.trino.plugin.adb.connector.connection.pool.JdbcDataSourcePool; +import io.trino.plugin.jdbc.ConnectionFactory; +import io.trino.spi.StandardErrorCode; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Optional; + +import static java.lang.String.format; + +public class AdbCachedJdbcConnectionFactory + implements ConnectionFactory +{ + private static final Logger log = Logger.get(AdbCachedJdbcConnectionFactory.class); + private static final int RETRY_GETTING_CONNECTION_DELAY_MS = 100; + private final AdbConnectionConfig connectionConfig; + private final JdbcDataSourcePool connectionPool; + + public AdbCachedJdbcConnectionFactory(AdbConnectionConfig connectionConfig, JdbcDataSourcePool connectionPool) + { + this.connectionConfig = connectionConfig; + this.connectionPool = connectionPool; + } + + @Override + public Connection openConnection(ConnectorSession session) + throws SQLException + { + long startTimeMs = System.currentTimeMillis(); + while ((startTimeMs + connectionConfig.getOpenConnectionTimeout().toMillis() > System.currentTimeMillis())) { + try { + CachedDataSourceEntry cachedDataSourceEntry = connectionPool.get(session); + Optional connectionOptional = cachedDataSourceEntry.tryOpenConnection(); + if (connectionOptional.isPresent()) { + return connectionOptional.get(); + } + Thread.sleep(RETRY_GETTING_CONNECTION_DELAY_MS); + } + catch (Exception e) { + String errMsg = "Failed to open connection" + e.getMessage(); + log.error(errMsg, e); + throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, errMsg, e); + } + } + throw new SQLException( + format("Failed to open connection. Timeout %s exceeded", connectionConfig.getOpenConnectionTimeout())); + } +} diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbConnectionConfig.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbConnectionConfig.java new file mode 100644 index 000000000000..099248fdc536 --- /dev/null +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbConnectionConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.adb.connector.connection; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; + +import java.util.concurrent.TimeUnit; + +public class AdbConnectionConfig +{ + private boolean isConnectionPoolEnabled = true; + private Duration openConnectionTimeout = new Duration(30, TimeUnit.SECONDS); + + public boolean getConnectionPoolEnabled() + { + return isConnectionPoolEnabled; + } + + @Config("adb.jdbc.connection.pool.enabled") + @ConfigDescription("Using connection pool. Default is true") + public AdbConnectionConfig setConnectionPoolEnabled(boolean isConnectionPoolEnabled) + { + this.isConnectionPoolEnabled = isConnectionPoolEnabled; + return this; + } + + public Duration getOpenConnectionTimeout() + { + return this.openConnectionTimeout; + } + + @Config("adb.jdbc.connection.open-timeout") + @ConfigDescription("Max time of trying to open connection to adb. Default is 30 seconds") + public AdbConnectionConfig setOpenConnectionTimeout(Duration openConnectionTimeout) + { + this.openConnectionTimeout = openConnectionTimeout; + return this; + } +} diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbConnectionFactoryModule.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbConnectionFactoryModule.java new file mode 100644 index 000000000000..4c327b8ccafb --- /dev/null +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbConnectionFactoryModule.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.adb.connector.connection; + +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.airlift.configuration.ConfigBinder; +import io.opentelemetry.api.OpenTelemetry; +import io.trino.plugin.adb.connector.connection.pool.AdbJdbcDataSourcePool; +import io.trino.plugin.adb.connector.connection.pool.AdbJdbcConnectionPoolConfig; +import io.trino.plugin.jdbc.BaseJdbcConfig; +import io.trino.plugin.jdbc.ConnectionFactory; +import io.trino.plugin.jdbc.DriverConnectionFactory; +import io.trino.plugin.jdbc.ForBaseJdbc; +import io.trino.plugin.jdbc.IdentityCacheMapping; +import io.trino.plugin.jdbc.credential.CredentialProvider; +import io.trino.spi.catalog.CatalogName; +import org.postgresql.Driver; +import org.postgresql.PGProperty; + +import java.util.Properties; + +public class AdbConnectionFactoryModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + ConfigBinder.configBinder(binder).bindConfig(AdbConnectionConfig.class); + ConfigBinder.configBinder(binder).bindConfig(AdbJdbcConnectionPoolConfig.class); + } + + @Provides + @Singleton + @ForBaseJdbc + public static ConnectionFactory getConnectionFactory(BaseJdbcConfig config, + OpenTelemetry openTelemetry, + CredentialProvider credentialProvider, + CatalogName catalogName, + IdentityCacheMapping identityCacheMapping, + AdbJdbcConnectionPoolConfig connectionPoolConfig, + AdbConnectionConfig connectionConfig) + { + Properties connectionProperties = new Properties(); + connectionProperties.put(PGProperty.REWRITE_BATCHED_INSERTS.getName(), "true"); + ConnectionFactory delegate = + DriverConnectionFactory.builder(new Driver(), config.getConnectionUrl(), credentialProvider) + .setConnectionProperties(connectionProperties) + .setOpenTelemetry(openTelemetry) + .build(); + if (connectionConfig.getConnectionPoolEnabled()) { + return new AdbCachedJdbcConnectionFactory(connectionConfig, + new AdbJdbcDataSourcePool(connectionPoolConfig, + delegate, + catalogName, + identityCacheMapping)); + } + else { + return delegate; + } + } +} diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbDataSource.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbDataSource.java new file mode 100644 index 000000000000..3bf44c465b8f --- /dev/null +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/AdbDataSource.java @@ -0,0 +1,101 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.adb.connector.connection; + +import io.trino.plugin.jdbc.ConnectionFactory; +import io.trino.spi.connector.ConnectorSession; + +import javax.sql.DataSource; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.logging.Logger; + +public class AdbDataSource + implements DataSource +{ + private final ConnectionFactory delegate; + private final ConnectorSession session; + + public AdbDataSource(ConnectionFactory delegate, ConnectorSession session) + { + this.delegate = delegate; + this.session = session; + } + + @Override + public Connection getConnection() + throws SQLException + { + return delegate.openConnection(session); + } + + @Override + public Connection getConnection(String username, String password) + throws SQLException + { + return this.getConnection(); + } + + @Override + public PrintWriter getLogWriter() + throws SQLException + { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setLogWriter(PrintWriter out) + throws SQLException + { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void setLoginTimeout(int seconds) + throws SQLException + { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public int getLoginTimeout() + throws SQLException + { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Logger getParentLogger() + throws SQLFeatureNotSupportedException + { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public T unwrap(Class iface) + throws SQLException + { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public boolean isWrapperFor(Class iface) + throws SQLException + { + return false; + } +} diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/AdbCachedDataSourceEntry.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/AdbCachedDataSourceEntry.java new file mode 100644 index 000000000000..a8c00088c99a --- /dev/null +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/AdbCachedDataSourceEntry.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.adb.connector.connection.pool; + +import com.zaxxer.hikari.HikariDataSource; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class AdbCachedDataSourceEntry + implements CachedDataSourceEntry +{ + private final Lock lock = new ReentrantLock(); + private final HikariDataSource dataSource; + + public AdbCachedDataSourceEntry(HikariDataSource dataSource) + { + this.dataSource = dataSource; + } + + @Override + public Optional tryOpenConnection() + throws SQLException + { + lock.lock(); + try { + if (!dataSource.isClosed()) { + return Optional.of(dataSource.getConnection()); + } + return Optional.empty(); + } + finally { + this.lock.unlock(); + } + } + + @Override + public boolean tryCloseConnection() + { + lock.lock(); + try { + if (this.dataSource.getHikariPoolMXBean().getActiveConnections() != 0) { + return false; + } + else { + this.dataSource.close(); + return true; + } + } + finally { + lock.unlock(); + } + } +} diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/AdbJdbcConnectionPoolConfig.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/AdbJdbcConnectionPoolConfig.java new file mode 100644 index 000000000000..d6d2d6362080 --- /dev/null +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/AdbJdbcConnectionPoolConfig.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.adb.connector.connection.pool; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; +import io.airlift.units.MinDuration; +import jakarta.validation.constraints.Min; + +import java.util.concurrent.TimeUnit; + +public class AdbJdbcConnectionPoolConfig +{ + private int maxUsers = 100; + private int maxUserConnections = Runtime.getRuntime().availableProcessors(); + private Duration userTtl = new Duration(5.0, TimeUnit.MINUTES); + private Duration connectionTtl = new Duration(5.0, TimeUnit.MINUTES); + + @Min(1L) + public int getMaxUsers() + { + return this.maxUsers; + } + + @Config("adb.jdbc.connection-pool.max-users") + @ConfigDescription("The maximum amount of users for which to cache connections. Default is 100") + public AdbJdbcConnectionPoolConfig setMaxUsers(int maxUsers) + { + this.maxUsers = maxUsers; + return this; + } + + @Min(1L) + public int getMaxUserConnections() + { + return this.maxUserConnections; + } + + @Config("adb.jdbc.connection-pool.max-user-connections") + @ConfigDescription("Maximum connections per user. Default is amount of available processors") + public AdbJdbcConnectionPoolConfig setMaxUserConnections(int maxUserConnections) + { + this.maxUserConnections = maxUserConnections; + return this; + } + + @MinDuration("1s") + public Duration getUserTtl() + { + return this.userTtl; + } + + @Config("adb.jdbc.connection-pool.user-ttl") + @ConfigDescription("For how long to keep a connection pool for a user. Default is 5 minutes") + public AdbJdbcConnectionPoolConfig setUserTtl(Duration userTtl) + { + this.userTtl = userTtl; + return this; + } + + @MinDuration("1s") + public Duration getConnectionTtl() + { + return this.connectionTtl; + } + + @Config("adb.jdbc.connection-pool.connection-ttl") + @ConfigDescription("Maximum connection TTL. Default is 5 minutes") + public AdbJdbcConnectionPoolConfig setConnectionTtl(Duration connectionTtl) + { + this.connectionTtl = connectionTtl; + return this; + } +} diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/AdbJdbcDataSourcePool.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/AdbJdbcDataSourcePool.java new file mode 100644 index 000000000000..efbc89062f05 --- /dev/null +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/AdbJdbcDataSourcePool.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.adb.connector.connection.pool; + +import com.google.common.cache.CacheBuilder; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import io.airlift.concurrent.Threads; +import io.trino.cache.NonKeyEvictableCache; +import io.trino.cache.SafeCaches; +import io.trino.plugin.adb.connector.connection.AdbDataSource; +import io.trino.plugin.jdbc.ConnectionFactory; +import io.trino.plugin.jdbc.IdentityCacheMapping; +import io.trino.spi.StandardErrorCode; +import io.trino.spi.TrinoException; +import io.trino.spi.catalog.CatalogName; +import io.trino.spi.connector.ConnectorSession; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class AdbJdbcDataSourcePool + implements JdbcDataSourcePool +{ + private static final long CLEANUP_INITIAL_DELAY_SECONDS = 10L; + private static final long CLEANUP_PERIOD_SECONDS = 10L; + private final Queue removedConnections = new ConcurrentLinkedQueue<>(); + private final AdbJdbcConnectionPoolConfig poolConfig; + private final ConnectionFactory delegate; + private final CatalogName catalogName; + private final IdentityCacheMapping identityCacheMapping; + private final NonKeyEvictableCache cache; + private final ScheduledExecutorService cleanupExecutor; + + public AdbJdbcDataSourcePool(AdbJdbcConnectionPoolConfig config, + ConnectionFactory delegate, + CatalogName catalogName, + IdentityCacheMapping identityCacheMapping) + { + this.delegate = delegate; + this.poolConfig = config; + this.catalogName = catalogName; + this.identityCacheMapping = identityCacheMapping; + cache = SafeCaches.buildNonEvictableCacheWithWeakInvalidateAll( + CacheBuilder.newBuilder() + .maximumSize(poolConfig.getMaxUsers()) + .expireAfterAccess(poolConfig.getUserTtl().toMillis(), TimeUnit.MILLISECONDS) + .removalListener(entry -> removedConnections.add(entry.getValue()))); + cleanupExecutor = Executors.newSingleThreadScheduledExecutor( + Threads.daemonThreadsNamed(String.format("adb-jdbc-connection-pool-cleanup-%s", catalogName))); + cleanupExecutor.scheduleAtFixedRate(() -> { + cache.cleanUp(); + removedConnections.removeIf(CachedDataSourceEntry::tryCloseConnection); + }, CLEANUP_INITIAL_DELAY_SECONDS, CLEANUP_PERIOD_SECONDS, TimeUnit.SECONDS); + } + + @Override + public CachedDataSourceEntry get(ConnectorSession session) + { + IdentityCacheMapping.IdentityCacheKey key = identityCacheMapping.getRemoteUserCacheKey(session); + try { + return cache.get(key, () -> { + HikariConfig config = new HikariConfig(); + config.setDataSource(new AdbDataSource(delegate, session)); + config.setRegisterMbeans(true); + config.setPoolName(catalogName.toString()); + config.setMaximumPoolSize(poolConfig.getMaxUserConnections()); + config.setMaxLifetime(poolConfig.getConnectionTtl().toMillis()); + return new AdbCachedDataSourceEntry(new HikariDataSource(config)); + }); + } + catch (Throwable e) { + throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, e.getMessage(), e); + } + } + + @Override + public void close() + throws Exception + { + try { + this.cache.invalidateAll(); + this.cleanupExecutor.shutdownNow(); + } + catch (Exception e) { + throw new RuntimeException("Failed to close adb jdbc connection pool: " + e.getMessage(), e); + } + } +} diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/CachedDataSourceEntry.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/CachedDataSourceEntry.java new file mode 100644 index 000000000000..5f973b5a19ca --- /dev/null +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/CachedDataSourceEntry.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.adb.connector.connection.pool; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Optional; + +public interface CachedDataSourceEntry +{ + Optional tryOpenConnection() + throws SQLException; + + boolean tryCloseConnection(); +} diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/JdbcDataSourcePool.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/JdbcDataSourcePool.java new file mode 100644 index 000000000000..4fe1ee5e5bc7 --- /dev/null +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/connection/pool/JdbcDataSourcePool.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.adb.connector.connection.pool; + +import io.trino.spi.connector.ConnectorSession; + +public interface JdbcDataSourcePool + extends AutoCloseable +{ + CachedDataSourceEntry get(ConnectorSession session); +} diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/datatype/mapper/DataTypeMapperImpl.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/datatype/mapper/DataTypeMapperImpl.java index 39591226765c..43e592c6db1b 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/datatype/mapper/DataTypeMapperImpl.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/datatype/mapper/DataTypeMapperImpl.java @@ -106,12 +106,12 @@ import static io.trino.plugin.adb.AdbPluginConfig.ArrayMapping.AS_ARRAY; import static io.trino.plugin.adb.AdbPluginConfig.ArrayMapping.AS_JSON; import static io.trino.plugin.adb.AdbPluginConfig.ArrayMapping.DISABLED; -import static io.trino.plugin.adb.TypeUtil.TIME_TYPE_FORMATTER; -import static io.trino.plugin.adb.TypeUtil.arrayDepth; -import static io.trino.plugin.adb.TypeUtil.getArrayElementPgTypeName; -import static io.trino.plugin.adb.TypeUtil.getJdbcObjectArray; -import static io.trino.plugin.adb.TypeUtil.toPgTimestamp; import static io.trino.plugin.adb.connector.AdbSessionProperties.getArrayMapping; +import static io.trino.plugin.adb.connector.TypeUtil.TIME_TYPE_FORMATTER; +import static io.trino.plugin.adb.connector.TypeUtil.arrayDepth; +import static io.trino.plugin.adb.connector.TypeUtil.getArrayElementPgTypeName; +import static io.trino.plugin.adb.connector.TypeUtil.getJdbcObjectArray; +import static io.trino.plugin.adb.connector.TypeUtil.toPgTimestamp; import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse; import static io.trino.plugin.base.util.JsonTypeUtil.toJsonValue; import static io.trino.plugin.jdbc.DecimalConfig.DecimalMapping.ALLOW_OVERFLOW; diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/decode/csv/CsvRowDecoder.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/decode/csv/CsvRowDecoder.java index c52bf913b6c9..d243ea253d93 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/decode/csv/CsvRowDecoder.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/decode/csv/CsvRowDecoder.java @@ -62,8 +62,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.io.BaseEncoding.base16; import static io.airlift.slice.SliceUtf8.countCodePoints; -import static io.trino.plugin.adb.TypeUtil.DATE_TYPE_FORMATTER; -import static io.trino.plugin.adb.TypeUtil.TIMESTAMP_TYPE_FORMATTER; +import static io.trino.plugin.adb.connector.TypeUtil.DATE_TYPE_FORMATTER; +import static io.trino.plugin.adb.connector.TypeUtil.TIMESTAMP_TYPE_FORMATTER; import static java.lang.String.format; public class CsvRowDecoder diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/encode/csv/CsvRowEncoder.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/encode/csv/CsvRowEncoder.java index de110c9771cf..7eae3604b251 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/encode/csv/CsvRowEncoder.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/encode/csv/CsvRowEncoder.java @@ -35,9 +35,9 @@ import java.util.List; import static com.google.common.base.Preconditions.checkArgument; -import static io.trino.plugin.adb.TypeUtil.DATE_TYPE_FORMATTER; -import static io.trino.plugin.adb.TypeUtil.TIMESTAMP_TYPE_FORMATTER; -import static io.trino.plugin.adb.TypeUtil.TIME_TYPE_FORMATTER; +import static io.trino.plugin.adb.connector.TypeUtil.DATE_TYPE_FORMATTER; +import static io.trino.plugin.adb.connector.TypeUtil.TIMESTAMP_TYPE_FORMATTER; +import static io.trino.plugin.adb.connector.TypeUtil.TIME_TYPE_FORMATTER; import static java.lang.String.format; public class CsvRowEncoder diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/GpfdistModule.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/GpfdistModule.java index e23b3b71d197..d7eb50c2649e 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/GpfdistModule.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/GpfdistModule.java @@ -28,7 +28,6 @@ import io.airlift.http.server.RequestStats; import io.airlift.node.NodeConfig; import io.airlift.node.NodeInfo; -import io.opentelemetry.api.OpenTelemetry; import io.trino.plugin.adb.connector.protocol.gpfdist.load.context.WriteContext; import io.trino.plugin.adb.connector.protocol.gpfdist.load.context.WriteContextManager; import io.trino.plugin.adb.connector.protocol.gpfdist.load.process.GpfdistPageSinkProvider; @@ -52,19 +51,11 @@ import io.trino.plugin.adb.connector.protocol.gpfdist.unload.process.GpfdistRecordSetProvider; import io.trino.plugin.adb.connector.protocol.gpfdist.unload.query.CreateWritableExternalTableQueryFactory; import io.trino.plugin.adb.connector.protocol.gpfdist.unload.query.InsertDataToExternalTableQueryFactory; -import io.trino.plugin.jdbc.BaseJdbcConfig; -import io.trino.plugin.jdbc.ConnectionFactory; -import io.trino.plugin.jdbc.DriverConnectionFactory; -import io.trino.plugin.jdbc.ForBaseJdbc; -import io.trino.plugin.jdbc.credential.CredentialProvider; import io.trino.spi.NodeManager; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorRecordSetProvider; -import org.postgresql.Driver; import org.weakref.jmx.guice.ExportBinder; -import java.util.Properties; - import static com.google.inject.multibindings.MapBinder.newMapBinder; public class GpfdistModule @@ -114,21 +105,6 @@ public static MapBinder exte return newMapBinder(binder, ExternalTableType.class, CreateExternalTableQueryFactory.class); } - @Provides - @Singleton - @ForBaseJdbc - public static ConnectionFactory getConnectionFactory(BaseJdbcConfig config, - OpenTelemetry openTelemetry, - CredentialProvider credentialProvider) - { - //todo implement cached connection pool with using hikari data source - Properties connectionProperties = new Properties(); - return DriverConnectionFactory.builder(new Driver(), config.getConnectionUrl(), credentialProvider) - .setConnectionProperties(connectionProperties) - .setOpenTelemetry(openTelemetry) - .build(); - } - @Provides @Singleton public static NodeConfig getNodeConfig(GpfdistServerConfig config, NodeManager nodeManager)