Skip to content

Commit

Permalink
ADH-5231
Browse files Browse the repository at this point in the history
- implemented cached adb connection factory using hikariCP
  • Loading branch information
VitekArkhipov committed Nov 29, 2024
1 parent e81f023 commit 75f6f18
Show file tree
Hide file tree
Showing 17 changed files with 636 additions and 38 deletions.
19 changes: 18 additions & 1 deletion plugin/trino-adb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
<artifactId>trino-adb</artifactId>
<packaging>trino-plugin</packaging>
<description>Trino - Adb connector</description>
<properties>
<hikaricp.version>6.2.1</hikaricp.version>
</properties>

<dependencies>
<dependency>
Expand All @@ -34,6 +37,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
Expand Down Expand Up @@ -135,6 +144,10 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</exclusion>
<exclusion>
<groupId>io.trino</groupId>
<artifactId>trino-cache</artifactId>
</exclusion>
<exclusion>
<groupId>io.trino</groupId>
<artifactId>trino-matching</artifactId>
Expand All @@ -154,10 +167,14 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-cache</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-matching</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
public class AdbPluginConfig
{
public static final String IDENTIFIER_QUOTE = "\"";
private final TransferDataProtocol dataProtocol = TransferDataProtocol.GPFDIST;
private AdbPluginConfig.ArrayMapping arrayMapping = AdbPluginConfig.ArrayMapping.DISABLED;
private int maxScanParallelism = 1;
private boolean includeSystemTables;
private Integer fetchSize;
private DataSize writeBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE);
private DataSize readBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE);
private final TransferDataProtocol dataProtocol = TransferDataProtocol.GPFDIST;
private Duration gpfdistRetryTimeout;
private boolean enableStringPushdownWithCollate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.configuration.ConfigBinder;
import io.trino.plugin.adb.AdbPluginConfig;
import io.trino.plugin.adb.connector.connection.AdbConnectionFactoryModule;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapper;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapperImpl;
import io.trino.plugin.adb.connector.encode.DataFormatModule;
Expand Down Expand Up @@ -51,13 +52,16 @@ public class AdbClientModule
@Override
protected void setup(Binder binder)
{
install(new AdbConnectionFactoryModule());

binder.bind(AdbMetadataDao.class).to(AdbMetadataDaoImpl.class).in(Scopes.SINGLETON);
binder.bind(DataTypeMapper.class).to(DataTypeMapperImpl.class).in(Scopes.SINGLETON);
binder.bind(StatisticsManager.class).to(StatisticsManagerImpl.class).in(Scopes.SINGLETON);
binder.bind(SplitSourceManager.class).to(SplitSourceManagerImpl.class).in(Scopes.SINGLETON);
binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(AdbSqlClient.class).in(Scopes.SINGLETON);
ConfigBinder.configBinder(binder).bindConfig(AdbCreateTableStorageConfig.class);
ConfigBinder.configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
ConfigBinder.configBinder(binder).bindConfig(AdbPushdownConfig.class);
JdbcModule.bindSessionPropertiesProvider(binder, AdbSessionProperties.class);
JdbcModule.bindSessionPropertiesProvider(binder, AdbPushdownSessionProperties.class);
JdbcModule.bindTablePropertiesProvider(binder, AdbTableProperties.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.adb;
package io.trino.plugin.adb.connector;

import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
Expand Down Expand Up @@ -184,7 +184,8 @@ private static Object trinoNativeToJdbcObject(ConnectorSession session, Type tri
if (trinoType instanceof DecimalType decimalType) {
if (decimalType.isShort()) {
BigInteger unscaledValue = BigInteger.valueOf((long) trinoNative);
return new BigDecimal(unscaledValue, decimalType.getScale(), new MathContext(decimalType.getPrecision()));
return new BigDecimal(unscaledValue, decimalType.getScale(),
new MathContext(decimalType.getPrecision()));
}
BigInteger unscaledValue = ((Int128) trinoNative).toBigInteger();
return new BigDecimal(unscaledValue, decimalType.getScale(), new MathContext(decimalType.getPrecision()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.adb.connector.connection;

import io.airlift.log.Logger;
import io.trino.plugin.adb.connector.connection.pool.CachedDataSourceEntry;
import io.trino.plugin.adb.connector.connection.pool.JdbcDataSourcePool;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Optional;

import static java.lang.String.format;

public class AdbCachedJdbcConnectionFactory
implements ConnectionFactory
{
private static final Logger log = Logger.get(AdbCachedJdbcConnectionFactory.class);
private static final int RETRY_GETTING_CONNECTION_DELAY_MS = 100;
private final AdbConnectionConfig connectionConfig;
private final JdbcDataSourcePool connectionPool;

public AdbCachedJdbcConnectionFactory(AdbConnectionConfig connectionConfig, JdbcDataSourcePool connectionPool)
{
this.connectionConfig = connectionConfig;
this.connectionPool = connectionPool;
}

@Override
public Connection openConnection(ConnectorSession session)
throws SQLException
{
long startTimeMs = System.currentTimeMillis();
while ((startTimeMs + connectionConfig.getOpenConnectionTimeout().toMillis() > System.currentTimeMillis())) {
try {
CachedDataSourceEntry cachedDataSourceEntry = connectionPool.get(session);
Optional<Connection> connectionOptional = cachedDataSourceEntry.tryOpenConnection();
if (connectionOptional.isPresent()) {
return connectionOptional.get();
}
Thread.sleep(RETRY_GETTING_CONNECTION_DELAY_MS);
}
catch (Exception e) {
String errMsg = "Failed to open connection" + e.getMessage();
log.error(errMsg, e);
throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, errMsg, e);
}
}
throw new SQLException(
format("Failed to open connection. Timeout %s exceeded", connectionConfig.getOpenConnectionTimeout()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.adb.connector.connection;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;

import java.util.concurrent.TimeUnit;

public class AdbConnectionConfig
{
private boolean isConnectionPoolEnabled = true;
private Duration openConnectionTimeout = new Duration(30, TimeUnit.SECONDS);

public boolean getConnectionPoolEnabled()
{
return isConnectionPoolEnabled;
}

@Config("adb.jdbc.connection.pool.enabled")
@ConfigDescription("Using connection pool. Default is true")
public AdbConnectionConfig setConnectionPoolEnabled(boolean isConnectionPoolEnabled)
{
this.isConnectionPoolEnabled = isConnectionPoolEnabled;
return this;
}

public Duration getOpenConnectionTimeout()
{
return this.openConnectionTimeout;
}

@Config("adb.jdbc.connection.open-timeout")
@ConfigDescription("Max time of trying to open connection to adb. Default is 30 seconds")
public AdbConnectionConfig setOpenConnectionTimeout(Duration openConnectionTimeout)
{
this.openConnectionTimeout = openConnectionTimeout;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.adb.connector.connection;

import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.configuration.ConfigBinder;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.adb.connector.connection.pool.AdbJdbcDataSourcePool;
import io.trino.plugin.adb.connector.connection.pool.AdbJdbcConnectionPoolConfig;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.IdentityCacheMapping;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.spi.catalog.CatalogName;
import org.postgresql.Driver;
import org.postgresql.PGProperty;

import java.util.Properties;

public class AdbConnectionFactoryModule
extends AbstractConfigurationAwareModule
{
@Override
protected void setup(Binder binder)
{
ConfigBinder.configBinder(binder).bindConfig(AdbConnectionConfig.class);
ConfigBinder.configBinder(binder).bindConfig(AdbJdbcConnectionPoolConfig.class);
}

@Provides
@Singleton
@ForBaseJdbc
public static ConnectionFactory getConnectionFactory(BaseJdbcConfig config,
OpenTelemetry openTelemetry,
CredentialProvider credentialProvider,
CatalogName catalogName,
IdentityCacheMapping identityCacheMapping,
AdbJdbcConnectionPoolConfig connectionPoolConfig,
AdbConnectionConfig connectionConfig)
{
Properties connectionProperties = new Properties();
connectionProperties.put(PGProperty.REWRITE_BATCHED_INSERTS.getName(), "true");
ConnectionFactory delegate =
DriverConnectionFactory.builder(new Driver(), config.getConnectionUrl(), credentialProvider)
.setConnectionProperties(connectionProperties)
.setOpenTelemetry(openTelemetry)
.build();
if (connectionConfig.getConnectionPoolEnabled()) {
return new AdbCachedJdbcConnectionFactory(connectionConfig,
new AdbJdbcDataSourcePool(connectionPoolConfig,
delegate,
catalogName,
identityCacheMapping));
}
else {
return delegate;
}
}
}
Loading

0 comments on commit 75f6f18

Please sign in to comment.