Commit c692113: lint fix
IlyaFaer committed Mar 25, 2024
1 parent 10e57f6
Showing 3 changed files with 10 additions and 10 deletions.
16 changes: 8 additions & 8 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -1,7 +1,7 @@
import functools
import os
from pathlib import Path
-from typing import ClassVar, Optional, Sequence, Tuple, List, cast, Dict
+from typing import Any, ClassVar, Dict, List, Optional, Sequence, Tuple, Type, cast

import google.cloud.bigquery as bigquery # noqa: I250
from google.api_core import exceptions as api_core_exceptions
@@ -225,9 +225,9 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
insert_api = table.get("insert_api", self.config.loading_api)
if insert_api == "streaming":
if file_path.endswith(".jsonl"):
-job_cls = DestinationJsonlLoadJob
+job_cls: Type[DestinationJsonlLoadJob] = DestinationJsonlLoadJob
elif file_path.endswith(".parquet"):
-job_cls = DestinationParquetLoadJob
+job_cls: Type[DestinationParquetLoadJob] = DestinationParquetLoadJob
else:
raise ValueError(
f"Unsupported file type for BigQuery streaming inserts: {file_path}"
@@ -236,18 +236,18 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
job = job_cls(
table,
file_path,
-self.config,
+self.config,  # type: ignore
self.schema,
destination_state(),
functools.partial(_streaming_load, self.sql_client),
[],
)
else:
-job = super().start_file_load(table, file_path, load_id)
+job = super().start_file_load(table, file_path, load_id)  # type: ignore

if not job:
try:
-job = BigQueryLoadJob(
+job = BigQueryLoadJob(  # type: ignore
FileStorage.get_file_name_from_file_path(file_path),
self._create_load_job(table, file_path),
self.config.http_timeout,
@@ -451,7 +451,7 @@ def _from_db_type(
return self.type_mapper.from_db_type(bq_t, precision, scale)


-def _streaming_load(sql_client, items, table):
+def _streaming_load(sql_client: BigQueryClient, items: List[Dict[Any, Any]], table: Dict[Any, Any]):
"""
Upload the given items into BigQuery table, using streaming API.
Streaming API is used for small amounts of data, with optimal
@@ -464,7 +464,7 @@ def _streaming_load(sql_client, items, table):
table (Dict[Any, Any]): Table schema.
"""

-def _should_retry(exc):
+def _should_retry(exc: Exception) -> bool:
"""Predicate to decide if we need to retry the exception.
Args:
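
Note: the truncated _streaming_load above streams rows into BigQuery and retries only when a predicate like _should_retry allows it. As a point of reference only (not the dlt implementation), here is a minimal hedged sketch of wiring such a predicate into google-cloud-bigquery's insert_rows_json; the client, table id, and rows are illustrative placeholders.

from typing import Any, Dict, List

import google.cloud.bigquery as bigquery
from google.api_core import exceptions as api_core_exceptions
from google.api_core import retry as api_core_retry


def should_retry(exc: BaseException) -> bool:
    # Retry only transient, server-side failures of the streaming insert.
    return isinstance(
        exc,
        (api_core_exceptions.TooManyRequests, api_core_exceptions.InternalServerError),
    )


def stream_rows(client: bigquery.Client, table_id: str, rows: List[Dict[str, Any]]) -> None:
    # insert_rows_json takes a fully qualified table id and a list of JSON rows;
    # the Retry object re-invokes the call while should_retry returns True.
    errors = client.insert_rows_json(
        table_id,
        rows,
        retry=api_core_retry.Retry(predicate=should_retry),
    )
    if errors:
        raise RuntimeError(f"streaming insert reported row errors: {errors}")
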
2 changes: 1 addition & 1 deletion dlt/destinations/impl/bigquery/bigquery_adapter.py
@@ -30,7 +30,7 @@ def bigquery_adapter(
round_half_even: TColumnNames = None,
table_description: Optional[str] = None,
table_expiration_datetime: Optional[str] = None,
insert_api: Optional[Literal["streaming_insert", "default"]] = "default",
insert_api: Optional[Literal["streaming", "default"]] = "default",
) -> DltResource:
"""
Prepares data for loading into BigQuery.
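
For context, the corrected literal is the value a user passes to opt a resource into streaming inserts. A hedged usage sketch, assuming an illustrative resource and pipeline name (the import path mirrors the module changed above):

import dlt
from dlt.destinations.impl.bigquery.bigquery_adapter import bigquery_adapter


@dlt.resource
def events():
    # Placeholder rows; a real resource would yield actual records.
    yield {"id": 1, "payload": "a"}
    yield {"id": 2, "payload": "b"}


# "streaming" routes this resource through the streaming insert API;
# "default" keeps regular load jobs.
bigquery_adapter(events, insert_api="streaming")

pipeline = dlt.pipeline(pipeline_name="bq_streaming_demo", destination="bigquery")
pipeline.run(events)
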
2 changes: 1 addition & 1 deletion dlt/destinations/impl/bigquery/sql_client.py
@@ -48,7 +48,7 @@ class BigQueryDBApiCursorImpl(DBApiCursorImpl):
def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame:
if chunk_size is not None:
return super().df(chunk_size=chunk_size)
-query_job: bigquery.QueryJob = self.native_cursor._query_job
+query_job: bigquery.QueryJob = self.native_cursor._query_job  # type: ignore

try:
return query_job.to_dataframe(**kwargs)
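
As a usage note, this cursor's df() is typically reached through a pipeline's SQL client. A brief hedged sketch with illustrative pipeline, dataset, and table names:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="bq_demo", destination="bigquery", dataset_name="demo_dataset"
)

with pipeline.sql_client() as client:
    with client.execute_query("SELECT id, payload FROM events") as cursor:
        # With no chunk_size, the BigQuery cursor above hands the whole result
        # to QueryJob.to_dataframe(); with chunk_size it falls back to the
        # base class implementation.
        frame = cursor.df()
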
