Skip to content

Commit

Permalink
add tests for pipeline methods
Browse files Browse the repository at this point in the history
  • Loading branch information
jorritsandbrink committed Aug 5, 2024
1 parent def563c commit 085818f
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from dlt.common.typing import DictStrAny
from dlt.common.utils import uniq_id
from dlt.common.schema import Schema
from dlt.common.storages.exceptions import LoadPackageNotFound

from dlt.destinations import filesystem, redshift, dummy
from dlt.destinations.impl.filesystem.filesystem import INIT_FILE_NAME
Expand Down Expand Up @@ -2637,6 +2638,55 @@ def comments(user_id: str):
assert pipeline.last_trace.last_normalize_info.row_counts["user_comments"] == 3


def test_get_last_completed_load_id() -> None:
    """Check `get_last_completed_load_id` across the individual pipeline steps.

    The assertions require that no load id is reported after `extract` or
    `normalize`, that one is reported once `load` has run, and that a
    following `run` reports a different, non-None id.
    """
    # NOTE(review): presumably makes the dummy destination complete every job - confirm
    os.environ["COMPLETED_PROB"] = "1.0"
    p = dlt.pipeline(destination="dummy")

    # neither extract nor normalize may yield a completed load on their own
    p.extract([{"foo": "bar"}], table_name="foo")
    assert p.get_last_completed_load_id() is None
    p.normalize()
    assert p.get_last_completed_load_id() is None

    # the load step is the first point where a completed load id is expected
    p.load()
    load_id_after_load = p.get_last_completed_load_id()
    assert load_id_after_load is not None

    # a full run must report a new load id, not the earlier one
    p.run([{"foo": "bar"}], table_name="foo")
    load_id_after_run = p.get_last_completed_load_id()
    assert load_id_after_run is not None
    assert load_id_after_run != load_id_after_load


def test_list_loaded_tables_names() -> None:
    """Check `list_loaded_tables_names` for single-table and multi-table loads.

    Covers three things: the table reported for the most recent load id,
    filtering by the `file_format` argument, and the `LoadPackageNotFound`
    error raised for an unknown load id.
    """
    # NOTE(review): presumably makes the dummy destination complete every job - confirm
    os.environ["COMPLETED_PROB"] = "1.0"
    # don't load _dlt_pipeline_state table
    os.environ["RESTORE_FROM_DESTINATION"] = "false"
    p = dlt.pipeline(destination="dummy")

    # each single-table run must report exactly the table it loaded
    p.run([{"foo": "bar"}], table_name="foo")
    last_load_id = p.get_last_completed_load_id()
    loaded_tables = p.list_loaded_tables_names(last_load_id)
    assert loaded_tables == ["foo"]

    p.run([{"foo": "bar"}], table_name="bar")
    last_load_id = p.get_last_completed_load_id()
    loaded_tables = p.list_loaded_tables_names(last_load_id)
    assert loaded_tables == ["bar"]

    # the resource functions keep the names the assertions below expect as tables
    @dlt.resource(file_format="jsonl")
    def foo():
        yield {"foo": "bar"}

    @dlt.resource(file_format="jsonl")
    def bar():
        yield {"foo": "bar"}

    p.run([foo(), bar()])
    last_load_id = p.get_last_completed_load_id()
    expected_tables = {"foo", "bar"}

    # compared as sets, so the order of the returned names is not checked
    unfiltered = p.list_loaded_tables_names(last_load_id)
    assert set(unfiltered) == expected_tables
    jsonl_only = p.list_loaded_tables_names(last_load_id, file_format="jsonl")
    assert set(jsonl_only) == expected_tables
    # both resources were declared as jsonl, so the parquet filter matches nothing
    parquet_only = p.list_loaded_tables_names(last_load_id, file_format="parquet")
    assert parquet_only == []

    # an unknown load id must raise instead of returning an empty list
    with pytest.raises(LoadPackageNotFound):
        p.list_loaded_tables_names("not_a_load_id")


def assert_imported_file(
pipeline: Pipeline,
table_name: str,
Expand Down

0 comments on commit 085818f

Please sign in to comment.