
Commit

fixes tests
rudolfix committed Dec 14, 2024
1 parent c55d846 commit a01c286
Showing 5 changed files with 31 additions and 17 deletions.
29 changes: 22 additions & 7 deletions dlt/common/normalizers/json/relational.py
@@ -1,4 +1,16 @@
-from typing import Dict, List, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any
+from typing import (
+    ClassVar,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Type,
+    cast,
+    TypedDict,
+    Any,
+)

 from dlt.common.normalizers.exceptions import InvalidJsonNormalizer
 from dlt.common.normalizers.typing import TJSONNormalizer
@@ -51,6 +63,7 @@ class DataItemNormalizer(DataItemNormalizerBase[RelationalNormalizerConfig]):

     # other constants
     EMPTY_KEY_IDENTIFIER = "_empty"  # replace empty keys with this
+    RELATIONAL_CONFIG_TYPE: ClassVar[Type[RelationalNormalizerConfig]] = RelationalNormalizerConfig

     normalizer_config: RelationalNormalizerConfig
     propagation_config: RelationalNormalizerConfigPropagation
@@ -323,7 +336,7 @@ def extend_table(self, table_name: str) -> None:
         ):
             # get row id column from table, assume that we propagate it into c_dlt_root_id always
             c_dlt_id = get_first_column_name_with_prop(table, "row_key", include_incomplete=True)
-            DataItemNormalizer.update_normalizer_config(
+            self.update_normalizer_config(
                 self.schema,
                 {
                     "propagation": {
@@ -373,8 +386,8 @@ def normalize_data_item(
     def ensure_this_normalizer(cls, norm_config: TJSONNormalizer) -> None:
         # make sure schema has right normalizer
         present_normalizer = norm_config["module"]
-        if present_normalizer != __name__:
-            raise InvalidJsonNormalizer(__name__, present_normalizer)
+        if present_normalizer != cls.__module__:
+            raise InvalidJsonNormalizer(cls.__module__, present_normalizer)

     @classmethod
     def update_normalizer_config(cls, schema: Schema, config: RelationalNormalizerConfig) -> None:
@@ -392,8 +405,10 @@ def get_normalizer_config(cls, schema: Schema) -> RelationalNormalizerConfig:
         cls.ensure_this_normalizer(norm_config)
         return cast(RelationalNormalizerConfig, norm_config.get("config", {}))

-    @staticmethod
-    def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConfig) -> None:
+    @classmethod
+    def _validate_normalizer_config(
+        cls, schema: Schema, config: RelationalNormalizerConfig
+    ) -> None:
         """Normalizes all known column identifiers according to the schema and then validates the configuration"""

         def _normalize_prop(
@@ -418,7 +433,7 @@ def _normalize_prop(
                     )

         validate_dict(
-            RelationalNormalizerConfig,
+            cls.RELATIONAL_CONFIG_TYPE,
             config,
             "./normalizers/json/config",
             validator_f=column_name_validator(schema.naming),
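
Taken together, the relational.py changes replace hard-coded references (`__name__`, `RelationalNormalizerConfig`, `DataItemNormalizer.update_normalizer_config`) with lookups on `cls`, so a subclass defined in another module, carrying its own config type, can pass the normalizer check and config validation. Below is a minimal sketch (not part of the commit) of the kind of subclass this appears to enable; the `Custom*` names and the `flatten_depth` option are hypothetical.

from typing import ClassVar, Type

from dlt.common.normalizers.json.relational import (
    DataItemNormalizer,
    RelationalNormalizerConfig,
)


class CustomRelationalNormalizerConfig(RelationalNormalizerConfig, total=False):
    # hypothetical extra option understood only by the custom normalizer
    flatten_depth: int


class CustomDataItemNormalizer(DataItemNormalizer):
    # _validate_normalizer_config() now validates against cls.RELATIONAL_CONFIG_TYPE
    # rather than the hard-coded RelationalNormalizerConfig, so extra keys declared
    # in the custom TypedDict can pass validate_dict()
    RELATIONAL_CONFIG_TYPE: ClassVar[Type[RelationalNormalizerConfig]] = (
        CustomRelationalNormalizerConfig
    )


# ensure_this_normalizer() now compares the schema's normalizer module against
# cls.__module__, so a schema configured with this subclass's module no longer
# raises InvalidJsonNormalizer.
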
2 changes: 1 addition & 1 deletion tests/helpers/airflow_tests/test_airflow_provider.py
@@ -1,6 +1,6 @@
 import pytest

-pytest.importorskip("apache-airflow")
+pytest.importorskip("airflow")

 from airflow import DAG
 from airflow.decorators import task, dag
2 changes: 1 addition & 1 deletion tests/helpers/airflow_tests/test_airflow_wrapper.py
@@ -3,7 +3,7 @@
 from unittest import mock
 from typing import Iterator, List

-pytest.importorskip("apache-airflow")
+pytest.importorskip("airflow")
 from airflow import DAG
 from airflow.decorators import dag
 from airflow.operators.python import PythonOperator, get_current_context
2 changes: 1 addition & 1 deletion tests/helpers/airflow_tests/test_join_airflow_scheduler.py
@@ -2,7 +2,7 @@
 import datetime
 from pendulum.tz import UTC

-pytest.importorskip("apache-airflow")
+pytest.importorskip("airflow")
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.models import DagRun
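
The three airflow test modules all get the same one-line fix: `pytest.importorskip()` must be given an importable module name, and the `apache-airflow` distribution installs its code under the module name `airflow`. The old guard could never import anything, so these test files were always skipped. A short illustration of the corrected guard (not part of the commit):

import pytest

# importorskip() imports the named module and, when called at module level like
# this, skips the whole test module if the import fails. It needs the import
# name ("airflow"); "apache-airflow" is the PyPI distribution name and is not a
# valid module name at all, so the old guard always skipped the tests.
airflow = pytest.importorskip("airflow")

from airflow import DAG  # noqa: E402 - safe to import once the guard has passed
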
13 changes: 6 additions & 7 deletions tests/load/pipeline/test_refresh_modes.py
@@ -118,9 +118,7 @@ def test_refresh_drop_sources(

     pipeline = destination_config.setup_pipeline("refresh_source")

-    data: Any = refresh_source(first_run=True, drop_sources=True).with_resources(
-        "some_data_1", "some_data_2"
-    )
+    data: Any = refresh_source(first_run=True, drop_sources=True)
     if not in_source:
         data = list(data.selected_resources.values())

@@ -353,7 +351,9 @@ def test_refresh_drop_data_only(destination_config: DestinationTestConfiguration

 @pytest.mark.parametrize(
     "destination_config",
-    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
+    destinations_configs(
+        default_sql_configs=True, local_filesystem_configs=True, subset=["duckdb", "filesystem"]
+    ),
     ids=lambda x: x.name,
 )
 def test_refresh_drop_sources_multiple_sources(destination_config: DestinationTestConfiguration):
@@ -408,7 +408,6 @@ def source_2_data_2():
         **destination_config.run_kwargs,
     )
     assert_load_info(info, 2)
-    # breakpoint()
     info = pipeline.run(
         refresh_source_2(first_run=False).with_resources("source_2_data_1"),
         **destination_config.run_kwargs,
@@ -432,15 +431,15 @@ def source_2_data_2():
     result = sorted([(row["id"], row["name"]) for row in data["some_data_1"]])
     assert result == [(1, "John"), (2, "Jane")]

-    # # First table from source2 exists, with only first column
+    # First table from source2 exists, with only first column
     data = load_tables_to_dicts(pipeline, "source_2_data_1", schema_name="refresh_source_2")
     assert_only_table_columns(
         pipeline, "source_2_data_1", ["product"], schema_name="refresh_source_2"
     )
     result = sorted([row["product"] for row in data["source_2_data_1"]])
     assert result == ["orange", "pear"]

-    # # Second table from source 2 is gone
+    # Second table from source 2 is gone
     assert not table_exists(pipeline, "source_2_data_2", schema_name="refresh_source_2")


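
The widened `destinations_configs(...)` call means `test_refresh_drop_sources_multiple_sources` now also runs against the local filesystem destination. For readers unfamiliar with the `ids=lambda x: x.name` pattern used there, here is a self-contained sketch with a made-up config class standing in for dlt's `DestinationTestConfiguration`:

from dataclasses import dataclass

import pytest


@dataclass
class FakeDestinationConfig:
    # stand-in for DestinationTestConfiguration; only the attribute the test id needs
    name: str


@pytest.mark.parametrize(
    "destination_config",
    [FakeDestinationConfig("duckdb"), FakeDestinationConfig("filesystem")],
    ids=lambda x: x.name,  # test ids become test_example[duckdb] and test_example[filesystem]
)
def test_example(destination_config: FakeDestinationConfig) -> None:
    assert destination_config.name in ("duckdb", "filesystem")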
