From 81ac233372d9bc0a84d29194680e0a169def7ac9 Mon Sep 17 00:00:00 2001
From: IlyaFaer
Date: Mon, 25 Mar 2024 15:43:28 +0400
Subject: [PATCH] lint fix

---
 dlt/destinations/impl/bigquery/bigquery.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py
index ef009f9b17..a0d71d9f7d 100644
--- a/dlt/destinations/impl/bigquery/bigquery.py
+++ b/dlt/destinations/impl/bigquery/bigquery.py
@@ -11,7 +11,6 @@
 
 from dlt.common import json, logger
 from dlt.common.destination import DestinationCapabilitiesContext
-from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
 from dlt.common.destination.reference import (
     FollowupJob,
     NewLoadJob,
@@ -26,6 +25,8 @@
 from dlt.common.schema.utils import table_schema_has_type
 from dlt.common.storages.file_storage import FileStorage
 from dlt.common.typing import DictStrAny
+from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
+from dlt.destinations.sql_client import SqlClientBase
 from dlt.destinations.exceptions import (
     DestinationSchemaWillNotUpdate,
     DestinationTransientException,
@@ -225,9 +226,9 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
         insert_api = table.get("insert_api", self.config.loading_api)
         if insert_api == "streaming":
             if file_path.endswith(".jsonl"):
-                job_cls: Type[DestinationJsonlLoadJob] = DestinationJsonlLoadJob
+                job_cls = DestinationJsonlLoadJob  # type: ignore
             elif file_path.endswith(".parquet"):
-                job_cls: Type[DestinationParquetLoadJob] = DestinationParquetLoadJob
+                job_cls = DestinationParquetLoadJob  # type: ignore
             else:
                 raise ValueError(
                     f"Unsupported file type for BigQuery streaming inserts: {file_path}"
@@ -451,7 +452,9 @@ def _from_db_type(
         return self.type_mapper.from_db_type(bq_t, precision, scale)
 
 
-def _streaming_load(sql_client: BigQueryClient, items: List[Dict[Any, Any]], table: Dict[Any, Any]):
+def _streaming_load(
+    sql_client: SqlClientBase, items: List[Dict[Any, Any]], table: Dict[str, Any]
+) -> None:
     """
     Upload the given items into BigQuery table, using streaming API.
     Streaming API is used for small amounts of data, with optimal
@@ -464,7 +467,7 @@ def _streaming_load(sql_client: BigQueryClient, items: List[Dict[Any, Any]], tab
         table (Dict[Any, Any]): Table schema.
     """
 
-    def _should_retry(exc: Exception) -> bool:
+    def _should_retry(exc: api_core_exceptions.GoogleAPIError) -> bool:
         """Predicate to decide if we need to retry the exception.
 
         Args:
@@ -479,7 +482,7 @@ def _should_retry(exc: Exception) -> bool:
 
     full_name = sql_client.make_qualified_table_name(table["name"], escape=False)
 
-    bq_client = sql_client._client
+    bq_client = sql_client._client  # type: ignore
     bq_client.insert_rows_json(
         full_name,
         items,
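
Note for reviewers (not part of the diff): the newly typed `_should_retry` is a
predicate for google.api_core's Retry wrapper around the streaming insert. A
minimal sketch of that pattern follows, under the assumption that the
surrounding code wires it up roughly like this; `stream_rows` and the
`deadline` value are illustrative, not taken from this patch:

    from typing import Any, Dict, List

    from google.api_core import exceptions as api_core_exceptions
    from google.api_core import retry
    from google.cloud import bigquery
    from google.cloud.bigquery.retry import _RETRYABLE_REASONS

    def _should_retry(exc: api_core_exceptions.GoogleAPIError) -> bool:
        """Retry only if every error reason reported by the API is retryable."""
        reasons = [error["reason"] for error in getattr(exc, "errors", None) or []]
        return bool(reasons) and all(r in _RETRYABLE_REASONS for r in reasons)

    def stream_rows(client: bigquery.Client, table: str, rows: List[Dict[str, Any]]) -> None:
        # insert_rows_json hits the BigQuery streaming API; the Retry object
        # re-sends the request while _should_retry returns True.
        errors = client.insert_rows_json(
            table,
            rows,
            retry=retry.Retry(predicate=_should_retry, deadline=600),
        )
        # insert_rows_json returns a list of per-row error mappings on failure.
        if errors:
            raise RuntimeError(f"BigQuery streaming insert failed: {errors}")

Typing the predicate against GoogleAPIError (rather than bare Exception)
matches what the retry machinery actually passes in and is what satisfies the
linter here.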