Skip to content

Commit

Permalink
ADH-5241
Browse files Browse the repository at this point in the history
- implemented trino-adb plugin (for reading)
- done refactoring
  • Loading branch information
VitekArkhipov committed Nov 26, 2024
1 parent 3959401 commit 41932c6
Show file tree
Hide file tree
Showing 89 changed files with 4,547 additions and 1,683 deletions.
2 changes: 1 addition & 1 deletion core/docker/arenadata/coordinator/etc/log.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Enable verbose logging from Trino
#io.trino=DEBUG
io.trino=DEBUG
2 changes: 1 addition & 1 deletion core/docker/arenadata/worker/etc/log.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Enable verbose logging from Trino
#io.trino=DEBUG
io.trino=DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDataSize;
import io.trino.plugin.adb.connector.protocol.TransferDataProtocol;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

public class AdbPluginConfig
{
public static final String IDENTIFIER_QUOTE = "\"";
private AdbPluginConfig.ArrayMapping arrayMapping = AdbPluginConfig.ArrayMapping.DISABLED;
private int maxScanParallelism = 1;
private boolean includeSystemTables;
private DataSize writeBufferSize = DataSize.of(16L, DataSize.Unit.MEGABYTE);
private DataSize writeBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE);
private DataSize readBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE);
private final TransferDataProtocol dataProtocol = TransferDataProtocol.GPFDIST;
private Duration gpfdistRetryTimeout;

public TransferDataProtocol getDataProtocol()
{
Expand Down Expand Up @@ -81,13 +85,41 @@ public DataSize getWriteBufferSize()
}

@Config("adb.connector.write-buffer-size")
@ConfigDescription("Maximum amount of memory that could be allocated per sink when executing write queries. Defaults to 16MB")
@ConfigDescription("Maximum amount of memory that could be allocated per sink when executing write queries. Defaults to 64MB")
public AdbPluginConfig setWriteBufferSize(DataSize writeBufferSize)
{
this.writeBufferSize = writeBufferSize;
return this;
}

@MinDataSize("1kB")
@NotNull
public DataSize getReadBufferSize()
{
return readBufferSize;
}

@Config("adb.connector.read-buffer-size")
@ConfigDescription("Maximum amount of memory that could be allocated per record cursor when executing read queries. Defaults to 64MB")
public AdbPluginConfig setReadBufferSize(DataSize readBufferSize)
{
this.readBufferSize = readBufferSize;
return this;
}

public Duration getGpfdistRetryTimeout()
{
return this.gpfdistRetryTimeout;
}

@Config("adb.gpfdist.retry-timeout")
@ConfigDescription("Value of adb gpfdist_retry_timeout property. Defaults to null (use adb defaults)")
public AdbPluginConfig setGpfdistRetryTimeout(Duration gpfdistRetryTimeout)
{
this.gpfdistRetryTimeout = gpfdistRetryTimeout;
return this;
}

public static enum ArrayMapping
{
DISABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
import io.airlift.slice.Slice;
import io.trino.plugin.adb.connector.AdbSqlClient;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapper;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -131,7 +131,7 @@ public static Object[] getJdbcObjectArray(ConnectorSession session, Type element
return valuesArray;
}

public static String getArrayElementPgTypeName(ConnectorSession session, AdbSqlClient client, Type elementType)
public static String getArrayElementPgTypeName(ConnectorSession session, DataTypeMapper client, Type elementType)
{
if (DOUBLE.equals(elementType)) {
return "float8";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.configuration.ConfigBinder;
import io.trino.plugin.adb.AdbPluginConfig;
import io.trino.plugin.adb.connector.encode.EncoderModule;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapper;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapperImpl;
import io.trino.plugin.adb.connector.encode.DataFormatModule;
import io.trino.plugin.adb.connector.metadata.AdbMetadataDao;
import io.trino.plugin.adb.connector.metadata.impl.AdbMetadataDaoImpl;
import io.trino.plugin.adb.connector.protocol.TransferDataProtocol;
import io.trino.plugin.adb.connector.protocol.gpfdist.GpfdistModule;
import io.trino.plugin.adb.connector.table.AdbCreateTableStorageConfig;
import io.trino.plugin.adb.connector.table.AdbTableProperties;
import io.trino.plugin.adb.connector.table.SplitSourceManager;
import io.trino.plugin.adb.connector.table.SplitSourceManagerImpl;
import io.trino.plugin.adb.connector.table.StatisticsManager;
import io.trino.plugin.adb.connector.table.StatisticsManagerImpl;
import io.trino.plugin.jdbc.DecimalModule;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.JdbcClient;
Expand All @@ -44,8 +50,11 @@ public class AdbClientModule
@Override
protected void setup(Binder binder)
{
install(new EncoderModule());
install(new DataFormatModule());
binder.bind(AdbMetadataDao.class).to(AdbMetadataDaoImpl.class).in(Scopes.SINGLETON);
binder.bind(DataTypeMapper.class).to(DataTypeMapperImpl.class).in(Scopes.SINGLETON);
binder.bind(StatisticsManager.class).to(StatisticsManagerImpl.class).in(Scopes.SINGLETON);
binder.bind(SplitSourceManager.class).to(SplitSourceManagerImpl.class).in(Scopes.SINGLETON);
binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(AdbSqlClient.class).in(Scopes.SINGLETON);
ConfigBinder.configBinder(binder).bindConfig(AdbCreateTableStorageConfig.class);
ConfigBinder.configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.plugin.adb.AdbPluginConfig;
import io.trino.plugin.base.session.PropertyMetadataUtil;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;

import java.util.List;
import java.util.Optional;

public class AdbSessionProperties
implements SessionPropertiesProvider
Expand All @@ -33,7 +36,9 @@ public AdbSessionProperties(AdbPluginConfig config)
this.sessionProperties = ImmutableList.of(
PropertyMetadata.enumProperty("array_mapping", "Handling of PostgreSql arrays", AdbPluginConfig.ArrayMapping.class, config.getArrayMapping(), false),
PropertyMetadata.integerProperty(
"max_scan_parallelism", "Maximum degree of parallelism when scanning tables. Defaults to 1.", config.getMaxScanParallelism(), false));
"max_scan_parallelism", "Maximum degree of parallelism when scanning tables. Defaults to 1.", config.getMaxScanParallelism(), false),
PropertyMetadataUtil.durationProperty(
"gpfdist_retry_timeout", "Value of adb gpfdist_retry_timeout property", config.getGpfdistRetryTimeout(), false));
}

@Override
Expand All @@ -56,4 +61,9 @@ public static int getMaxScanParallelism(ConnectorSession session)
{
return session.getProperty("max_scan_parallelism", Integer.class);
}

public static Optional<Duration> getGpfdistRetryTimeout(ConnectorSession session)
{
return Optional.ofNullable(session.getProperty("gpfdist_retry_timeout", Duration.class));
}
}
Loading

0 comments on commit 41932c6

Please sign in to comment.