Decide BigQuery API in BigQueryMetadata.getTableHandle
ebyhr committed Nov 1, 2024
1 parent 32b07c3 commit 15a74e1
Showing 5 changed files with 42 additions and 17 deletions.
---------------- Changed file 1 of 5 ----------------
@@ -112,6 +112,9 @@
import java.util.stream.Stream;

import static com.google.cloud.bigquery.StandardSQLTypeName.INT64;
import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -131,6 +134,7 @@
import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_DATE;
import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_TIME;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
import static io.trino.plugin.bigquery.BigQueryTableHandle.BigQueryPartitionType.INGESTION;
import static io.trino.plugin.bigquery.BigQueryTableHandle.getPartitionType;
import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
@@ -348,12 +352,29 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
new RemoteTableName(tableInfo.get().getTableId()),
tableInfo.get().getDefinition().getType().toString(),
partitionType,
Optional.ofNullable(tableInfo.get().getDescription())),
Optional.ofNullable(tableInfo.get().getDescription()),
useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition().getType())),
TupleDomain.all(),
Optional.empty())
.withProjectedColumns(columns.build());
}

private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition.Type type)
{
if (isWildcardTable(type, tableName)) {
// Storage API doesn't support reading wildcard tables
return false;
}
if (type == EXTERNAL) {
// Storage API doesn't support reading external tables
return false;
}
if ((type == VIEW || type == MATERIALIZED_VIEW) && isSkipViewMaterialization(session)) {
return false;
}
return true;
}

private Optional<TableInfo> getTableInfoIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName)
{
BigQueryClient client = bigQueryClientFactory.create(session);
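For reference, a compilable stand-in for the new useStorageApi check above. The class name, the TableType enum, and the plain skipViewMaterialization boolean (standing in for the skip_view_materialization session property) are hypothetical, and the wildcard check is assumed here to mean a standard table whose name contains '*', which is what BigQueryUtil.isWildcardTable guards against:

import static java.util.Objects.requireNonNull;

public class StorageApiDecisionSketch
{
    // Hypothetical stand-in for com.google.cloud.bigquery.TableDefinition.Type
    enum TableType { TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL }

    // Mirrors the shape of the useStorageApi method in the diff above
    static boolean useStorageApi(String tableName, TableType type, boolean skipViewMaterialization)
    {
        requireNonNull(tableName, "tableName is null");
        if (type == TableType.TABLE && tableName.contains("*")) {
            // Storage API doesn't support reading wildcard tables
            return false;
        }
        if (type == TableType.EXTERNAL) {
            // Storage API doesn't support reading external tables
            return false;
        }
        if ((type == TableType.VIEW || type == TableType.MATERIALIZED_VIEW) && skipViewMaterialization) {
            // With view materialization skipped, views are read through a query instead
            return false;
        }
        return true;
    }

    public static void main(String[] args)
    {
        System.out.println(useStorageApi("events_2024*", TableType.TABLE, false)); // false: wildcard table
        System.out.println(useStorageApi("orders", TableType.EXTERNAL, false));    // false: external table
        System.out.println(useStorageApi("daily_view", TableType.VIEW, true));     // false: materialization skipped
        System.out.println(useStorageApi("orders", TableType.TABLE, false));       // true
    }
}

Computing the flag here means the decision is made once and carried on the handle, so split generation only reads a boolean instead of re-deriving it from the table type and session properties.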
---------------- Changed file 2 of 5 ----------------
@@ -31,20 +31,23 @@ public class BigQueryNamedRelationHandle
private final String type;
private final Optional<BigQueryPartitionType> partitionType;
private final Optional<String> comment;
private final boolean useStorageApi;

@JsonCreator
public BigQueryNamedRelationHandle(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("type") String type,
@JsonProperty("partitionType") Optional<BigQueryPartitionType> partitionType,
@JsonProperty("comment") Optional<String> comment)
@JsonProperty("comment") Optional<String> comment,
@JsonProperty("useStorageApi") boolean useStorageApi)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.type = requireNonNull(type, "type is null");
this.partitionType = requireNonNull(partitionType, "partitionType is null");
this.comment = requireNonNull(comment, "comment is null");
this.useStorageApi = useStorageApi;
}

@JsonProperty
@@ -77,6 +80,13 @@ public Optional<String> getComment()
return comment;
}

@JsonProperty
@Override
public boolean isUseStorageApi()
{
return useStorageApi;
}

@Override
public boolean equals(Object o)
{
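Trino serializes connector handles to JSON (via Jackson) when it sends them from the coordinator to workers, which is why the new field above appears both as a @JsonProperty constructor parameter and as a @JsonProperty getter. A minimal, self-contained sketch of that round trip, using a hypothetical ExampleHandle rather than the real BigQueryNamedRelationHandle:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class HandleRoundTripSketch
{
    public static class ExampleHandle
    {
        private final String tableName;
        private final boolean useStorageApi;

        @JsonCreator
        public ExampleHandle(
                @JsonProperty("tableName") String tableName,
                @JsonProperty("useStorageApi") boolean useStorageApi)
        {
            this.tableName = tableName;
            this.useStorageApi = useStorageApi;
        }

        @JsonProperty
        public String getTableName()
        {
            return tableName;
        }

        @JsonProperty
        public boolean isUseStorageApi()
        {
            return useStorageApi;
        }
    }

    public static void main(String[] args) throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(new ExampleHandle("dataset.orders", true));
        System.out.println(json); // JSON includes "useStorageApi":true
        ExampleHandle back = mapper.readValue(json, ExampleHandle.class);
        System.out.println(back.isUseStorageApi()); // true
    }
}

Without the @JsonProperty name on the constructor parameter, Jackson generally cannot map the field back when deserializing the handle on the worker side.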
---------------- Changed file 3 of 5 ----------------
@@ -49,6 +49,7 @@ public RemoteTableName getDestinationTableName()
}

@JsonProperty
@Override
public boolean isUseStorageApi()
{
return useStorageApi;
---------------- Changed file 4 of 5 ----------------
@@ -25,6 +25,8 @@
})
public abstract class BigQueryRelationHandle
{
public abstract boolean isUseStorageApi();

@Override
public abstract String toString();
}
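The abstract isUseStorageApi() above is what lets the split source read the decision without caring whether the handle is the named-relation handle or the query relation handle (both of which now override it, as shown earlier in this commit). A compressed, hypothetical illustration of that shape:

public class RelationHandleSketch
{
    // Simplified stand-ins for BigQueryRelationHandle and one of its subclasses
    abstract static class Relation
    {
        public abstract boolean isUseStorageApi();
    }

    static final class NamedRelation extends Relation
    {
        private final boolean useStorageApi;

        NamedRelation(boolean useStorageApi)
        {
            this.useStorageApi = useStorageApi;
        }

        @Override
        public boolean isUseStorageApi()
        {
            return useStorageApi;
        }
    }

    public static void main(String[] args)
    {
        Relation handle = new NamedRelation(false);
        // No instanceof checks or casts needed at the call site
        System.out.println(handle.isUseStorageApi()); // false
    }
}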
---------------- Changed file 5 of 5 ----------------
@@ -42,7 +42,6 @@
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;

import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.base.Preconditions.checkArgument;
@@ -51,8 +50,6 @@
import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES;
import static io.trino.plugin.bigquery.BigQueryClient.selectSql;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -170,11 +167,12 @@ private List<BigQuerySplit> getSplits(
.collect(toImmutableList());
}

TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
BigQueryNamedRelationHandle namedRelation = bigQueryTableHandle.getRequiredNamedRelation();
TableId remoteTableId = namedRelation.getRemoteTableName().toTableId();
TableDefinition.Type tableType = TableDefinition.Type.valueOf(bigQueryTableHandle.asPlainTable().getType());
return emptyProjectionIsRequired(bigQueryTableHandle.projectedColumns())
? createEmptyProjection(session, tableType, remoteTableId, filter)
: readFromBigQuery(session, tableType, remoteTableId, bigQueryTableHandle.projectedColumns(), tableConstraint);
: readFromBigQuery(session, tableType, remoteTableId, bigQueryTableHandle.projectedColumns(), tableConstraint, namedRelation.isUseStorageApi());
}

private static boolean emptyProjectionIsRequired(Optional<List<BigQueryColumnHandle>> projectedColumns)
@@ -187,7 +185,8 @@ private List<BigQuerySplit> readFromBigQuery(
TableDefinition.Type type,
TableId remoteTableId,
Optional<List<BigQueryColumnHandle>> projectedColumns,
TupleDomain<ColumnHandle> tableConstraint)
TupleDomain<ColumnHandle> tableConstraint,
boolean useStorageApi)
{
checkArgument(projectedColumns.isPresent() && projectedColumns.get().size() > 0, "Projected column is empty");
Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
@@ -198,18 +197,10 @@
ImmutableList.Builder<BigQueryColumnHandle> projectedColumnHandles = ImmutableList.builder();
projectedColumnHandles.addAll(columns);

if (isWildcardTable(type, remoteTableId.getTable())) {
// Storage API doesn't support reading wildcard tables
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
}
if (type == EXTERNAL) {
// Storage API doesn't support reading external tables
if (!useStorageApi) {
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
}
if (type == VIEW || type == MATERIALIZED_VIEW) {
if (isSkipViewMaterialization(session)) {
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
}
tableConstraint.getDomains().ifPresent(domains -> domains.keySet().stream()
.map(BigQueryColumnHandle.class::cast)
.filter(column -> !projectedColumnsNames.contains(column.name()))
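With the flag carried on the handle, the readFromBigQuery change above collapses the former wildcard, external-table, and skip-view-materialization branches into a single check. A hypothetical, standalone sketch of that consumer side (the Split record and the split kinds are illustrative, not Trino's types):

import java.util.List;
import java.util.Optional;

public class SplitPlanningSketch
{
    // Illustrative stand-ins for the two split flavors in the diff: a "view stream"
    // split that reads through a query, and Storage Read API splits.
    record Split(String kind, Optional<String> filter) {}

    static List<Split> planSplits(boolean useStorageApi, Optional<String> filter)
    {
        if (!useStorageApi) {
            // Analogous to returning BigQuerySplit.forViewStream(columns, filter):
            // a single split that reads via a query instead of storage streams
            return List.of(new Split("view-stream", filter));
        }
        // The real code continues here to build Storage Read API read streams
        return List.of(new Split("storage-api", filter));
    }

    public static void main(String[] args)
    {
        System.out.println(planSplits(false, Optional.of("id > 10"))); // one view-stream split
        System.out.println(planSplits(true, Optional.empty()));        // storage-api splits
    }
}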
