Skip unsupported timestamp type when reading views in BigQuery #24004

Merged · 4 commits · Nov 7, 2024
====================
@@ -565,10 +565,10 @@ public List<BigQueryColumnHandle> getColumns(BigQueryTableHandle tableHandle)

         TableInfo tableInfo = getTable(tableHandle.asPlainTable().getRemoteTableName().toTableId())
                 .orElseThrow(() -> new TableNotFoundException(tableHandle.asPlainTable().getSchemaTableName()));
-        return buildColumnHandles(tableInfo);
+        return buildColumnHandles(tableInfo, tableHandle.relationHandle().isUseStorageApi());
     }

-    public List<BigQueryColumnHandle> buildColumnHandles(TableInfo tableInfo)
+    public List<BigQueryColumnHandle> buildColumnHandles(TableInfo tableInfo, boolean useStorageApi)
     {
         Schema schema = tableInfo.getDefinition().getSchema();
         if (schema == null) {
@@ -577,8 +577,8 @@ public List<BigQueryColumnHandle> buildColumnHandles(TableInfo tableInfo)
         }
         return schema.getFields()
                 .stream()
-                .filter(typeManager::isSupportedType)
-                .map(typeManager::toColumnHandle)
+                .filter(field -> typeManager.isSupportedType(field, useStorageApi))
+                .map(field -> typeManager.toColumnHandle(field, useStorageApi))
                 .collect(toImmutableList());
     }
====================
@@ -112,6 +112,9 @@
 import java.util.stream.Stream;

 import static com.google.cloud.bigquery.StandardSQLTypeName.INT64;
+import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
+import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
+import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
 import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
@@ -131,6 +134,7 @@
 import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_DATE;
 import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_TIME;
 import static io.trino.plugin.bigquery.BigQuerySessionProperties.isProjectionPushdownEnabled;
+import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
 import static io.trino.plugin.bigquery.BigQueryTableHandle.BigQueryPartitionType.INGESTION;
 import static io.trino.plugin.bigquery.BigQueryTableHandle.getPartitionType;
 import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
@@ -182,7 +186,6 @@ public BigQueryMetadata(
     @Override
     public List<String> listSchemaNames(ConnectorSession session)
     {
-        log.debug("listSchemaNames(session=%s)", session);
         return listRemoteSchemaNames(session).stream()
                 .map(schema -> schema.toLowerCase(ENGLISH))
                 .collect(toImmutableList());
@@ -215,7 +218,6 @@ public boolean schemaExists(ConnectorSession session, String schemaName)
         DatasetId localDatasetId = client.toDatasetId(schemaName);

         // Overridden to make sure an error message is returned in case of an ambiguous schema name
-        log.debug("schemaExists(session=%s)", session);
         return client.toRemoteDataset(localDatasetId)
                 .map(RemoteDatabaseObject::getOnlyRemoteName)
                 .filter(remoteSchema -> client.getDataset(DatasetId.of(localDatasetId.getProject(), remoteSchema)) != null)
@@ -227,7 +229,6 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
     {
         BigQueryClient client = bigQueryClientFactory.create(session);

-        log.debug("listTables(session=%s, schemaName=%s)", session, schemaName);
         String projectId;

         Set<String> remoteSchemaNames;
@@ -326,7 +327,6 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
         }

         BigQueryClient client = bigQueryClientFactory.create(session);
-        log.debug("getTableHandle(session=%s, schemaTableName=%s)", session, schemaTableName);
         DatasetId localDatasetId = client.toDatasetId(schemaTableName.getSchemaName());
         String remoteSchemaName = client.toRemoteDataset(localDatasetId)
                 .map(RemoteDatabaseObject::getOnlyRemoteName)
@@ -340,8 +340,9 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
             return null;
         }

+        boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition().getType());
         ImmutableList.Builder<BigQueryColumnHandle> columns = ImmutableList.builder();
-        columns.addAll(client.buildColumnHandles(tableInfo.get()));
+        columns.addAll(client.buildColumnHandles(tableInfo.get(), useStorageApi));
         Optional<BigQueryPartitionType> partitionType = getPartitionType(tableInfo.get().getDefinition());
         if (partitionType.isPresent() && partitionType.get() == INGESTION) {
             columns.add(PARTITION_DATE.getColumnHandle());
@@ -352,12 +353,29 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
                         new RemoteTableName(tableInfo.get().getTableId()),
                         tableInfo.get().getDefinition().getType().toString(),
                         partitionType,
-                        Optional.ofNullable(tableInfo.get().getDescription())),
+                        Optional.ofNullable(tableInfo.get().getDescription()),
+                        useStorageApi),
                 TupleDomain.all(),
                 Optional.empty())
                 .withProjectedColumns(columns.build());
     }

+    private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition.Type type)
+    {
+        if (isWildcardTable(type, tableName)) {
+            // Storage API doesn't support reading wildcard tables
+            return false;
+        }
+        if (type == EXTERNAL) {
+            // Storage API doesn't support reading external tables
+            return false;
+        }
+        if ((type == VIEW || type == MATERIALIZED_VIEW) && isSkipViewMaterialization(session)) {
+            return false;
+        }
+        return true;
+    }
+
     private Optional<TableInfo> getTableInfoIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName)
     {
         BigQueryClient client = bigQueryClientFactory.create(session);
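The new helper above centralizes a decision that was previously scattered through the split manager: wildcard tables and external tables can never be read through the BigQuery Storage Read API, and views (plain or materialized) fall back to the query API when the skip_view_materialization session property is enabled. Computing the flag once in getTableHandle and storing it on the handle keeps column pruning and split generation consistent. A minimal, self-contained sketch of the same decision (local stand-ins, not Trino's actual classes; the wildcard check is simplified):

    // Simplified stand-in for TableDefinition.Type; not Trino's actual enum.
    enum TableType { TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL }

    final class StorageApiDecision
    {
        static boolean useStorageApi(TableType type, String tableName, boolean skipViewMaterialization)
        {
            if (tableName.endsWith("*")) {
                // Wildcard tables are only readable through the query API (simplified check)
                return false;
            }
            if (type == TableType.EXTERNAL) {
                // External tables are not readable through the Storage Read API
                return false;
            }
            if ((type == TableType.VIEW || type == TableType.MATERIALIZED_VIEW) && skipViewMaterialization) {
                // Skipping materialization means the view is read through the query API
                return false;
            }
            return true;
        }

        public static void main(String[] args)
        {
            System.out.println(useStorageApi(TableType.TABLE, "orders", false));        // true
            System.out.println(useStorageApi(TableType.VIEW, "orders_view", true));     // false
            System.out.println(useStorageApi(TableType.EXTERNAL, "gcs_export", false)); // false
        }
    }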
@@ -375,7 +393,6 @@ private Optional<TableInfo> getTableInfoIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName)
     public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         BigQueryClient client = bigQueryClientFactory.create(session);
-        log.debug("getTableMetadata(session=%s, tableHandle=%s)", session, tableHandle);
         BigQueryTableHandle handle = (BigQueryTableHandle) tableHandle;

         List<ColumnMetadata> columns = client.getColumns(handle).stream()
@@ -422,24 +439,9 @@ private Optional<SystemTable> getViewDefinitionSystemTable(ConnectorSession session, ...)
     @Override
     public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
-        BigQueryClient client = bigQueryClientFactory.create(session);
-        log.debug("getColumnHandles(session=%s, tableHandle=%s)", session, tableHandle);
-
         BigQueryTableHandle table = (BigQueryTableHandle) tableHandle;
-        if (table.projectedColumns().isPresent()) {
-            return table.projectedColumns().get().stream()
-                    .collect(toImmutableMap(columnHandle -> columnHandle.getColumnMetadata().getName(), identity()));
-        }
-
-        checkArgument(table.isNamedRelation(), "Cannot get columns for %s", tableHandle);
-
-        ImmutableList.Builder<BigQueryColumnHandle> columns = ImmutableList.builder();
-        columns.addAll(client.getColumns(table));
-        if (table.asPlainTable().getPartitionType().isPresent() && table.asPlainTable().getPartitionType().get() == INGESTION) {
-            columns.add(PARTITION_DATE.getColumnHandle());
-            columns.add(PARTITION_TIME.getColumnHandle());
-        }
-        return columns.build().stream()
+        checkArgument(table.projectedColumns().isPresent(), "Project columns must be present: %s", table);
+        return table.projectedColumns().get().stream()
                 .collect(toImmutableMap(columnHandle -> columnHandle.getColumnMetadata().getName(), identity()));
     }
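getColumnHandles no longer rebuilds the column list from live BigQuery metadata: getTableHandle always attaches the projected columns (including the ingestion-time pseudo columns) to the handle, so this method can simply index them by name. A trivial sketch of the resulting invariant, with a hypothetical ColumnHandle record rather than the Trino types:

    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    final class ColumnHandles
    {
        record ColumnHandle(String name) {}

        // Mirrors the simplified getColumnHandles: the projection is mandatory now.
        static Map<String, ColumnHandle> getColumnHandles(List<ColumnHandle> projectedColumns)
        {
            if (projectedColumns == null) {
                throw new IllegalArgumentException("Projected columns must be present");
            }
            return projectedColumns.stream()
                    .collect(Collectors.toMap(ColumnHandle::name, Function.identity()));
        }

        public static void main(String[] args)
        {
            System.out.println(getColumnHandles(List.of(new ColumnHandle("id"), new ColumnHandle("name"))));
        }
    }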

@@ -449,7 +451,6 @@ public ColumnMetadata getColumnMetadata(
             ConnectorTableHandle tableHandle,
             ColumnHandle columnHandle)
     {
-        log.debug("getColumnMetadata(session=%s, tableHandle=%s, columnHandle=%s)", session, columnHandle, columnHandle);
         return ((BigQueryColumnHandle) columnHandle).getColumnMetadata();
     }

@@ -458,7 +459,6 @@
     public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
     {
         BigQueryClient client = bigQueryClientFactory.create(session);
-        log.debug("listTableColumns(session=%s, prefix=%s)", session, prefix);
         List<SchemaTableName> tables = prefix.toOptionalSchemaTableName()
                 .<List<SchemaTableName>>map(ImmutableList::of)
                 .orElseGet(() -> listTables(session, prefix.getSchema()));
@@ -470,7 +470,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
         return tableInfos.stream()
                 .collect(toImmutableMap(
                         table -> new SchemaTableName(table.getTableId().getDataset(), table.getTableId().getTable()),
-                        table -> client.buildColumnHandles(table).stream()
+                        table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition().getType())).stream()
                                 .map(BigQueryColumnHandle::getColumnMetadata)
                                 .collect(toImmutableList())));
     }
@@ -894,8 +894,6 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjection(
             List<ConnectorExpression> projections,
             Map<String, ColumnHandle> assignments)
     {
-        log.debug("applyProjection(session=%s, handle=%s, projections=%s, assignments=%s)",
-                session, handle, projections, assignments);
         BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) handle;
         if (!isProjectionPushdownEnabled(session)) {
             List<ColumnHandle> newColumns = ImmutableList.copyOf(assignments.values());
@@ -1049,8 +1047,6 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
             ConnectorTableHandle handle,
             Constraint constraint)
     {
-        log.debug("applyFilter(session=%s, handle=%s, summary=%s, predicate=%s, columns=%s)",
-                session, handle, constraint.getSummary(), constraint.predicate(), constraint.getPredicateColumns());
         BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) handle;

         TupleDomain<ColumnHandle> oldDomain = bigQueryTableHandle.constraint();
====================
@@ -31,20 +31,23 @@ public class BigQueryNamedRelationHandle
     private final String type;
     private final Optional<BigQueryPartitionType> partitionType;
     private final Optional<String> comment;
+    private final boolean useStorageApi;

     @JsonCreator
     public BigQueryNamedRelationHandle(
             @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
             @JsonProperty("remoteTableName") RemoteTableName remoteTableName,
             @JsonProperty("type") String type,
             @JsonProperty("partitionType") Optional<BigQueryPartitionType> partitionType,
-            @JsonProperty("comment") Optional<String> comment)
+            @JsonProperty("comment") Optional<String> comment,
+            @JsonProperty("useStorageApi") boolean useStorageApi)
     {
         this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
         this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
         this.type = requireNonNull(type, "type is null");
         this.partitionType = requireNonNull(partitionType, "partitionType is null");
         this.comment = requireNonNull(comment, "comment is null");
+        this.useStorageApi = useStorageApi;
     }

     @JsonProperty
@@ -77,6 +80,13 @@ public Optional<String> getComment()
         return comment;
     }

+    @JsonProperty
+    @Override
+    public boolean isUseStorageApi()
+    {
+        return useStorageApi;
+    }
+
     @Override
     public boolean equals(Object o)
     {
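Connector handles are serialized to JSON when the coordinator ships them to workers, which is why the new flag needs @JsonProperty on both the constructor parameter and the getter. A minimal Jackson round-trip sketch (hypothetical Handle record, not the actual BigQueryNamedRelationHandle):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public final class HandleRoundTrip
    {
        // Hypothetical stand-in for BigQueryNamedRelationHandle
        public record Handle(String table, boolean useStorageApi)
        {
            @JsonCreator
            public Handle(
                    @JsonProperty("table") String table,
                    @JsonProperty("useStorageApi") boolean useStorageApi)
            {
                this.table = table;
                this.useStorageApi = useStorageApi;
            }
        }

        public static void main(String[] args) throws Exception
        {
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(new Handle("dataset.view", false));
            System.out.println(json); // e.g. {"table":"dataset.view","useStorageApi":false}
            Handle back = mapper.readValue(json, Handle.class);
            System.out.println(back.useStorageApi()); // false
        }
    }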
====================
@@ -197,7 +197,7 @@ else if (tableResult.hasNextPage()) {

     private void appendTo(Type type, FieldValue value, BlockBuilder output)
     {
-        // TODO (https://github.com/trinodb/trino/issues/12346) Add support for bignumeric and timestamp with time zone types
+        // TODO (https://github.com/trinodb/trino/issues/12346) Add support for timestamp with time zone type
         if (value == null || value.isNull()) {
             output.appendNull();
             return;
====================
@@ -49,6 +49,7 @@ public RemoteTableName getDestinationTableName()
     }

     @JsonProperty
+    @Override
     public boolean isUseStorageApi()
     {
         return useStorageApi;
====================
@@ -25,6 +25,8 @@
 })
 public abstract class BigQueryRelationHandle
 {
+    public abstract boolean isUseStorageApi();
+
     @Override
     public abstract String toString();
 }
====================
@@ -42,7 +42,6 @@
 import java.util.OptionalInt;
 import java.util.concurrent.CompletableFuture;

-import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
 import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
 import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -51,8 +50,6 @@
 import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES;
 import static io.trino.plugin.bigquery.BigQueryClient.selectSql;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
-import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
-import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -170,11 +167,12 @@ private List<BigQuerySplit> getSplits(
                     .collect(toImmutableList());
         }

-        TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
+        BigQueryNamedRelationHandle namedRelation = bigQueryTableHandle.getRequiredNamedRelation();
+        TableId remoteTableId = namedRelation.getRemoteTableName().toTableId();
         TableDefinition.Type tableType = TableDefinition.Type.valueOf(bigQueryTableHandle.asPlainTable().getType());
         return emptyProjectionIsRequired(bigQueryTableHandle.projectedColumns())
                 ? createEmptyProjection(session, tableType, remoteTableId, filter)
-                : readFromBigQuery(session, tableType, remoteTableId, bigQueryTableHandle.projectedColumns(), tableConstraint);
+                : readFromBigQuery(session, tableType, remoteTableId, bigQueryTableHandle.projectedColumns(), tableConstraint, namedRelation.isUseStorageApi());
     }

     private static boolean emptyProjectionIsRequired(Optional<List<BigQueryColumnHandle>> projectedColumns)
@@ -187,7 +185,8 @@ private List<BigQuerySplit> readFromBigQuery(
             TableDefinition.Type type,
             TableId remoteTableId,
             Optional<List<BigQueryColumnHandle>> projectedColumns,
-            TupleDomain<ColumnHandle> tableConstraint)
+            TupleDomain<ColumnHandle> tableConstraint,
+            boolean useStorageApi)
     {
         checkArgument(projectedColumns.isPresent() && projectedColumns.get().size() > 0, "Projected column is empty");
         Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
@@ -198,18 +197,10 @@ private List<BigQuerySplit> readFromBigQuery(
         ImmutableList.Builder<BigQueryColumnHandle> projectedColumnHandles = ImmutableList.builder();
         projectedColumnHandles.addAll(columns);

-        if (isWildcardTable(type, remoteTableId.getTable())) {
-            // Storage API doesn't support reading wildcard tables
-            return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
-        }
-        if (type == EXTERNAL) {
-            // Storage API doesn't support reading external tables
+        if (!useStorageApi) {
             return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
         }
         if (type == VIEW || type == MATERIALIZED_VIEW) {
-            if (isSkipViewMaterialization(session)) {
-                return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
-            }
             tableConstraint.getDomains().ifPresent(domains -> domains.keySet().stream()
                     .map(BigQueryColumnHandle.class::cast)
                     .filter(column -> !projectedColumnsNames.contains(column.name()))
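With the flag resolved once at handle-creation time, readFromBigQuery collapses the three earlier fallbacks (wildcard table, external table, skipped view) into a single check: no Storage API means a single view-stream split that reads the relation through a query. A compact sketch of the branch, using hypothetical split types rather than BigQuerySplit:

    import java.util.List;

    final class SplitDecision
    {
        sealed interface Split permits StorageApiSplit, ViewStreamSplit {}
        record StorageApiSplit(String streamName) implements Split {}
        record ViewStreamSplit(String sql) implements Split {}

        // Hypothetical sketch: the real method also handles view constraint columns
        // and parallelism; only the useStorageApi branch is shown here.
        static List<Split> readFromBigQuery(boolean useStorageApi, String table, List<String> columns)
        {
            if (!useStorageApi) {
                // Query API path: a single split that runs a SELECT over the relation
                return List.of(new ViewStreamSplit("SELECT " + String.join(", ", columns) + " FROM " + table));
            }
            // Storage Read API path: one split per server-assigned read stream
            return List.of(new StorageApiSplit(table + "/streams/0"));
        }

        public static void main(String[] args)
        {
            System.out.println(readFromBigQuery(false, "dataset.v", List.of("id", "created")));
            System.out.println(readFromBigQuery(true, "dataset.t", List.of("id", "created")));
        }
    }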
====================
@@ -380,14 +380,14 @@ private Optional<ColumnMapping> convertToTrinoType(Field field)
         }
     }

-    public BigQueryColumnHandle toColumnHandle(Field field)
+    public BigQueryColumnHandle toColumnHandle(Field field, boolean useStorageApi)
     {
         FieldList subFields = field.getSubFields();
         List<BigQueryColumnHandle> subColumns = subFields == null ?
                 Collections.emptyList() :
                 subFields.stream()
-                        .filter(this::isSupportedType)
-                        .map(this::toColumnHandle)
+                        .filter(column -> isSupportedType(column, useStorageApi))
+                        .map(column -> toColumnHandle(column, useStorageApi))
                         .collect(Collectors.toList());
         ColumnMapping columnMapping = toTrinoType(field).orElseThrow(() -> new IllegalArgumentException("Unsupported type: " + field));
         return new BigQueryColumnHandle(
@@ -402,7 +402,7 @@ public BigQueryColumnHandle toColumnHandle(Field field)
                 false);
     }

-    public boolean isSupportedType(Field field)
+    public boolean isSupportedType(Field field, boolean useStorageApi)
     {
         LegacySQLTypeName type = field.getType();
         if (type == LegacySQLTypeName.BIGNUMERIC) {
@@ -414,6 +414,10 @@ public boolean isSupportedType(Field field)
                 return false;
             }
         }
+        if (!useStorageApi && type == LegacySQLTypeName.TIMESTAMP) {
+            // TODO https://github.com/trinodb/trino/issues/12346 BigQueryQueryPageSource does not support TIMESTAMP type
+            return false;
+        }

         return toTrinoType(field).isPresent();
     }
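This new check is what the PR title describes: BigQuery's TIMESTAMP maps to Trino's timestamp with time zone, which BigQueryQueryPageSource cannot decode yet (issue #12346), so on the query-API path the column is now dropped from the schema instead of failing the whole read. A self-contained sketch of the predicate and its pruning effect (stand-in types, not the real BigQueryTypeManager):

    import java.util.List;
    import java.util.stream.Collectors;

    final class TimestampPruning
    {
        // Stand-ins for LegacySQLTypeName and Field
        enum SqlType { INT64, STRING, TIMESTAMP, BIGNUMERIC }
        record Field(String name, SqlType type) {}

        static boolean isSupportedType(Field field, boolean useStorageApi)
        {
            // BigQuery TIMESTAMP corresponds to Trino "timestamp with time zone",
            // which the query-based page source cannot decode yet (issue #12346).
            if (!useStorageApi && field.type() == SqlType.TIMESTAMP) {
                return false;
            }
            return true;
        }

        static List<String> filter(List<Field> schema, boolean useStorageApi)
        {
            return schema.stream()
                    .filter(field -> isSupportedType(field, useStorageApi))
                    .map(Field::name)
                    .collect(Collectors.toList());
        }

        public static void main(String[] args)
        {
            List<Field> schema = List.of(
                    new Field("id", SqlType.INT64),
                    new Field("created", SqlType.TIMESTAMP));

            // Storage API read (e.g., a plain table): all columns survive
            System.out.println(filter(schema, true));  // [id, created]
            // Query API read (e.g., a view with skip_view_materialization): TIMESTAMP is skipped
            System.out.println(filter(schema, false)); // [id]
        }
    }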
====================
@@ -122,11 +122,11 @@ public TableFunctionAnalysis analyze(

             ImmutableList.Builder<BigQueryColumnHandle> columnsBuilder = ImmutableList.builderWithExpectedSize(schema.getFields().size());
             for (com.google.cloud.bigquery.Field field : schema.getFields()) {
-                if (!typeManager.isSupportedType(field)) {
+                if (!typeManager.isSupportedType(field, useStorageApi)) {
                     // TODO: Skip unsupported type instead of throwing an exception
                     throw new TrinoException(NOT_SUPPORTED, "Unsupported type: " + field.getType());
                 }
-                columnsBuilder.add(typeManager.toColumnHandle(field));
+                columnsBuilder.add(typeManager.toColumnHandle(field, useStorageApi));
             }

             Descriptor returnedType = new Descriptor(columnsBuilder.build().stream()