From eada0ddee6bb6b0ee617bcb4f3a5a6088fb87776 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 15 May 2024 20:26:20 +0200 Subject: [PATCH] uses with_staging_dataset correctly --- dlt/load/load.py | 6 ++---- .../helpers/airflow_tests/test_airflow_wrapper.py | 6 +++++- tests/load/pipeline/test_pipelines.py | 14 +------------- 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/dlt/load/load.py b/dlt/load/load.py index 21a3520173..9d898bc54d 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -519,10 +519,8 @@ def _maybe_trancate_staging_dataset(self, schema: Schema, job_client: JobClientB except Exception as exc: logger.warn( - ( - f"Staging dataset truncate failed due to the following error: {exc}" - " However, it didn't affect the data integrity." - ) + f"Staging dataset truncate failed due to the following error: {exc}" + " However, it didn't affect the data integrity." ) def get_step_info( diff --git a/tests/helpers/airflow_tests/test_airflow_wrapper.py b/tests/helpers/airflow_tests/test_airflow_wrapper.py index 5ddde69645..533d16c998 100644 --- a/tests/helpers/airflow_tests/test_airflow_wrapper.py +++ b/tests/helpers/airflow_tests/test_airflow_wrapper.py @@ -387,7 +387,11 @@ def dag_parallel(): warn_mock.assert_has_calls( [ mock.call( - "The resource resource2 in task mock_data_incremental_source_resource1-resource2 is using incremental loading and may modify the state. Resources that modify the state should not run in parallel within the single pipeline as the state will not be correctly merged. Please use 'serialize' or 'parallel-isolated' modes instead." + "The resource resource2 in task" + " mock_data_incremental_source_resource1-resource2 is using incremental loading" + " and may modify the state. Resources that modify the state should not run in" + " parallel within the single pipeline as the state will not be correctly" + " merged. Please use 'serialize' or 'parallel-isolated' modes instead." ) ] ) diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index a6c117f425..d98f335d16 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -1013,20 +1013,8 @@ def table_3(make_data=False): if job_client.should_load_data_to_staging_dataset( job_client.schema.tables[table_name] ): - with client.with_staging_dataset(): + with client.with_staging_dataset(staging=True): tab_name = client.make_qualified_table_name(table_name) - if destination_config.destination == "clickhouse": - ind = tab_name.rfind("___") - else: - ind = tab_name.rfind(".") - 1 - - if destination_config.destination == "snowflake": - suffix = "_STAGING" - else: - suffix = "_staging" - - tab_name = tab_name[:ind] + suffix + tab_name[ind:] - with client.execute_query(f"SELECT * FROM {tab_name}") as cur: assert len(cur.fetchall()) == 0