Move refresh tests to load, add filesystem truncate test
steinitzu committed May 22, 2024
1 parent 61cbaf9 commit 9bb46a5
Showing 1 changed file with 87 additions and 49 deletions.
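The diff below replaces hard-coded duckdb pipelines with the shared destination parametrization from tests.load.utils, so the refresh tests run against configured destinations. A minimal sketch of that pattern, assuming pytest and dlt's load-test helpers as they are used in the diff (the test name, pipeline name, and table name here are hypothetical):

import pytest

from tests.load.utils import destinations_configs, DestinationTestConfiguration


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
    ids=lambda x: x.name,
)
def test_pattern_sketch(destination_config: DestinationTestConfiguration) -> None:
    # setup_pipeline builds a dlt pipeline bound to the parametrized destination
    # and forwards refresh to it, replacing the old dlt.pipeline(...) calls
    pipeline = destination_config.setup_pipeline("pattern_sketch", refresh="drop_sources")
    info = pipeline.run([{"id": 1}], table_name="sketch_table")
    assert info.loads_ids

With ids=lambda x: x.name, each destination configuration shows up as its own pytest id, so a single case can be selected with pytest -k.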
@@ -13,6 +13,7 @@
 
 from tests.utils import clean_test_storage, preserve_environ
 from tests.pipeline.utils import assert_load_info
+from tests.load.utils import destinations_configs, DestinationTestConfiguration
 
 
 def assert_source_state_is_wiped(state: DictStrAny) -> None:
@@ -94,16 +95,15 @@ def some_data_4():
     yield some_data_4
 
 
-def test_refresh_drop_sources():
-
-    # First run pipeline with load to destination so tables are created
-    pipeline = dlt.pipeline(
-        "refresh_full_test",
-        destination="duckdb",
-        refresh="drop_sources",
-        dataset_name="refresh_full_test",
-    )
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
+    ids=lambda x: x.name,
+)
+def test_refresh_drop_sources(destination_config: DestinationTestConfiguration):
+    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources")
 
+    # First run pipeline so tables are created in the destination
     info = pipeline.run(refresh_source(first_run=True, drop_sources=True))
     assert_load_info(info)

@@ -143,16 +143,16 @@ def test_refresh_drop_sources():
     assert_source_state_is_wiped(destination_state["sources"]["refresh_source"])
 
 
-def test_existing_schema_hash():
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
+    ids=lambda x: x.name,
+)
+def test_existing_schema_hash(destination_config: DestinationTestConfiguration):
     """Test when the new schema is identical to a previously stored schema after dropping and re-creating tables.
     The change should be detected regardless, and tables should be created again in the destination db.
     """
-    pipeline = dlt.pipeline(
-        "refresh_full_test",
-        destination="duckdb",
-        refresh="drop_sources",
-        dataset_name="refresh_full_test",
-    )
+    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources")
 
     info = pipeline.run(refresh_source(first_run=True, drop_sources=True))
     assert_load_info(info)
@@ -184,14 +184,14 @@ def test_existing_schema_hash():
     assert new_schema_hash == first_schema_hash
 
 
-def test_refresh_drop_tables():
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
+    ids=lambda x: x.name,
+)
+def test_refresh_drop_tables(destination_config: DestinationTestConfiguration):
     # First run pipeline with load to destination so tables are created
-    pipeline = dlt.pipeline(
-        "refresh_full_test",
-        destination="duckdb",
-        refresh="drop_resources",
-        dataset_name="refresh_full_test",
-    )
+    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_tables")
 
     info = pipeline.run(refresh_source(first_run=True))
     assert_load_info(info)
@@ -235,15 +235,15 @@ def test_refresh_drop_tables():
     assert not source_state["resources"]["some_data_1"]
 
 
-def test_refresh_drop_data_only():
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
+    ids=lambda x: x.name,
+)
+def test_refresh_drop_data_only(destination_config: DestinationTestConfiguration):
     """Refresh drop_data should truncate all selected tables before load"""
     # First run pipeline with load to destination so tables are created
-    pipeline = dlt.pipeline(
-        "refresh_full_test",
-        destination="duckdb",
-        refresh="drop_data",
-        dataset_name="refresh_full_test",
-    )
+    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_data")
 
     info = pipeline.run(refresh_source(first_run=True), write_disposition="append")
     assert_load_info(info)
@@ -300,7 +300,12 @@ def test_refresh_drop_data_only():
     assert source_state["resources"]["some_data_3"] == {"run1_1": "value1_1"}
 
 
-def test_refresh_drop_sources_multiple_sources():
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
+    ids=lambda x: x.name,
+)
+def test_refresh_drop_sources_multiple_sources(destination_config: DestinationTestConfiguration):
     """
     Ensure only state and tables for the currently selected source are dropped
     """
@@ -344,12 +349,7 @@ def source_2_data_2():
         yield source_2_data_1
         yield source_2_data_2
 
-    pipeline = dlt.pipeline(
-        "refresh_full_test_2",
-        destination="duckdb",
-        refresh="drop_sources",
-        dataset_name="refresh_full_test",
-    )
+    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources")
 
     # Run both sources
     info = pipeline.run(
@@ -390,12 +390,13 @@ def source_2_data_2():
         result = client.execute_sql("SELECT * FROM source_2_data_2")
 
 
-def test_refresh_argument_to_run():
-    pipeline = dlt.pipeline(
-        "refresh_full_test",
-        destination="duckdb",
-        dataset_name="refresh_full_test",
-    )
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
+    ids=lambda x: x.name,
+)
+def test_refresh_argument_to_run(destination_config: DestinationTestConfiguration):
+    pipeline = destination_config.setup_pipeline("refresh_full_test")
 
     info = pipeline.run(refresh_source(first_run=True))
     assert_load_info(info)
@@ -419,12 +420,13 @@ def test_refresh_argument_to_run():
     assert tables == {"some_data_2", "some_data_3"}
 
 
-def test_refresh_argument_to_extract():
-    pipeline = dlt.pipeline(
-        "refresh_full_test",
-        destination="duckdb",
-        dataset_name="refresh_full_test",
-    )
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["duckdb"]),
+    ids=lambda x: x.name,
+)
+def test_refresh_argument_to_extract(destination_config: DestinationTestConfiguration):
+    pipeline = destination_config.setup_pipeline("refresh_full_test")
 
     info = pipeline.run(refresh_source(first_run=True))
     assert_load_info(info)
@@ -443,3 +445,39 @@ def test_refresh_argument_to_extract():
 
     tables = set(t["name"] for t in pipeline.default_schema.data_tables(include_incomplete=True))
     assert tables == {"some_data_2", "some_data_3", "some_data_4"}
+
+
+@pytest.mark.parametrize(
+    "destination_config", destinations_configs(local_filesystem_configs=True), ids=lambda x: x.name
+)
+def test_refresh_drop_sources_local_filesystem(destination_config: DestinationTestConfiguration):
+    pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_data")
+
+    info = pipeline.run(refresh_source(first_run=True, drop_sources=False))
+    assert_load_info(info)
+    load_1_id = info.loads_ids[0]
+
+    info = pipeline.run(
+        refresh_source(first_run=False, drop_sources=False).with_resources(
+            "some_data_1", "some_data_2"
+        )
+    )
+    assert_load_info(info)
+    load_2_id = info.loads_ids[0]
+
+    client = pipeline._fs_client()
+
+    # Only contains files from load 2
+    file_names = client.list_table_files("some_data_1")
+    assert len(file_names) == 1
+    assert load_2_id in file_names[0]
+
+    # Only contains files from load 2
+    file_names = client.list_table_files("some_data_2")
+    assert len(file_names) == 1
+    assert load_2_id in file_names[0]
+
+    # Nothing dropped, only file from load 1
+    file_names = client.list_table_files("some_data_3")
+    assert len(file_names) == 1
+    assert load_1_id in file_names[0]
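For context on what the new filesystem test asserts: with refresh="drop_data", the selected tables are truncated before the load, so only files from the latest load remain, while untouched resources keep their earlier files. The refresh argument can also be passed per run rather than at pipeline creation, as test_refresh_argument_to_run above exercises; a minimal standalone sketch, assuming dlt with the duckdb extra installed (pipeline and table names here are hypothetical):

import dlt

pipeline = dlt.pipeline(
    "refresh_demo",
    destination="duckdb",
    dataset_name="refresh_demo",
)
pipeline.run([{"id": 1}], table_name="demo_table")
# drop_data truncates the selected tables before this load,
# mirroring what the drop_data tests above assert
info = pipeline.run([{"id": 2}], table_name="demo_table", refresh="drop_data")
print(info)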
