Commit

review fixes
IlyaFaer committed Mar 27, 2024
1 parent 1a5e4d0 commit c239109
Showing 4 changed files with 44 additions and 44 deletions.
1 change: 0 additions & 1 deletion dlt/common/schema/typing.py
@@ -171,7 +171,6 @@ class TTableSchema(TypedDict, total=False):
     columns: TTableSchemaColumns
     resource: Optional[str]
     table_format: Optional[TTableFormat]
-    insert_api: Optional[Literal["streaming", "default"]]


 class TPartialTableSchema(TTableSchema):
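With the typed field gone, the insert API choice now travels to the destination as a custom "x-"-prefixed key on the table schema mapping rather than a first-class TTableSchema attribute. A toy illustration of the lookup the BigQuery client performs below (plain dicts for demonstration only, not the dlt API):

# Toy illustration only: the hint is just an extra key on the table schema mapping.
table_with_hint = {"name": "events", "x-insert-api": "streaming"}
table_without_hint = {"name": "users"}

assert table_with_hint.get("x-insert-api", "default") == "streaming"
assert table_without_hint.get("x-insert-api", "default") == "default"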
81 changes: 41 additions & 40 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -223,53 +223,54 @@ def restore_file_load(self, file_path: str) -> LoadJob:
         return job

     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
-        insert_api = table.get("insert_api", self.config.loading_api)
-        if insert_api == "streaming":
-            if file_path.endswith(".jsonl"):
-                job_cls = DestinationJsonlLoadJob
-            elif file_path.endswith(".parquet"):
-                job_cls = DestinationParquetLoadJob  # type: ignore
-            else:
-                raise ValueError(
-                    f"Unsupported file type for BigQuery streaming inserts: {file_path}"
-                )
+        job = super().start_file_load(table, file_path, load_id)  # type: ignore

-            job = job_cls(
-                table,
-                file_path,
-                self.config,  # type: ignore
-                self.schema,
-                destination_state(),
-                functools.partial(_streaming_load, self.sql_client),
-                [],
-            )
-        else:
-            job = super().start_file_load(table, file_path, load_id)  # type: ignore
-
-            if not job:
-                try:
+        if not job:
+            insert_api = table.get("x-insert-api", "default")
+            try:
+                if insert_api == "streaming":
+                    if file_path.endswith(".jsonl"):
+                        job_cls = DestinationJsonlLoadJob
+                    elif file_path.endswith(".parquet"):
+                        job_cls = DestinationParquetLoadJob  # type: ignore
+                    else:
+                        raise ValueError(
+                            f"Unsupported file type for BigQuery streaming inserts: {file_path}"
+                        )
+
+                    job = job_cls(
+                        table,
+                        file_path,
+                        self.config,  # type: ignore
+                        self.schema,
+                        destination_state(),
+                        functools.partial(_streaming_load, self.sql_client),
+                        [],
+                    )
+                else:
                     job = BigQueryLoadJob(  # type: ignore
                         FileStorage.get_file_name_from_file_path(file_path),
                         self._create_load_job(table, file_path),
                         self.config.http_timeout,
                         self.config.retry_deadline,
                     )
-                except api_core_exceptions.GoogleAPICallError as gace:
-                    reason = BigQuerySqlClient._get_reason_from_errors(gace)
-                    if reason == "notFound":
-                        # google.api_core.exceptions.NotFound: 404 – table not found
-                        raise UnknownTableException(table["name"]) from gace
-                    elif (
-                        reason == "duplicate"
-                    ):  # google.api_core.exceptions.Conflict: 409 PUT – already exists
-                        return self.restore_file_load(file_path)
-                    elif reason in BQ_TERMINAL_REASONS:
-                        # google.api_core.exceptions.BadRequest - will not be processed ie bad job name
-                        raise LoadJobTerminalException(
-                            file_path, f"The server reason was: {reason}"
-                        ) from gace
-                    else:
-                        raise DestinationTransientException(gace) from gace
+            except api_core_exceptions.GoogleAPICallError as gace:
+                reason = BigQuerySqlClient._get_reason_from_errors(gace)
+                if reason == "notFound":
+                    # google.api_core.exceptions.NotFound: 404 – table not found
+                    raise UnknownTableException(table["name"]) from gace
+                elif (
+                    reason == "duplicate"
+                ):  # google.api_core.exceptions.Conflict: 409 PUT – already exists
+                    return self.restore_file_load(file_path)
+                elif reason in BQ_TERMINAL_REASONS:
+                    # google.api_core.exceptions.BadRequest - will not be processed ie bad job name
+                    raise LoadJobTerminalException(
+                        file_path, f"The server reason was: {reason}"
+                    ) from gace
+                else:
+                    raise DestinationTransientException(gace) from gace

         return job

     def _get_table_update_sql(
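The net effect of the refactor is that the base class gets the first chance to produce or resume a job, and the "x-insert-api" hint is consulted only when nothing was returned; because the hint falls back to "default" when absent, the separate loading_api config option becomes unnecessary. A minimal standalone sketch of that dispatch order, with make_streaming_job and make_batch_job as hypothetical stand-ins for the real job classes:

# Minimal sketch of the dispatch order; not dlt code, the job factories are hypothetical.
from typing import Any, Callable, Dict, Optional


def start_file_load_sketch(
    table: Dict[str, Any],
    file_path: str,
    restore_job: Callable[[str], Optional[object]],
    make_streaming_job: Callable[[str], object],
    make_batch_job: Callable[[str], object],
) -> object:
    # 1. Give the base implementation the first chance to return a job.
    job = restore_job(file_path)
    if not job:
        # 2. Only then read the adapter hint; a missing hint means the default batch path.
        insert_api = table.get("x-insert-api", "default")
        if insert_api == "streaming":
            if not file_path.endswith((".jsonl", ".parquet")):
                raise ValueError(
                    f"Unsupported file type for BigQuery streaming inserts: {file_path}"
                )
            job = make_streaming_job(file_path)
        else:
            job = make_batch_job(file_path)
    return job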
5 changes: 3 additions & 2 deletions dlt/destinations/impl/bigquery/bigquery_adapter.py
@@ -30,7 +30,7 @@ def bigquery_adapter(
     round_half_even: TColumnNames = None,
     table_description: Optional[str] = None,
     table_expiration_datetime: Optional[str] = None,
-    insert_api: Optional[Literal["streaming", "default"]] = "default",
+    insert_api: Optional[Literal["streaming", "default"]] = None,
 ) -> DltResource:
     """
     Prepares data for loading into BigQuery.
@@ -145,7 +145,8 @@ def bigquery_adapter(
         except ValueError as e:
             raise ValueError(f"{table_expiration_datetime} could not be parsed!") from e

-    additional_table_hints |= {"insert_api": insert_api}  # type: ignore[operator]
+    if insert_api is not None:
+        additional_table_hints |= {"x-insert-api": insert_api}  # type: ignore[operator]

     if column_hints or additional_table_hints:
         resource.apply_hints(columns=column_hints, additional_table_hints=additional_table_hints)
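Since the adapter now writes the "x-insert-api" hint only when insert_api is passed explicitly, streaming stays an opt-in, per-resource setting. A usage sketch, assuming the import path from this file and a made-up resource (credentials and project setup omitted):

import dlt
from dlt.destinations.impl.bigquery.bigquery_adapter import bigquery_adapter


@dlt.resource(name="events")  # hypothetical example resource
def events():
    yield [{"id": 1, "payload": "a"}, {"id": 2, "payload": "b"}]


# Only this resource carries the "x-insert-api" hint; resources left untouched
# keep the default batch load behaviour, because no hint is written when
# insert_api is None.
streaming_events = bigquery_adapter(events, insert_api="streaming")

pipeline = dlt.pipeline(
    pipeline_name="bq_streaming_demo", destination="bigquery", dataset_name="raw_data"
)
pipeline.run(streaming_events)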
1 change: 0 additions & 1 deletion dlt/destinations/impl/bigquery/configuration.py
@@ -20,7 +20,6 @@ class BigQueryClientConfiguration(DestinationClientDwhWithStagingConfiguration):
     retry_deadline: float = (
         60.0  # how long to retry the operation in case of error, the backoff 60 s.
     )
-    loading_api: str = "default"
     batch_size: int = 500

     __config_gen_annotations__: ClassVar[List[str]] = ["location"]
