From ba01251049b0d6f85b4f2a8a1fb4e76085ebdd87 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 2 Apr 2024 20:29:50 +0200 Subject: [PATCH] fix default order by --- dlt/destinations/impl/dremio/dremio.py | 6 ++++++ dlt/destinations/sql_jobs.py | 8 ++++++-- tests/load/pipeline/test_pipelines.py | 5 +---- tests/load/pipeline/test_write_disposition_changes.py | 11 ++--------- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dlt/destinations/impl/dremio/dremio.py b/dlt/destinations/impl/dremio/dremio.py index 5eaacaeb35..23bca0ad74 100644 --- a/dlt/destinations/impl/dremio/dremio.py +++ b/dlt/destinations/impl/dremio/dremio.py @@ -69,6 +69,7 @@ def from_db_type( return dict(data_type="decimal", precision=precision, scale=scale) return super().from_db_type(db_type, precision, scale) + class DremioMergeJob(SqlMergeJob): @classmethod def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str: @@ -78,6 +79,11 @@ def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str: return f"CREATE TABLE {temp_table_name} AS {select_sql};" + @classmethod + def default_order_by(cls) -> str: + return "NULL" + + class DremioLoadJob(LoadJob, FollowupJob): def __init__( self, diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index a584ec80b3..0770bf58d3 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -194,7 +194,7 @@ def gen_key_table_clauses( @classmethod def gen_delete_temp_table_sql( - cls, unique_column: str, key_table_clauses: Sequence[str], sql_client: SqlClientBase[Any] + cls, unique_column: str, key_table_clauses: Sequence[str], sql_client: SqlClientBase[Any] ) -> Tuple[List[str], str]: """Generate sql that creates delete temp table and inserts `unique_column` from root table for all records to delete. May return several statements. @@ -254,7 +254,7 @@ def gen_select_from_dedup_sql( 1) To select the values for an INSERT INTO statement. 2) To select the values for a temporary table used for inserts. """ - order_by = "NULL" + order_by = cls.default_order_by() if dedup_sort is not None: order_by = f"{dedup_sort[0]} {dedup_sort[1].upper()}" if condition is None: @@ -271,6 +271,10 @@ def gen_select_from_dedup_sql( ) AS _dlt_dedup_numbered WHERE _dlt_dedup_rn = 1 AND ({condition}) """ + @classmethod + def default_order_by(cls) -> str: + return "(SELECT NULL)" + @classmethod def gen_insert_temp_table_sql( cls, diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index 4264cebc7c..166ab9feb2 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -58,10 +58,7 @@ def test_default_pipeline_names( p.config.use_single_dataset = use_single_dataset # this is a name of executing test harness or blank pipeline on windows possible_names = ["dlt_pytest", "dlt_pipeline"] - possible_dataset_names = [ - "dlt_pytest_dataset", - "dlt_pipeline_dataset" - ] + possible_dataset_names = ["dlt_pytest_dataset", "dlt_pipeline_dataset"] assert p.pipeline_name in possible_names assert p.pipelines_dir == os.path.abspath(os.path.join(TEST_STORAGE_ROOT, ".dlt", "pipelines")) assert p.dataset_name in possible_dataset_names diff --git a/tests/load/pipeline/test_write_disposition_changes.py b/tests/load/pipeline/test_write_disposition_changes.py index 24ff806922..50986727ed 100644 --- a/tests/load/pipeline/test_write_disposition_changes.py +++ b/tests/load/pipeline/test_write_disposition_changes.py @@ -133,10 +133,7 @@ def source(): return # without a root key this will fail, it is expected - if ( - not with_root_key - and destination_config.supports_merge - ): + if not with_root_key and destination_config.supports_merge: with pytest.raises(PipelineStepFailed): pipeline.run( s, @@ -157,11 +154,7 @@ def source(): pipeline, { "items": 100 if destination_config.supports_merge else 200, - "items__sub_items": ( - 100 - if destination_config.supports_merge - else 200 - ), + "items__sub_items": 100 if destination_config.supports_merge else 200, }, ) assert pipeline.default_schema._normalizers_config["json"]["config"]["propagation"]["tables"][