Skip to content

Commit

Permalink
ADH-5240
Browse files Browse the repository at this point in the history
- fixed comments
  • Loading branch information
VitekArkhipov committed Dec 4, 2024
1 parent df75758 commit ce438d4
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 165 deletions.
10 changes: 10 additions & 0 deletions plugin/trino-adb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
</exclusion>
<exclusion>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
</exclusion>
<exclusion>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
Expand Down Expand Up @@ -233,5 +237,11 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ public class AdbSqlClient
extends BaseJdbcClient
{
private static final Logger log = Logger.get(AdbSqlClient.class);
private static final String DUPLICATE_TABLE_SQLSTATE = "42P07";
private final AdbMetadataDao metadata;
private final List<String> tableTypes;
private final Optional<Integer> fetchSize;
private final AdbPluginConfig pluginConfig;
private final Integer fetchSize;
private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
private final AggregateFunctionRewriter<JdbcExpression, ?> aggregateFunctionRewriter;
private final DataTypeMapper dataTypeMapper;
Expand Down Expand Up @@ -162,8 +162,7 @@ public AdbSqlClient(ConnectionFactory connectionFactory,
}
this.tableTypes = tableTypes.build();
this.metadata = metadata;
this.fetchSize = Optional.ofNullable(config.getFetchSize());
this.pluginConfig = config;
this.fetchSize = config.getFetchSize();
connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder()
.addStandardRules(this::quoted)
.withTypeClass("integer_type", ImmutableSet.of("tinyint", "smallint", "integer", "bigint"))
Expand Down Expand Up @@ -283,7 +282,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
createTable(session, tableMetadata, tableMetadata.getTable().getTableName());
}
catch (SQLException e) {
boolean exists = "42P07".equals(e.getSQLState());
boolean exists = DUPLICATE_TABLE_SQLSTATE.equals(e.getSQLState());
throw new TrinoException((exists ? StandardErrorCode.ALREADY_EXISTS : JdbcErrorCode.JDBC_ERROR), e);
}
}
Expand Down Expand Up @@ -354,6 +353,7 @@ protected List<String> createTableSqls(RemoteTableName remoteTableName,
List<String> columns,
ConnectorTableMetadata tableMetadata)
{
//this method id override super method, because method of the super class can throw incorrect exception about nonsupporting creating table with comment
throw new UnsupportedOperationException("Unsupported");
}

Expand Down Expand Up @@ -504,7 +504,7 @@ public PreparedStatement getPreparedStatement(Connection connection, String sql,
PreparedStatement statement = connection.prepareStatement(sql);
// This is a heuristic, not exact science. A better formula can perhaps be found with measurements.
// Column count is not known for non-SELECT queries. Not setting fetch size for these.
Optional<Integer> fetchSize = Optional.ofNullable(this.fetchSize.orElseGet(() ->
Optional<Integer> fetchSize = Optional.ofNullable(Optional.ofNullable(this.fetchSize).orElseGet(() ->
columnCount.map(count -> max(100_000 / count, 1_000))
.orElse(null)));
if (fetchSize.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public abstract class AbstractRowEncoder
implements RowEncoder
{
private static final String UNSUPPORTED_TYPE_ERROR_MSG_TEMPLATE = "Unsupported type '%s' for column '%s'";
private static final long PICO_SECONDS_PER_DAY = 86400000000000000L;
protected final ConnectorSession session;
protected final List<ColumnDataType> columnDataTypes;
private final Map<ConnectorDataType, BiConsumer<AbstractRowEncoder, EncoderMetadata>> map;
Expand Down Expand Up @@ -179,7 +180,7 @@ private static void encodeTime(AbstractRowEncoder encoder, EncoderMetadata pageB
int precision = ((TimeDataType) pageBlock.columnDataType()).getPrecision();
long picosOfDay = TimeType.createTimeType(precision).getLong(pageBlock.block(), pageBlock.position());
picosOfDay = Timestamps.round(picosOfDay, 12 - precision);
if (picosOfDay == 86400000000000000L) {
if (picosOfDay == PICO_SECONDS_PER_DAY) {
picosOfDay = 0L;
}
LocalTime localTime = LocalTime.ofNanoOfDay(picosOfDay / 1000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public int getSegmentCount(ConnectorSession session)
try {
int segmentCount = 0;
try (Connection connection = this.connectionFactory.openConnection(session);
ResultSet rs = connection.createStatement().executeQuery("SELECT MAX(content) + 1 FROM pg_catalog.gp_segment_configuration")) {
ResultSet rs = connection.createStatement()
.executeQuery("SELECT MAX(content) + 1 FROM pg_catalog.gp_segment_configuration")) {
if (!rs.next()) {
return segmentCount;
}
Expand Down Expand Up @@ -87,27 +88,27 @@ public boolean isSegmentedTable(ConnectorSession session, String objectName)
return false;
}
String distribution = rs.getString(1).trim();
isDistributed = distribution.startsWith("DISTRIBUTED BY") || distribution.equals("DISTRIBUTED RANDOMLY");
isDistributed =
distribution.startsWith("DISTRIBUTED BY") || distribution.equals("DISTRIBUTED RANDOMLY");
}
return isDistributed;
}
catch (SQLException e) {
throw new TrinoException(JdbcErrorCode.JDBC_ERROR, "Failed to determine whether the table contains the segment ID column.", e);
throw new TrinoException(JdbcErrorCode.JDBC_ERROR,
"Failed to determine whether the table contains the segment ID column.", e);
}
}

@Override
public Map<String, Object> getTableProperties(ConnectorSession session, String objectName, IdentifierMapping identifierMapping)
public Map<String, Object> getTableProperties(ConnectorSession session, String objectName,
IdentifierMapping identifierMapping)
{
String sql = String.format(
"WITH oid AS (SELECT '%s'::regclass::oid oid)\n" +
"SELECT pg_catalog.pg_get_table_distributedby(oid) FROM oid\n" +
"UNION ALL\n" +
"SELECT UNNEST(reloptions) FROM pg_catalog.pg_class\n" +
"WHERE oid IN (SELECT oid FROM oid)",
objectName);
String sql = "WITH oid AS (SELECT '" + objectName + "'::regclass::oid oid)\n" +
"SELECT pg_catalog.pg_get_table_distributedby(oid) FROM oid\n" +
"UNION ALL\n" +
"SELECT UNNEST(reloptions) FROM pg_catalog.pg_class\n" +
"WHERE oid IN (SELECT oid FROM oid)";
List<String> rows = new ArrayList<>();

try (Connection connection = this.connectionFactory.openConnection(session);
ResultSet rs = connection.createStatement().executeQuery(sql)) {
while (rs.next()) {
Expand All @@ -124,20 +125,18 @@ public Map<String, Object> getTableProperties(ConnectorSession session, String o
if (rows.isEmpty()) {
return Map.of();
}
else {
Map<String, Object> res = new HashMap<>(rows.size());
Map<String, Object> res = new HashMap<>(rows.size());

for (String property : rows) {
try {
this.parseProperty(property, res, identifierMapping);
}
catch (Exception e) {
log.warn(e, "Failed to parse table property: " + property);
}
for (String property : rows) {
try {
this.parseProperty(property, res, identifierMapping);
}
catch (Exception e) {
log.warn(e, "Failed to parse table property: " + property);
}

return res;
}

return res;
}

private void parseProperty(String property, Map<String, Object> res, IdentifierMapping identifierMapping)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,20 @@ public class GpfdistModule
public void setup(Binder binder)
{
install(new GpfdistServerModule());
binder.bind(ExternalTableFormatConfigFactory.class).to(ExternalTableFormatConfigFactoryImpl.class).in(Scopes.SINGLETON);

Multibinder<CreateExternalTableQueryFactory> createExtTableQueryFactories = Multibinder.newSetBinder(binder, CreateExternalTableQueryFactory.class);
Multibinder<InsertDataQueryFactory> insertDataQueryFactories = Multibinder.newSetBinder(binder, InsertDataQueryFactory.class);

createExtTableQueryFactories.addBinding().to(CreateReadableExternalTableQueryFactory.class).in(Scopes.SINGLETON);
createExtTableQueryFactories.addBinding().to(CreateWritableExternalTableQueryFactory.class).in(Scopes.SINGLETON);
Multibinder<InsertDataQueryFactory> insertDataQueryFactories = Multibinder.newSetBinder(binder, InsertDataQueryFactory.class);
insertDataQueryFactories.addBinding().to(InsertDataFromExternalTableQueryFactory.class).in(Scopes.SINGLETON);
insertDataQueryFactories.addBinding().to(InsertDataToExternalTableQueryFactory.class).in(Scopes.SINGLETON);

binder.bind(ExternalTableFormatConfigFactory.class).to(ExternalTableFormatConfigFactoryImpl.class).in(Scopes.SINGLETON);

OptionalBinder.newOptionalBinder(binder, ConnectorPageSinkProvider.class).setBinding().to(GpfdistPageSinkProvider.class).in(Scopes.SINGLETON);
OptionalBinder.newOptionalBinder(binder, ConnectorRecordSetProvider.class).setBinding().to(GpfdistRecordSetProvider.class).in(Scopes.SINGLETON);

binder.bind(GpfdistLoadMetadataFactory.class).to(GpfdistLoadMetadataFactoryImpl.class).in(Scopes.SINGLETON);
binder.bind(GpfdistUnloadMetadataFactory.class).to(GpfdistUnloadMetadataFactoryImpl.class).in(Scopes.SINGLETON);
binder.bind(GpfdistLocationFactory.class).to(GpfdistLocationFactoryImpl.class).in(Scopes.SINGLETON);
Expand All @@ -93,6 +98,7 @@ public void setup(Binder binder)
binder.bind(NodeInfo.class).in(Scopes.SINGLETON);
binder.bind(HttpServerInfo.class).in(Scopes.SINGLETON);
binder.bind(RequestStats.class).in(Scopes.SINGLETON);

ExportBinder.newExporter(binder)
.export(RequestStats.class)
.as(generator -> generator.generatedNameOf(RequestStats.class, "adb-gpfdist-server-request-stats"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.adb.connector.protocol.gpfdist.load.process;

import com.google.common.annotations.VisibleForTesting;
import io.trino.plugin.adb.connector.protocol.gpfdist.load.PageProcessor;
import io.trino.plugin.adb.connector.protocol.gpfdist.load.PageProcessorProvider;

Expand All @@ -21,6 +22,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import static java.lang.String.format;

Expand Down Expand Up @@ -59,7 +61,7 @@ public PageProcessor take()
long startTime = System.currentTimeMillis();
while (pageProcessors.isEmpty()) {
try {
if (System.currentTimeMillis() - startTime > ADB_SEGMENT_WAIT_TIMEOUT) {
if (currentTimeMsProvider().get() - startTime > ADB_SEGMENT_WAIT_TIMEOUT) {
throw new RuntimeException(
format("Timeout :%d ms waiting for segments responses is exceeded",
ADB_SEGMENT_WAIT_TIMEOUT));
Expand All @@ -81,6 +83,11 @@ public PageProcessor take()
}
}

public static Supplier<Long> currentTimeMsProvider()
{
return System::currentTimeMillis;
}

@Override
public Queue<PageProcessor> getAll()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public GpfdistPageSinkProvider(@ForBaseJdbc JdbcClient client,
AdbPluginConfig pluginConfig,
GpfdistLoadMetadataFactory loadMetadataFactory,
ContextManager<WriteContext> contextManager,
RowEncoderFactory rowEncoderFactory, ExternalTableFormatConfigFactory externalTableFormatConfigFactory,
RowEncoderFactory rowEncoderFactory,
ExternalTableFormatConfigFactory externalTableFormatConfigFactory,
Set<CreateExternalTableQueryFactory> createExternalTableQueryFactories,
Set<InsertDataQueryFactory> insertDataQueryFactories)
{
Expand Down Expand Up @@ -100,7 +101,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
ConnectorOutputTableHandle outputTableHandle,
ConnectorPageSinkId pageSinkId)
{
return createPageSinkInternal(transactionHandle, session, outputTableHandle, pageSinkId);
return createPageSinkInternal(session, outputTableHandle, pageSinkId);
}

@Override
Expand All @@ -109,12 +110,11 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
ConnectorInsertTableHandle insertTableHandle,
ConnectorPageSinkId pageSinkId)
{
return createPageSinkInternal(transactionHandle, session, (ConnectorOutputTableHandle) insertTableHandle,
return createPageSinkInternal(session, (ConnectorOutputTableHandle) insertTableHandle,
pageSinkId);
}

private ConnectorPageSink createPageSinkInternal(ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
private ConnectorPageSink createPageSinkInternal(ConnectorSession session,
ConnectorOutputTableHandle outputTableHandle,
ConnectorPageSinkId pageSinkId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,11 @@ public void post(@PathParam("tableName") String tableName, InputStream data, @Co
if (initialRequest(request)) {
processInitialRequest(asyncResponse, readContext, request);
}
else if (!isLast(request)) {
processDataRequest(data, asyncResponse, readContext, request);
}
else {
if (!isLast(request)) {
processDataRequest(data, asyncResponse, readContext, request);
}
else {
processTearDownRequest(asyncResponse, readContext, request);
}
processTearDownRequest(asyncResponse, readContext, request);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,91 +126,11 @@ private static String createRequestId(String tableName)
return tableName + "_" + UUID.randomUUID();
}

public String getRequestId()
{
return requestId;
}

public String getTransactionId()
{
return transactionId;
}

public String getCommandId()
{
return commandId;
}

public String getScanId()
{
return scanId;
}

public Optional<Integer> getSegmentId()
{
return segmentId;
}

public Optional<Integer> getSegmentsCount()
{
return segmentsCount;
}

public Optional<Integer> getLineDelimiterLength()
{
return lineDelimiterLength;
}

public short getGpProtocol()
{
return gpProtocol;
}

public Optional<String> getGpMasterHost()
{
return gpMasterHost;
}

public Optional<Integer> getGpMasterPort()
{
return gpMasterPort;
}

public Optional<String> getGpcsvFormat()
{
return gpcsvFormat;
}

public Optional<String> getGpSegmentConfigPath()
{
return gpSegmentConfigPath;
}

public Optional<String> getGpSegmentDataDirectory()
{
return gpSegmentDataDirectory;
}

public Optional<String> getGpDatabase()
{
return gpDatabase;
}

public Optional<String> getGpUser()
{
return gpUser;
}

public Optional<Integer> getGpSegmentPort()
{
return gpSegmentPort;
}

public Optional<Integer> getGpSessionId()
{
return gpSessionId;
}

@Override
public String toString()
{
Expand Down
Loading

0 comments on commit ce438d4

Please sign in to comment.