From a01c28661b10bf65fa3bd1bd20fdb10437233647 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Sat, 14 Dec 2024 17:16:38 +0100
Subject: [PATCH] fixes tests

---
 dlt/common/normalizers/json/relational.py     | 29 ++++++++++++++-----
 .../airflow_tests/test_airflow_provider.py    |  2 +-
 .../airflow_tests/test_airflow_wrapper.py     |  2 +-
 .../test_join_airflow_scheduler.py            |  2 +-
 tests/load/pipeline/test_refresh_modes.py     | 13 ++++-----
 5 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py
index a1f5142c2f..36845b2e14 100644
--- a/dlt/common/normalizers/json/relational.py
+++ b/dlt/common/normalizers/json/relational.py
@@ -1,4 +1,16 @@
-from typing import Dict, List, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any
+from typing import (
+    ClassVar,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Type,
+    cast,
+    TypedDict,
+    Any,
+)
 
 from dlt.common.normalizers.exceptions import InvalidJsonNormalizer
 from dlt.common.normalizers.typing import TJSONNormalizer
@@ -51,6 +63,7 @@ class DataItemNormalizer(DataItemNormalizerBase[RelationalNormalizerConfig]):
 
     # other constants
     EMPTY_KEY_IDENTIFIER = "_empty"  # replace empty keys with this
+    RELATIONAL_CONFIG_TYPE: ClassVar[Type[RelationalNormalizerConfig]] = RelationalNormalizerConfig
 
     normalizer_config: RelationalNormalizerConfig
     propagation_config: RelationalNormalizerConfigPropagation
@@ -323,7 +336,7 @@ def extend_table(self, table_name: str) -> None:
         ):
             # get row id column from table, assume that we propagate it into c_dlt_root_id always
             c_dlt_id = get_first_column_name_with_prop(table, "row_key", include_incomplete=True)
-            DataItemNormalizer.update_normalizer_config(
+            self.update_normalizer_config(
                 self.schema,
                 {
                     "propagation": {
@@ -373,8 +386,8 @@ def normalize_data_item(
     def ensure_this_normalizer(cls, norm_config: TJSONNormalizer) -> None:
         # make sure schema has right normalizer
         present_normalizer = norm_config["module"]
-        if present_normalizer != __name__:
-            raise InvalidJsonNormalizer(__name__, present_normalizer)
+        if present_normalizer != cls.__module__:
+            raise InvalidJsonNormalizer(cls.__module__, present_normalizer)
 
     @classmethod
     def update_normalizer_config(cls, schema: Schema, config: RelationalNormalizerConfig) -> None:
@@ -392,8 +405,10 @@ def get_normalizer_config(cls, schema: Schema) -> RelationalNormalizerConfig:
         cls.ensure_this_normalizer(norm_config)
         return cast(RelationalNormalizerConfig, norm_config.get("config", {}))
 
-    @staticmethod
-    def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConfig) -> None:
+    @classmethod
+    def _validate_normalizer_config(
+        cls, schema: Schema, config: RelationalNormalizerConfig
+    ) -> None:
         """Normalizes all known column identifiers according to the schema and then validates the configuration"""
 
         def _normalize_prop(
@@ -418,7 +433,7 @@ def _normalize_prop(
             )
 
         validate_dict(
-            RelationalNormalizerConfig,
+            cls.RELATIONAL_CONFIG_TYPE,
             config,
             "./normalizers/json/config",
             validator_f=column_name_validator(schema.naming),
diff --git a/tests/helpers/airflow_tests/test_airflow_provider.py b/tests/helpers/airflow_tests/test_airflow_provider.py
index cc601a43ca..2a8e46e2c8 100644
--- a/tests/helpers/airflow_tests/test_airflow_provider.py
+++ b/tests/helpers/airflow_tests/test_airflow_provider.py
@@ -1,6 +1,6 @@
 import pytest
 
-pytest.importorskip("apache-airflow")
+pytest.importorskip("airflow")
 
 from airflow import DAG
 from airflow.decorators import task, dag
diff --git
a/tests/helpers/airflow_tests/test_airflow_wrapper.py b/tests/helpers/airflow_tests/test_airflow_wrapper.py
index 01ecc70326..06603ffcec 100644
--- a/tests/helpers/airflow_tests/test_airflow_wrapper.py
+++ b/tests/helpers/airflow_tests/test_airflow_wrapper.py
@@ -3,7 +3,7 @@
 from unittest import mock
 from typing import Iterator, List
 
-pytest.importorskip("apache-airflow")
+pytest.importorskip("airflow")
 from airflow import DAG
 from airflow.decorators import dag
 from airflow.operators.python import PythonOperator, get_current_context
diff --git a/tests/helpers/airflow_tests/test_join_airflow_scheduler.py b/tests/helpers/airflow_tests/test_join_airflow_scheduler.py
index e727f61a4d..503aa62359 100644
--- a/tests/helpers/airflow_tests/test_join_airflow_scheduler.py
+++ b/tests/helpers/airflow_tests/test_join_airflow_scheduler.py
@@ -2,7 +2,7 @@
 import datetime
 from pendulum.tz import UTC
 
-pytest.importorskip("apache-airflow")
+pytest.importorskip("airflow")
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.models import DagRun
diff --git a/tests/load/pipeline/test_refresh_modes.py b/tests/load/pipeline/test_refresh_modes.py
index 49b5ca5073..fb88ba915c 100644
--- a/tests/load/pipeline/test_refresh_modes.py
+++ b/tests/load/pipeline/test_refresh_modes.py
@@ -118,9 +118,7 @@ def test_refresh_drop_sources(
 
     pipeline = destination_config.setup_pipeline("refresh_source")
 
-    data: Any = refresh_source(first_run=True, drop_sources=True).with_resources(
-        "some_data_1", "some_data_2"
-    )
+    data: Any = refresh_source(first_run=True, drop_sources=True)
     if not in_source:
         data = list(data.selected_resources.values())
 
@@ -353,7 +351,9 @@ def test_refresh_drop_data_only(destination_config: DestinationTestConfiguration
 
 @pytest.mark.parametrize(
     "destination_config",
-    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
+    destinations_configs(
+        default_sql_configs=True, local_filesystem_configs=True, subset=["duckdb", "filesystem"]
+    ),
     ids=lambda x: x.name,
 )
 def test_refresh_drop_sources_multiple_sources(destination_config: DestinationTestConfiguration):
@@ -408,7 +408,6 @@ def source_2_data_2():
         **destination_config.run_kwargs,
     )
     assert_load_info(info, 2)
-    # breakpoint()
     info = pipeline.run(
         refresh_source_2(first_run=False).with_resources("source_2_data_1"),
         **destination_config.run_kwargs,
@@ -432,7 +431,7 @@ def source_2_data_2():
     result = sorted([(row["id"], row["name"]) for row in data["some_data_1"]])
     assert result == [(1, "John"), (2, "Jane")]
 
-    # # First table from source2 exists, with only first column
+    # First table from source2 exists, with only first column
     data = load_tables_to_dicts(pipeline, "source_2_data_1", schema_name="refresh_source_2")
     assert_only_table_columns(
         pipeline, "source_2_data_1", ["product"], schema_name="refresh_source_2"
@@ -440,7 +439,7 @@ def source_2_data_2():
     result = sorted([row["product"] for row in data["source_2_data_1"]])
     assert result == ["orange", "pear"]
 
-    # # Second table from source 2 is gone
+    # Second table from source 2 is gone
     assert not table_exists(pipeline, "source_2_data_2", schema_name="refresh_source_2")
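
---

Note on the relational.py changes (reviewer sketch, not part of the patch):
replacing the hard-coded __name__ and RelationalNormalizerConfig references
with cls.__module__ and cls.RELATIONAL_CONFIG_TYPE makes the normalizer
subclass-friendly. A minimal sketch of what this enables follows; the
subclass names and the flatten_depth key are hypothetical, for illustration
only:

    from typing import ClassVar, Type

    from dlt.common.normalizers.json.relational import (
        DataItemNormalizer,
        RelationalNormalizerConfig,
    )


    # hypothetical config extending the base TypedDict with one extra key
    class ExtendedNormalizerConfig(RelationalNormalizerConfig, total=False):
        flatten_depth: int


    class ExtendedDataItemNormalizer(DataItemNormalizer):
        # _validate_normalizer_config now validates against this ClassVar
        # instead of the hard-coded RelationalNormalizerConfig type
        RELATIONAL_CONFIG_TYPE: ClassVar[Type[RelationalNormalizerConfig]] = (
            ExtendedNormalizerConfig
        )

Because ensure_this_normalizer now compares against cls.__module__, such a
subclass living in its own module accepts schemas configured with that module
path instead of raising InvalidJsonNormalizer. The airflow test change is
simpler: pytest.importorskip expects an importable module name ("airflow"),
not the PyPI distribution name ("apache-airflow"), so the old calls skipped
those tests unconditionally.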