Create _dlt_* tables even when schema contract is frozen
sultaniman committed May 15, 2024
1 parent 53dde6c commit cce66c7
Showing 2 changed files with 103 additions and 23 deletions.
5 changes: 3 additions & 2 deletions dlt/common/schema/schema.py
@@ -297,10 +297,11 @@ def apply_schema_contract(
         existing_table: TTableSchema = self._schema_tables.get(table_name, None)

         # table is new when not yet exist or
-        is_new_table = not existing_table or self.is_new_table(table_name)
+        is_dlt_table = not table_name.startswith("_dlt")
+        is_new_table = (not existing_table or self.is_new_table(table_name)) and is_dlt_table
         # check case where we have a new table
         if is_new_table and schema_contract["tables"] != "evolve":
-            if raise_on_freeze and schema_contract["tables"] == "freeze":
+            if (raise_on_freeze and schema_contract["tables"] == "freeze") and not is_dlt_table:
                 raise DataValidationError(
                     self.name,
                     table_name,
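
The effect of the change above is that tables whose names start with _dlt bypass the table-level contract check, so dlt's internal bookkeeping tables can still be created when the contract is frozen. Below is a minimal sketch of the scenario the new test in this commit exercises; the pipeline name, table name, and record are illustrative, and a local duckdb destination is assumed.

import dlt

# Illustrative sketch (not part of the commit): run a pipeline with a frozen
# schema contract and confirm the internal _dlt_* tables still appear.
pipeline = dlt.pipeline(pipeline_name="frozen_contract_demo", destination="duckdb")
pipeline.run(
    [{"id": 1, "name": "first item"}],
    table_name="items",
    schema_contract="freeze",
)

# the bookkeeping tables are present in the schema even though the contract is frozen
for table_name in ("_dlt_loads", "_dlt_version", "_dlt_pipeline_state"):
    assert table_name in pipeline.default_schema.tables
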
121 changes: 100 additions & 21 deletions tests/pipeline/test_pipeline.py
@@ -1,14 +1,15 @@
import asyncio
import pathlib
from concurrent.futures import ThreadPoolExecutor
import itertools
import logging
import os
import pathlib
import random
import threading

from concurrent.futures import ThreadPoolExecutor
from time import sleep
from typing import Any, Tuple, cast
import threading
from tenacity import retry_if_exception, Retrying, stop_after_attempt


import pytest

@@ -39,14 +40,24 @@

from dlt.destinations import filesystem, redshift, dummy
from dlt.destinations.impl.filesystem.filesystem import INIT_FILE_NAME
from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted
from dlt.extract.exceptions import (
InvalidResourceDataTypeBasic,
PipeGenInvalid,
SourceExhausted,
)
from dlt.extract.extract import ExtractStorage
from dlt.extract import DltResource, DltSource
from dlt.extract.extractors import MaterializedEmptyList
from dlt.load.exceptions import LoadClientJobFailed
from dlt.pipeline.exceptions import InvalidPipelineName, PipelineNotActive, PipelineStepFailed
from dlt.pipeline.exceptions import (
InvalidPipelineName,
PipelineNotActive,
PipelineStepFailed,
)
from dlt.pipeline.helpers import retry_load

from tenacity import retry_if_exception, Retrying, stop_after_attempt

from tests.common.utils import TEST_SENTRY_DSN
from tests.common.configuration.utils import environment
from tests.utils import TEST_STORAGE_ROOT, skipifnotwindows
@@ -56,6 +67,7 @@
assert_load_info,
airtable_emojis,
load_data_table_counts,
load_table_counts,
many_delayed,
)

@@ -143,7 +155,9 @@ def test_file_format_resolution() -> None:
# raise on destinations that does not support staging
with pytest.raises(DestinationLoadingViaStagingNotSupported):
dlt.pipeline(
pipeline_name="managed_state_pipeline", destination="postgres", staging="filesystem"
pipeline_name="managed_state_pipeline",
destination="postgres",
staging="filesystem",
)

# raise on staging that does not support staging interface
@@ -159,7 +173,9 @@ def test_file_format_resolution() -> None:
# check invalid input
with pytest.raises(DestinationIncompatibleLoaderFileFormatException):
pipeline = dlt.pipeline(
pipeline_name="managed_state_pipeline", destination="athena", staging="filesystem"
pipeline_name="managed_state_pipeline",
destination="athena",
staging="filesystem",
)
pipeline.config.restore_from_destination = False
pipeline.run([1, 2, 3], table_name="numbers", loader_file_format="insert_values")
@@ -358,7 +374,9 @@ def test_destination_credentials_in_factory(environment: Any) -> None:
def test_destination_explicit_invalid_credentials_filesystem(environment: Any) -> None:
# if string cannot be parsed
p = dlt.pipeline(
pipeline_name="postgres_pipeline", destination="filesystem", credentials="PR8BLEM"
pipeline_name="postgres_pipeline",
destination="filesystem",
credentials="PR8BLEM",
)
with pytest.raises(NativeValueError):
p._get_destination_client_initial_config(p.destination)
@@ -403,12 +421,18 @@ def test_extract_multiple_sources() -> None:
s1 = DltSource(
dlt.Schema("default"),
"module",
[dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")],
[
dlt.resource([1, 2, 3], name="resource_1"),
dlt.resource([3, 4, 5], name="resource_2"),
],
)
s2 = DltSource(
dlt.Schema("default_2"),
"module",
[dlt.resource([6, 7, 8], name="resource_3"), dlt.resource([9, 10, 0], name="resource_4")],
[
dlt.resource([6, 7, 8], name="resource_3"),
dlt.resource([9, 10, 0], name="resource_4"),
],
)

p = dlt.pipeline(destination="dummy")
@@ -431,10 +455,15 @@ def i_fail():
s3 = DltSource(
dlt.Schema("default_3"),
"module",
[dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")],
[
dlt.resource([1, 2, 3], name="resource_1"),
dlt.resource([3, 4, 5], name="resource_2"),
],
)
s4 = DltSource(
dlt.Schema("default_4"), "module", [dlt.resource([6, 7, 8], name="resource_3"), i_fail]
dlt.Schema("default_4"),
"module",
[dlt.resource([6, 7, 8], name="resource_3"), i_fail],
)

with pytest.raises(PipelineStepFailed):
@@ -724,7 +753,10 @@ def data_schema_3():
p = dlt.pipeline(pipeline_name=pipeline_name, destination="dummy")

with pytest.raises(PipelineStepFailed):
p.run([data_schema_1(), data_schema_2(), data_schema_3()], write_disposition="replace")
p.run(
[data_schema_1(), data_schema_2(), data_schema_3()],
write_disposition="replace",
)

# first run didn't really happen
assert p.first_run is True
@@ -859,7 +891,9 @@ def fail_extract():
retry_count = 2
with pytest.raises(PipelineStepFailed) as py_ex:
for attempt in Retrying(
stop=stop_after_attempt(3), retry=retry_if_exception(retry_load(())), reraise=True
stop=stop_after_attempt(3),
retry=retry_if_exception(retry_load(())),
reraise=True,
):
with attempt:
p.run(fail_extract())
@@ -958,7 +992,9 @@ def github_repo_events_table_meta(page):
def _get_shuffled_events(repeat: int = 1):
for _ in range(repeat):
with open(
"tests/normalize/cases/github.events.load_page_1_duck.json", "r", encoding="utf-8"
"tests/normalize/cases/github.events.load_page_1_duck.json",
"r",
encoding="utf-8",
) as f:
issues = json.load(f)
yield issues
@@ -1135,7 +1171,8 @@ def test_pipeline_log_progress() -> None:

# will attach dlt logger
p = dlt.pipeline(
destination="dummy", progress=dlt.progress.log(0.5, logger=None, log_level=logging.WARNING)
destination="dummy",
progress=dlt.progress.log(0.5, logger=None, log_level=logging.WARNING),
)
# collector was created before pipeline so logger is not attached
assert cast(LogCollector, p.collector).logger is None
@@ -1217,7 +1254,12 @@ def writes_state():
def test_extract_add_tables() -> None:
# we extract and make sure that tables are added to schema
s = airtable_emojis()
assert list(s.resources.keys()) == ["💰Budget", "📆 Schedule", "🦚Peacock", "🦚WidePeacock"]
assert list(s.resources.keys()) == [
"💰Budget",
"📆 Schedule",
"🦚Peacock",
"🦚WidePeacock",
]
assert s.resources["🦚Peacock"].compute_table_schema()["resource"] == "🦚Peacock"
# only name will be normalized
assert s.resources["🦚Peacock"].compute_table_schema()["name"] == "🦚Peacock"
@@ -1631,7 +1673,11 @@ def test_pipeline_list_packages() -> None:
assert len(load_ids) == 1
# two new packages: for emojis schema and emojis_2
pipeline.extract(
[airtable_emojis(), airtable_emojis(), airtable_emojis().clone(with_name="emojis_2")]
[
airtable_emojis(),
airtable_emojis(),
airtable_emojis().clone(with_name="emojis_2"),
]
)
load_ids = pipeline.list_extracted_load_packages()
assert len(load_ids) == 3
@@ -1711,7 +1757,9 @@ def test_parallel_pipelines_threads(workers: int) -> None:
os.environ["PIPELINE_1__EXTRA"] = "CFG_P_1"
os.environ["PIPELINE_2__EXTRA"] = "CFG_P_2"

def _run_pipeline(pipeline_name: str) -> Tuple[LoadInfo, PipelineContext, DictStrAny]:
def _run_pipeline(
pipeline_name: str,
) -> Tuple[LoadInfo, PipelineContext, DictStrAny]:
try:

@dlt.transformer(
@@ -1999,7 +2047,8 @@ def users_source():
@dlt.source
def taxi_demand_source():
@dlt.resource(
primary_key="city", columns=[{"name": "id", "data_type": "bigint", "precision": 4}]
primary_key="city",
columns=[{"name": "id", "data_type": "bigint", "precision": 4}],
)
def locations(idx=dlt.sources.incremental("id")):
for idx in range(10):
@@ -2230,3 +2279,33 @@ def stateful_resource():
assert len(fs_client.list_table_files("_dlt_loads")) == 2
assert len(fs_client.list_table_files("_dlt_version")) == 1
assert len(fs_client.list_table_files("_dlt_pipeline_state")) == 1


def test_pipeline_with_frozen_schema_contract() -> None:
"""Pipelines with schema_contract=freeze should create _dlt_* tables"""

pipeline = dlt.pipeline(
pipeline_name="frozen_schema_contract",
destination="duckdb",
)

data = [
{"id": 101, "name": "sub item 101"},
{"id": 101, "name": "sub item 102"},
]

pipeline.run(
data,
table_name="test_items",
schema_contract="freeze",
)

# confirm the _dlt_* bookkeeping tables were created and populated
table_counts = load_table_counts(
pipeline, *[t["name"] for t in pipeline.default_schema._schema_tables.values()]
)

assert len(table_counts) == 3
assert table_counts["_dlt_loads"] == 1
assert table_counts["_dlt_version"] == 1
assert table_counts["_dlt_pipeline_state"] == 1
