lint fix
IlyaFaer committed Mar 25, 2024
1 parent c692113 commit 81ac233
Showing 1 changed file with 9 additions and 6 deletions.
dlt/destinations/impl/bigquery/bigquery.py
@@ -11,7 +11,6 @@
 
 from dlt.common import json, logger
 from dlt.common.destination import DestinationCapabilitiesContext
-from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
 from dlt.common.destination.reference import (
     FollowupJob,
     NewLoadJob,
@@ -26,6 +25,8 @@
 from dlt.common.schema.utils import table_schema_has_type
 from dlt.common.storages.file_storage import FileStorage
 from dlt.common.typing import DictStrAny
+from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
+from dlt.destinations.sql_client import SqlClientBase
 from dlt.destinations.exceptions import (
     DestinationSchemaWillNotUpdate,
     DestinationTransientException,
@@ -225,9 +226,9 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
         insert_api = table.get("insert_api", self.config.loading_api)
         if insert_api == "streaming":
             if file_path.endswith(".jsonl"):
-                job_cls: Type[DestinationJsonlLoadJob] = DestinationJsonlLoadJob
+                job_cls = DestinationJsonlLoadJob  # type: ignore
             elif file_path.endswith(".parquet"):
-                job_cls: Type[DestinationParquetLoadJob] = DestinationParquetLoadJob
+                job_cls = DestinationParquetLoadJob  # type: ignore
             else:
                 raise ValueError(
                     f"Unsupported file type for BigQuery streaming inserts: {file_path}"
@@ -451,7 +452,9 @@ def _from_db_type(
         return self.type_mapper.from_db_type(bq_t, precision, scale)
 
 
-def _streaming_load(sql_client: BigQueryClient, items: List[Dict[Any, Any]], table: Dict[Any, Any]):
+def _streaming_load(
+    sql_client: SqlClientBase, items: List[Dict[Any, Any]], table: Dict[str, Any]
+) -> None:
     """
     Upload the given items into BigQuery table, using streaming API.
     Streaming API is used for small amounts of data, with optimal
@@ -464,7 +467,7 @@ def _streaming_load(sql_client: BigQueryClient, items: List[Dict[Any, Any]], table: Dict[Any, Any]):
         table (Dict[Any, Any]): Table schema.
     """
 
-    def _should_retry(exc: Exception) -> bool:
+    def _should_retry(exc: api_core_exceptions.GoogleAPIError) -> bool:
         """Predicate to decide if we need to retry the exception.
 
         Args:
@@ -479,7 +482,7 @@ def _should_retry(exc: Exception) -> bool:
 
     full_name = sql_client.make_qualified_table_name(table["name"], escape=False)
 
-    bq_client = sql_client._client
+    bq_client = sql_client._client  # type: ignore
     bq_client.insert_rows_json(
         full_name,
         items,
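For readers without the surrounding file, a hedged sketch of the streaming-insert pattern that `_streaming_load` and `_should_retry` implement with google-cloud-bigquery; the helper name `stream_rows`, the retried exception set, and the error handling are illustrative assumptions, not the dlt code:

from typing import Any, Dict, List

from google.api_core import exceptions as api_core_exceptions
from google.api_core.retry import Retry
from google.cloud import bigquery


def _should_retry(exc: api_core_exceptions.GoogleAPIError) -> bool:
    # Retry only transient, server-side failures (assumed set).
    return isinstance(
        exc,
        (
            api_core_exceptions.InternalServerError,
            api_core_exceptions.ServiceUnavailable,
            api_core_exceptions.TooManyRequests,
        ),
    )


def stream_rows(
    client: bigquery.Client, table_name: str, items: List[Dict[str, Any]]
) -> None:
    # insert_rows_json sends rows over the streaming API and returns
    # per-row error mappings instead of raising for bad rows.
    errors = client.insert_rows_json(
        table_name,
        items,
        retry=Retry(predicate=_should_retry),
    )
    if errors:
        raise RuntimeError(f"BigQuery streaming insert failed: {errors}")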
