From 32b07c386eae496516d55e87fa84bde5395472cd Mon Sep 17 00:00:00 2001
From: Yuya Ebihara <ebyhry@gmail.com>
Date: Fri, 1 Nov 2024 13:33:13 +0900
Subject: [PATCH 1/4] Remove redundant code from BigQueryMetadata

---
 .../plugin/bigquery/BigQueryMetadata.java     | 30 ++-----------------
 1 file changed, 2 insertions(+), 28 deletions(-)

diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
index bb264e69a528..bc13c29b4335 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
@@ -182,7 +182,6 @@ public BigQueryMetadata(
     @Override
     public List<String> listSchemaNames(ConnectorSession session)
     {
-        log.debug("listSchemaNames(session=%s)", session);
         return listRemoteSchemaNames(session).stream()
                 .map(schema -> schema.toLowerCase(ENGLISH))
                 .collect(toImmutableList());
@@ -215,7 +214,6 @@ public boolean schemaExists(ConnectorSession session, String schemaName)
         DatasetId localDatasetId = client.toDatasetId(schemaName);
 
         // Overridden to make sure an error message is returned in case of an ambiguous schema name
-        log.debug("schemaExists(session=%s)", session);
         return client.toRemoteDataset(localDatasetId)
                 .map(RemoteDatabaseObject::getOnlyRemoteName)
                 .filter(remoteSchema -> client.getDataset(DatasetId.of(localDatasetId.getProject(), remoteSchema)) != null)
@@ -227,7 +225,6 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
     {
         BigQueryClient client = bigQueryClientFactory.create(session);
 
-        log.debug("listTables(session=%s, schemaName=%s)", session, schemaName);
         String projectId;
 
         Set<String> remoteSchemaNames;
@@ -326,7 +323,6 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
         }
 
         BigQueryClient client = bigQueryClientFactory.create(session);
-        log.debug("getTableHandle(session=%s, schemaTableName=%s)", session, schemaTableName);
         DatasetId localDatasetId = client.toDatasetId(schemaTableName.getSchemaName());
         String remoteSchemaName = client.toRemoteDataset(localDatasetId)
                 .map(RemoteDatabaseObject::getOnlyRemoteName)
@@ -375,7 +371,6 @@ private Optional<TableInfo> getTableInfoIgnoringConflicts(ConnectorSession sessi
     public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         BigQueryClient client = bigQueryClientFactory.create(session);
-        log.debug("getTableMetadata(session=%s, tableHandle=%s)", session, tableHandle);
         BigQueryTableHandle handle = (BigQueryTableHandle) tableHandle;
 
         List<ColumnMetadata> columns = client.getColumns(handle).stream()
@@ -422,24 +417,9 @@ private Optional<SystemTable> getViewDefinitionSystemTable(ConnectorSession sess
     @Override
     public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
-        BigQueryClient client = bigQueryClientFactory.create(session);
-        log.debug("getColumnHandles(session=%s, tableHandle=%s)", session, tableHandle);
-
         BigQueryTableHandle table = (BigQueryTableHandle) tableHandle;
-        if (table.projectedColumns().isPresent()) {
-            return table.projectedColumns().get().stream()
-                    .collect(toImmutableMap(columnHandle -> columnHandle.getColumnMetadata().getName(), identity()));
-        }
-
-        checkArgument(table.isNamedRelation(), "Cannot get columns for %s", tableHandle);
-
-        ImmutableList.Builder<BigQueryColumnHandle> columns = ImmutableList.builder();
-        columns.addAll(client.getColumns(table));
-        if (table.asPlainTable().getPartitionType().isPresent() && table.asPlainTable().getPartitionType().get() == INGESTION) {
-            columns.add(PARTITION_DATE.getColumnHandle());
-            columns.add(PARTITION_TIME.getColumnHandle());
-        }
-        return columns.build().stream()
+        checkArgument(table.projectedColumns().isPresent(), "Project columns must be present: %s", table);
+        return table.projectedColumns().get().stream()
                 .collect(toImmutableMap(columnHandle -> columnHandle.getColumnMetadata().getName(), identity()));
     }
 
@@ -449,7 +429,6 @@ public ColumnMetadata getColumnMetadata(
             ConnectorTableHandle tableHandle,
             ColumnHandle columnHandle)
     {
-        log.debug("getColumnMetadata(session=%s, tableHandle=%s, columnHandle=%s)", session, columnHandle, columnHandle);
         return ((BigQueryColumnHandle) columnHandle).getColumnMetadata();
     }
 
@@ -458,7 +437,6 @@ public ColumnMetadata getColumnMetadata(
     public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
     {
         BigQueryClient client = bigQueryClientFactory.create(session);
-        log.debug("listTableColumns(session=%s, prefix=%s)", session, prefix);
         List<SchemaTableName> tables = prefix.toOptionalSchemaTableName()
                 .<List<SchemaTableName>>map(ImmutableList::of)
                 .orElseGet(() -> listTables(session, prefix.getSchema()));
@@ -894,8 +872,6 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti
             List<ConnectorExpression> projections,
             Map<String, ColumnHandle> assignments)
     {
-        log.debug("applyProjection(session=%s, handle=%s, projections=%s, assignments=%s)",
-                session, handle, projections, assignments);
         BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) handle;
         if (!isProjectionPushdownEnabled(session)) {
             List<ColumnHandle> newColumns = ImmutableList.copyOf(assignments.values());
@@ -1049,8 +1025,6 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
             ConnectorTableHandle handle,
             Constraint constraint)
     {
-        log.debug("applyFilter(session=%s, handle=%s, summary=%s, predicate=%s, columns=%s)",
-                session, handle, constraint.getSummary(), constraint.predicate(), constraint.getPredicateColumns());
         BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) handle;
 
         TupleDomain<ColumnHandle> oldDomain = bigQueryTableHandle.constraint();

From 15a74e19b62af202dedf81ddc1ede357f00681f7 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara <ebyhry@gmail.com>
Date: Fri, 1 Nov 2024 14:06:14 +0900
Subject: [PATCH 2/4] Decide BigQuery API in BigQueryMetadata.getTableHandle

---
 .../plugin/bigquery/BigQueryMetadata.java     | 23 ++++++++++++++++++-
 .../bigquery/BigQueryNamedRelationHandle.java | 12 +++++++++-
 .../bigquery/BigQueryQueryRelationHandle.java |  1 +
 .../bigquery/BigQueryRelationHandle.java      |  2 ++
 .../plugin/bigquery/BigQuerySplitSource.java  | 21 +++++------------
 5 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
index bc13c29b4335..cd465005e1f4 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
@@ -112,6 +112,9 @@
 import java.util.stream.Stream;
 
 import static com.google.cloud.bigquery.StandardSQLTypeName.INT64;
+import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
+import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
+import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
 import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
@@ -131,6 +134,7 @@
 import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_DATE;
 import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_TIME;
 import static io.trino.plugin.bigquery.BigQuerySessionProperties.isProjectionPushdownEnabled;
+import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
 import static io.trino.plugin.bigquery.BigQueryTableHandle.BigQueryPartitionType.INGESTION;
 import static io.trino.plugin.bigquery.BigQueryTableHandle.getPartitionType;
 import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
@@ -348,12 +352,29 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
                 new RemoteTableName(tableInfo.get().getTableId()),
                 tableInfo.get().getDefinition().getType().toString(),
                 partitionType,
-                Optional.ofNullable(tableInfo.get().getDescription())),
+                Optional.ofNullable(tableInfo.get().getDescription()),
+                useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition().getType())),
                 TupleDomain.all(),
                 Optional.empty())
                 .withProjectedColumns(columns.build());
     }
 
+    private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition.Type type)
+    {
+        if (isWildcardTable(type, tableName)) {
+            // Storage API doesn't support reading wildcard tables
+            return false;
+        }
+        if (type == EXTERNAL) {
+            // Storage API doesn't support reading external tables
+            return false;
+        }
+        if ((type == VIEW || type == MATERIALIZED_VIEW) && isSkipViewMaterialization(session)) {
+            return false;
+        }
+        return true;
+    }
+
     private Optional<TableInfo> getTableInfoIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName)
     {
         BigQueryClient client = bigQueryClientFactory.create(session);
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryNamedRelationHandle.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryNamedRelationHandle.java
index ed368e6697f2..f72d473489a4 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryNamedRelationHandle.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryNamedRelationHandle.java
@@ -31,6 +31,7 @@ public class BigQueryNamedRelationHandle
     private final String type;
     private final Optional<BigQueryPartitionType> partitionType;
     private final Optional<String> comment;
+    private final boolean useStorageApi;
 
     @JsonCreator
     public BigQueryNamedRelationHandle(
@@ -38,13 +39,15 @@ public BigQueryNamedRelationHandle(
             @JsonProperty("remoteTableName") RemoteTableName remoteTableName,
             @JsonProperty("type") String type,
             @JsonProperty("partitionType") Optional<BigQueryPartitionType> partitionType,
-            @JsonProperty("comment") Optional<String> comment)
+            @JsonProperty("comment") Optional<String> comment,
+            @JsonProperty("useStorageApi") boolean useStorageApi)
     {
         this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
         this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
         this.type = requireNonNull(type, "type is null");
         this.partitionType = requireNonNull(partitionType, "partitionType is null");
         this.comment = requireNonNull(comment, "comment is null");
+        this.useStorageApi = useStorageApi;
     }
 
     @JsonProperty
@@ -77,6 +80,13 @@ public Optional<String> getComment()
         return comment;
     }
 
+    @JsonProperty
+    @Override
+    public boolean isUseStorageApi()
+    {
+        return useStorageApi;
+    }
+
     @Override
     public boolean equals(Object o)
     {
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryRelationHandle.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryRelationHandle.java
index e03e02c0b7b0..73b70e93de28 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryRelationHandle.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryRelationHandle.java
@@ -49,6 +49,7 @@ public RemoteTableName getDestinationTableName()
     }
 
     @JsonProperty
+    @Override
     public boolean isUseStorageApi()
     {
         return useStorageApi;
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryRelationHandle.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryRelationHandle.java
index e3976d6cb71c..879840a3fdfb 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryRelationHandle.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryRelationHandle.java
@@ -25,6 +25,8 @@
 })
 public abstract class BigQueryRelationHandle
 {
+    public abstract boolean isUseStorageApi();
+
     @Override
     public abstract String toString();
 }
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitSource.java
index 0aa69c320302..b6b97cd97ec3 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitSource.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitSource.java
@@ -42,7 +42,6 @@
 import java.util.OptionalInt;
 import java.util.concurrent.CompletableFuture;
 
-import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
 import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
 import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -51,8 +50,6 @@
 import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES;
 import static io.trino.plugin.bigquery.BigQueryClient.selectSql;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
-import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
-import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -170,11 +167,12 @@ private List<BigQuerySplit> getSplits(
                     .collect(toImmutableList());
         }
 
-        TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
+        BigQueryNamedRelationHandle namedRelation = bigQueryTableHandle.getRequiredNamedRelation();
+        TableId remoteTableId = namedRelation.getRemoteTableName().toTableId();
         TableDefinition.Type tableType = TableDefinition.Type.valueOf(bigQueryTableHandle.asPlainTable().getType());
         return emptyProjectionIsRequired(bigQueryTableHandle.projectedColumns())
                 ? createEmptyProjection(session, tableType, remoteTableId, filter)
-                : readFromBigQuery(session, tableType, remoteTableId, bigQueryTableHandle.projectedColumns(), tableConstraint);
+                : readFromBigQuery(session, tableType, remoteTableId, bigQueryTableHandle.projectedColumns(), tableConstraint, namedRelation.isUseStorageApi());
     }
 
     private static boolean emptyProjectionIsRequired(Optional<List<BigQueryColumnHandle>> projectedColumns)
@@ -187,7 +185,8 @@ private List<BigQuerySplit> readFromBigQuery(
             TableDefinition.Type type,
             TableId remoteTableId,
             Optional<List<BigQueryColumnHandle>> projectedColumns,
-            TupleDomain<ColumnHandle> tableConstraint)
+            TupleDomain<ColumnHandle> tableConstraint,
+            boolean useStorageApi)
     {
         checkArgument(projectedColumns.isPresent() && projectedColumns.get().size() > 0, "Projected column is empty");
         Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
@@ -198,18 +197,10 @@ private List<BigQuerySplit> readFromBigQuery(
         ImmutableList.Builder<BigQueryColumnHandle> projectedColumnHandles = ImmutableList.builder();
         projectedColumnHandles.addAll(columns);
 
-        if (isWildcardTable(type, remoteTableId.getTable())) {
-            // Storage API doesn't support reading wildcard tables
-            return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
-        }
-        if (type == EXTERNAL) {
-            // Storage API doesn't support reading external tables
+        if (!useStorageApi) {
             return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
         }
         if (type == VIEW || type == MATERIALIZED_VIEW) {
-            if (isSkipViewMaterialization(session)) {
-                return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
-            }
             tableConstraint.getDomains().ifPresent(domains -> domains.keySet().stream()
                     .map(BigQueryColumnHandle.class::cast)
                     .filter(column -> !projectedColumnsNames.contains(column.name()))

From 380a23e8fdbc6b6c999d37cf526fd05c603b3861 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara <ebyhry@gmail.com>
Date: Fri, 1 Nov 2024 20:21:09 +0900
Subject: [PATCH 3/4] Update TODO comment in BigQueryQueryPageSource

---
 .../java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java
index ce89b253a17d..5e63bd68b6e1 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java
@@ -197,7 +197,7 @@ else if (tableResult.hasNextPage()) {
 
     private void appendTo(Type type, FieldValue value, BlockBuilder output)
     {
-        // TODO (https://github.com/trinodb/trino/issues/12346) Add support for bignumeric and timestamp with time zone types
+        // TODO (https://github.com/trinodb/trino/issues/12346) Add support for timestamp with time zone type
         if (value == null || value.isNull()) {
             output.appendNull();
             return;

From 79a8a35eb9aeae2bfa3157f1133acf4160459f99 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara <ebyhry@gmail.com>
Date: Fri, 1 Nov 2024 20:20:43 +0900
Subject: [PATCH 4/4] Skip unsupported timestamp type in BigQuery

---
 .../trino/plugin/bigquery/BigQueryClient.java |  8 +++----
 .../plugin/bigquery/BigQueryMetadata.java     |  7 ++++---
 .../plugin/bigquery/BigQueryTypeManager.java  | 12 +++++++----
 .../io/trino/plugin/bigquery/ptf/Query.java   |  4 ++--
 .../bigquery/BaseBigQueryConnectorTest.java   | 21 +++++++++++++++++++
 5 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java
index c94f9c1e858b..ffbae1b898a6 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java
@@ -565,10 +565,10 @@ public List<BigQueryColumnHandle> getColumns(BigQueryTableHandle tableHandle)
 
         TableInfo tableInfo = getTable(tableHandle.asPlainTable().getRemoteTableName().toTableId())
                 .orElseThrow(() -> new TableNotFoundException(tableHandle.asPlainTable().getSchemaTableName()));
-        return buildColumnHandles(tableInfo);
+        return buildColumnHandles(tableInfo, tableHandle.relationHandle().isUseStorageApi());
     }
 
-    public List<BigQueryColumnHandle> buildColumnHandles(TableInfo tableInfo)
+    public List<BigQueryColumnHandle> buildColumnHandles(TableInfo tableInfo, boolean useStorageApi)
     {
         Schema schema = tableInfo.getDefinition().getSchema();
         if (schema == null) {
@@ -577,8 +577,8 @@ public List<BigQueryColumnHandle> buildColumnHandles(TableInfo tableInfo)
         }
         return schema.getFields()
                 .stream()
-                .filter(typeManager::isSupportedType)
-                .map(typeManager::toColumnHandle)
+                .filter(field -> typeManager.isSupportedType(field, useStorageApi))
+                .map(field -> typeManager.toColumnHandle(field, useStorageApi))
                 .collect(toImmutableList());
     }
 
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
index cd465005e1f4..ec6db3f137ed 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
@@ -340,8 +340,9 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
             return null;
         }
 
+        boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition().getType());
         ImmutableList.Builder<BigQueryColumnHandle> columns = ImmutableList.builder();
-        columns.addAll(client.buildColumnHandles(tableInfo.get()));
+        columns.addAll(client.buildColumnHandles(tableInfo.get(), useStorageApi));
         Optional<BigQueryPartitionType> partitionType = getPartitionType(tableInfo.get().getDefinition());
         if (partitionType.isPresent() && partitionType.get() == INGESTION) {
             columns.add(PARTITION_DATE.getColumnHandle());
@@ -353,7 +354,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
                 tableInfo.get().getDefinition().getType().toString(),
                 partitionType,
                 Optional.ofNullable(tableInfo.get().getDescription()),
-                useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition().getType())),
+                useStorageApi),
                 TupleDomain.all(),
                 Optional.empty())
                 .withProjectedColumns(columns.build());
@@ -469,7 +470,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
         return tableInfos.stream()
                 .collect(toImmutableMap(
                         table -> new SchemaTableName(table.getTableId().getDataset(), table.getTableId().getTable()),
-                        table -> client.buildColumnHandles(table).stream()
+                        table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition().getType())).stream()
                                 .map(BigQueryColumnHandle::getColumnMetadata)
                                 .collect(toImmutableList())));
     }
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java
index 5785e0ec7b20..2c90fc5edd78 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java
@@ -380,14 +380,14 @@ private Optional<ColumnMapping> convertToTrinoType(Field field)
         }
     }
 
-    public BigQueryColumnHandle toColumnHandle(Field field)
+    public BigQueryColumnHandle toColumnHandle(Field field, boolean useStorageApi)
     {
         FieldList subFields = field.getSubFields();
         List<BigQueryColumnHandle> subColumns = subFields == null ?
                 Collections.emptyList() :
                 subFields.stream()
-                        .filter(this::isSupportedType)
-                        .map(this::toColumnHandle)
+                        .filter(column -> isSupportedType(column, useStorageApi))
+                        .map(column -> toColumnHandle(column, useStorageApi))
                         .collect(Collectors.toList());
         ColumnMapping columnMapping = toTrinoType(field).orElseThrow(() -> new IllegalArgumentException("Unsupported type: " + field));
         return new BigQueryColumnHandle(
@@ -402,7 +402,7 @@ public BigQueryColumnHandle toColumnHandle(Field field)
                 false);
     }
 
-    public boolean isSupportedType(Field field)
+    public boolean isSupportedType(Field field, boolean useStorageApi)
     {
         LegacySQLTypeName type = field.getType();
         if (type == LegacySQLTypeName.BIGNUMERIC) {
@@ -414,6 +414,10 @@ public boolean isSupportedType(Field field)
                 return false;
             }
         }
+        if (!useStorageApi && type == LegacySQLTypeName.TIMESTAMP) {
+            // TODO https://github.com/trinodb/trino/issues/12346 BigQueryQueryPageSource does not support TIMESTAMP type
+            return false;
+        }
 
         return toTrinoType(field).isPresent();
     }
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ptf/Query.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ptf/Query.java
index d576a34be63f..a441f11da970 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ptf/Query.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ptf/Query.java
@@ -122,11 +122,11 @@ public TableFunctionAnalysis analyze(
 
             ImmutableList.Builder<BigQueryColumnHandle> columnsBuilder = ImmutableList.builderWithExpectedSize(schema.getFields().size());
             for (com.google.cloud.bigquery.Field field : schema.getFields()) {
-                if (!typeManager.isSupportedType(field)) {
+                if (!typeManager.isSupportedType(field, useStorageApi)) {
                     // TODO: Skip unsupported type instead of throwing an exception
                     throw new TrinoException(NOT_SUPPORTED, "Unsupported type: " + field.getType());
                 }
-                columnsBuilder.add(typeManager.toColumnHandle(field));
+                columnsBuilder.add(typeManager.toColumnHandle(field, useStorageApi));
             }
 
             Descriptor returnedType = new Descriptor(columnsBuilder.build().stream()
diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java
index e24dc811a243..21b3aecf65ce 100644
--- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java
+++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java
@@ -26,6 +26,7 @@
 import io.trino.testing.QueryRunner.MaterializedResultWithPlan;
 import io.trino.testing.TestingConnectorBehavior;
 import io.trino.testing.sql.TestTable;
+import io.trino.testing.sql.TestView;
 import org.intellij.lang.annotations.Language;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
@@ -644,6 +645,26 @@ public void testSkipUnsupportedType()
         }
     }
 
+    @Test
+    public void testSkipUnsupportedTimestampType()
+    {
+        Session skipViewMaterialization = Session.builder(getSession())
+                .setCatalogSessionProperty("bigquery", "skip_view_materialization", "true")
+                .build();
+
+        try (TestView view = new TestView(
+                bigQuerySqlExecutor,
+                "test.test_skip_unsupported_type",
+                "SELECT 1 a, TIMESTAMP '1970-01-01 00:00:00 UTC' unsupported, 2 b")) {
+            assertQuery(skipViewMaterialization, "SELECT * FROM " + view.getName(), "VALUES (1, 2)");
+            assertThat((String) computeActual(skipViewMaterialization, "SHOW CREATE TABLE " + view.getName()).getOnlyValue())
+                    .isEqualTo("CREATE TABLE bigquery." + view.getName() + " (\n" +
+                            "   a bigint,\n" +
+                            "   b bigint\n" +
+                            ")");
+        }
+    }
+
     @Test
     @Override
     public void testDateYearOfEraPredicate()