Commit 5045631: adds scd2 tests
rudolfix committed Sep 16, 2024 (1 parent: 7a7b2d2)

Showing 3 changed files with 101 additions and 12 deletions.
19 changes: 9 additions & 10 deletions docs/website/docs/general-usage/destination-tables.md
````diff
@@ -84,7 +84,7 @@ connecting to the database directly.
 
 :::
 
-## Nested and root tables
+## Nested tables
 
 Now let's look at a more complex example:
 
@@ -117,7 +117,7 @@ pipeline = dlt.pipeline(
 load_info = pipeline.run(data, table_name="users")
 ```
 
-Running this pipeline will create two tables in the destination, `users` (**root table**) and `users__pets` (**nested table**). The `users` table will contain the top-level data, and the `users__pets` table will contain the data nested in the Python list. Here is what the tables may look like:
+Running this pipeline will create two tables in the destination, `users` (**root table**) and `users__pets` (**nested table**). The `users` table will contain the top-level data, and the `users__pets` table will contain the data nested in the Python lists. Here is what the tables may look like:
 
 **mydata.users**
 
@@ -134,18 +134,17 @@ Running this pipeline will create two tables in the destination, `users` (**root
 | 2 | Spot | dog | 9uxh36VU9lqKpw | wX3f5vn801W16A | 1 |
 | 3 | Fido | dog | pe3FVtCWz8VuNA | rX8ybgTeEmAmmA | 0 |
 
-When creating a database schema, `dlt` maps the structure of Python objects (often representing JSON files) into relational tables,
-creating and linking nested tables.
+When inferring a database schema, `dlt` maps the structure of Python objects (e.g., from parsed JSON files) into nested tables and creates
+references between them.
 
 This is how it works:
 
-1. Each row in all (root and nested) data tables created by `dlt` contains a unique column named `_dlt_id`.
-1. Each nested table contains column named `_dlt_parent_id` linking to a particular row (`_dlt_id`) of a parent table.
-1. Rows in child tables come from the lists: `dlt` stores the position of each item in the list in `_dlt_list_idx`.
-1. For tables that are loaded with the `merge` write disposition, we add a root key column `_dlt_root_id`, which links the child table to a row in the root table.
-
-
+1. Each row in all (root and nested) data tables created by `dlt` contains a unique column named `_dlt_id` (**row key**).
+1. Each nested table contains a column named `_dlt_parent_id` referencing a particular row (`_dlt_id`) of the parent table (**parent key**).
+1. Rows in nested tables come from Python lists: `dlt` stores the position of each item in the list in `_dlt_list_idx`.
+1. For nested tables that are loaded with the `merge` write disposition, we add a **root key** column `_dlt_root_id`, which links the nested table to a row in the root table.
 
+[Learn more on nested references, row keys and parent keys](schema.md#nested-references-root-and-nested-tables)
 
 ## Naming convention: tables and columns
````
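The updated bullet list above describes the generated keys in the abstract. Here is a minimal runnable sketch of those mechanics, not part of this commit; the pipeline and dataset names are made up, and only `dlt` with the `duckdb` extra is assumed:

```python
# Minimal sketch, assuming dlt[duckdb] is installed; names are illustrative.
import dlt

data = [
    {"id": 1, "name": "Alice", "pets": [{"name": "Fluffy"}, {"name": "Spot"}]},
    {"id": 2, "name": "Bob", "pets": [{"name": "Fido"}]},
]

pipeline = dlt.pipeline(
    pipeline_name="nested_demo", destination="duckdb", dataset_name="mydata"
)
pipeline.run(data, table_name="users")

# users__pets is the nested table: _dlt_parent_id (parent key) references the
# _dlt_id (row key) of the owning users row; _dlt_list_idx is the position of
# the item in the original Python list.
with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT _dlt_parent_id, _dlt_list_idx, name FROM users__pets"
    ) as cur:
        print(cur.fetchall())
```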
7 changes: 5 additions & 2 deletions tests/pipeline/cases/github_pipeline/github_scd2.py
```diff
@@ -27,7 +27,10 @@ def load_issues():
 if len(sys.argv) == 2:
     delete_issues = [int(p) for p in sys.argv[1].split(",")]
 
-p = dlt.pipeline("dlt_github_scd2", destination="duckdb", dataset_name="github_scd2")
+def filter_issues(issue):
+    return issue["number"] not in delete_issues
+
+p = dlt.pipeline("dlt_github_pipeline", destination="duckdb", dataset_name="github_3")
 github_source = github()
-info = p.run(github_source)
+info = p.run(github_source.load_issues.add_filter(filter_issues))
 print(info)
```
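For context on what this change exercises: `load_issues` is loaded with dlt's `scd2` merge strategy, and `add_filter` attaches an item-level filter to the resource, so issue numbers passed on the command line disappear from the second extract and get retired. A hedged sketch of how such a resource is typically declared; the decorator arguments are assumptions, since the diff does not show the `github()` source body:

```python
# Sketch only: a typical scd2 resource declaration in dlt. The actual
# github_scd2.py source body is not shown in this diff.
import dlt

@dlt.resource(
    table_name="issues",
    write_disposition={"disposition": "merge", "strategy": "scd2"},
)
def load_issues():
    # yield issue records; under the scd2 strategy dlt tracks each row's
    # validity window in _dlt_valid_from / _dlt_valid_to and "retires" rows
    # that are absent from a subsequent extract
    yield {"number": 6272, "title": "example issue"}

# filters run per item, so filtered-out issues look deleted to scd2:
# load_issues.add_filter(lambda issue: issue["number"] not in {6272})
```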
87 changes: 87 additions & 0 deletions tests/pipeline/test_dlt_versions.py
```diff
@@ -396,3 +396,90 @@ def test_normalize_package_with_dlt_update(test_storage: FileStorage) -> None:
     # now we can migrate the storage
     pipeline.normalize()
     assert pipeline._get_normalize_storage().version == "1.0.1"
+
+
+def test_scd2_pipeline_update(test_storage: FileStorage) -> None:
+    shutil.copytree("tests/pipeline/cases/github_pipeline", TEST_STORAGE_ROOT, dirs_exist_ok=True)
+
+    # execute in test storage
+    with set_working_dir(TEST_STORAGE_ROOT):
+        # store dlt data in test storage (like patch_home_dir)
+        with custom_environ({DLT_DATA_DIR: get_dlt_data_dir()}):
+            # save database outside of pipeline dir
+            with custom_environ(
+                {"DESTINATION__DUCKDB__CREDENTIALS": "duckdb:///test_github_3.duckdb"}
+            ):
+                # run scd2 pipeline on 0.4.10
+                venv_dir = tempfile.mkdtemp()
+                # venv_dir == "tmp/dlt0410"
+                with Venv.create(venv_dir, ["dlt[duckdb]==0.4.10"]) as venv:
+                    venv._install_deps(venv.context, ["duckdb" + "==" + pkg_version("duckdb")])
+
+                    print(venv.run_script("../tests/pipeline/cases/github_pipeline/github_scd2.py"))
+
+                # get data from original db
+                duckdb_cfg = resolve_configuration(
+                    DuckDbClientConfiguration()._bind_dataset_name(dataset_name=GITHUB_DATASET),
+                    sections=("destination", "duckdb"),
+                )
+                with DuckDbSqlClient(
+                    GITHUB_DATASET,
+                    "%s_staging",
+                    duckdb_cfg.credentials,
+                    duckdb().capabilities(),
+                ) as client:
+                    issues = client.execute_sql("SELECT * FROM issues ORDER BY id")
+                    issues__assignees = client.execute_sql(
+                        "SELECT * FROM issues__assignees ORDER BY id"
+                    )
+                    issues__labels = client.execute_sql(
+                        "SELECT * FROM issues__labels ORDER BY id"
+                    )
+
+                assert len(issues) == 100
+
+                venv = Venv.restore_current()
+                # load same data again
+                print(venv.run_script("../tests/pipeline/cases/github_pipeline/github_scd2.py"))
+                pipeline = dlt.attach(GITHUB_PIPELINE_NAME)
+                # unique on row_key got swapped from True to False
+                assert (
+                    pipeline.default_schema.tables["issues"]["columns"]["_dlt_id"]["unique"]
+                    is False
+                )
+                # datasets must be the same
+                with DuckDbSqlClient(
+                    GITHUB_DATASET,
+                    "%s_staging",
+                    duckdb_cfg.credentials,
+                    duckdb().capabilities(),
+                ) as client:
+                    issues_n = client.execute_sql("SELECT * FROM issues ORDER BY id")
+                    issues__assignees_n = client.execute_sql(
+                        "SELECT * FROM issues__assignees ORDER BY id"
+                    )
+                    issues__labels_n = client.execute_sql(
+                        "SELECT * FROM issues__labels ORDER BY id"
+                    )
+                assert issues == issues_n
+                assert issues__assignees == issues__assignees_n
+                assert issues__labels == issues__labels_n
+
+                # retire some ids
+                print(
+                    venv.run_script(
+                        "../tests/pipeline/cases/github_pipeline/github_scd2.py", "6272"
+                    )
+                )
+                with DuckDbSqlClient(
+                    GITHUB_DATASET,
+                    "%s_staging",
+                    duckdb_cfg.credentials,
+                    duckdb().capabilities(),
+                ) as client:
+                    issues_retired = client.execute_sql(
+                        "SELECT number FROM issues WHERE _dlt_valid_to IS NOT NULL"
+                    )
+
+                assert len(issues_retired) == 1
+                assert issues_retired[0][0] == 6272
+                # print(pipeline.default_schema.to_pretty_yaml())
```

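The final assertions rely on dlt's SCD2 convention: rows still present in the latest extract keep `_dlt_valid_to` NULL, while rows that vanish (here, issue 6272, filtered out on the retire run) get `_dlt_valid_to` stamped. A minimal way to eyeball the same state outside the test harness, assuming the `test_github_3.duckdb` file produced by the script above:

```python
# Minimal sketch, not part of the commit: inspect the SCD2 validity window
# with plain duckdb. Assumes test_github_3.duckdb holds the github_3 dataset
# loaded by github_scd2.py.
import duckdb

conn = duckdb.connect("test_github_3.duckdb")
retired = conn.execute(
    "SELECT number, _dlt_valid_from, _dlt_valid_to"
    " FROM github_3.issues WHERE _dlt_valid_to IS NOT NULL"
).fetchall()
print(retired)  # expect a single row for issue 6272 after the retire run
```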