From 02f927453f7ae2bcf37131c4f52ac4b956b06a5b Mon Sep 17 00:00:00 2001 From: Anton Burnashev Date: Wed, 13 Sep 2023 21:51:14 +0300 Subject: [PATCH] Add staging and versioned datasets --- .../understanding-the-tables.md | 150 ++++++++++++++++-- 1 file changed, 135 insertions(+), 15 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/visualizations/understanding-the-tables.md b/docs/website/docs/dlt-ecosystem/visualizations/understanding-the-tables.md index 5bbd721162..b56688a755 100644 --- a/docs/website/docs/dlt-ecosystem/visualizations/understanding-the-tables.md +++ b/docs/website/docs/dlt-ecosystem/visualizations/understanding-the-tables.md @@ -37,16 +37,21 @@ will behave similarly and have similar concepts. ## Schema When you run the pipeline, dlt creates a schema in the destination database. The schema is a -collection of tables that represent the data you loaded. The schema name is the same as the +collection of tables that represent the data you loaded into the database. The schema name is the same as the `dataset_name` you provided in the pipeline definition. In the example above, we explicitly set the `dataset_name` to `mydata`, if you don't set it, it will be set to the pipeline name with a suffix `_dataset`. +Be aware that the schema referred to in this section is distinct from the [dlt Schema](../../general-usage/schema.md). +The database schema pertains to the structure and organization of data within the database, including table +definitions and relationships. On the other hand, the "dlt Schema" specifically refers to the format +and structure of normalized data within the dlt pipeline. + ## Tables Each [resource](../../general-usage/resource.md) in your pipeline definition will be represented by a table in -the destination. In the example above, we have one resource, `users`, so we will have one table, `users`, -in the destination. Here also, we explicitly set the `table_name` to `users`, if you don't set it, it will be -set to the resource name. +the destination. In the example above, we have one resource, `users`, so we will have one table, `mydata.users`, +in the destination. `mydata` is the schema name, and `users` is the table name. Here also, we explicitly set +the `table_name` to `users`, if you don't set it, it will be set to the resource name. For example, we can rewrite the pipeline above as: @@ -113,14 +118,14 @@ Running this pipeline will create two tables in the destination, `users` and `us `users` table will contain the top level data, and the `users__pets` table will contain the child data. Here is what the tables may look like: -**users** +**mydata.users** | id | name | _dlt_id | _dlt_load_id | | --- | --- | --- | --- | | 1 | Alice | wX3f5vn801W16A | 1234562350.98417 | | 2 | Bob | rX8ybgTeEmAmmA | 1234562350.98417 | -**users__pets** +**mydata.users__pets** | id | name | type | _dlt_id | _dlt_parent_id | _dlt_list_idx | | --- | --- | --- | --- | --- | --- | @@ -155,23 +160,55 @@ case the primary key or other unique columns are defined. During a pipeline run, dlt [normalizes both table and column names](../../general-usage/schema.md#naming-convention) to ensure compatibility with the destination database's accepted format. All names from your source data will be transformed into snake_case and will only include alphanumeric characters. Please be aware that the names in the destination database may differ somewhat from those in your original input. -## Load IDs +## Load Packages and Load IDs + +Each execution of the pipeline generates one or more load packages. A load package typically contains data retrieved from +all the [resources](../../general-usage/glossary.md#resource) of a particular [source](../../general-usage/glossary.md#source). +These packages are uniquely identified by a `load_id`. The `load_id` of a particular package is added to the top data tables +(referenced as `_dlt_load_id` column in the example above) and to the special `_dlt_loads` table with a status 0 +(when the load process is fully completed). + +To illustrate this, let's load more data into the same destination: + +```py +data = [ + { + 'id': 3, + 'name': 'Charlie', + 'pets': [] + }, +] +``` + +The rest of the pipeline definition remains the same. Running this pipeline will create a new load +package with a new `load_id` and add the data to the existing tables. The `users` table will now +look like this: + +**mydata.users** +| id | name | _dlt_id | _dlt_load_id | +| --- | --- | --- | --- | +| 1 | Alice | wX3f5vn801W16A | 1234562350.98417 | +| 2 | Bob | rX8ybgTeEmAmmA | 1234562350.98417 | +| 3 | Charlie | h8lehZEvT3fASQ | **1234563456.12345** | + +The `_dlt_loads` table will look like this: -Each pipeline run creates one or more load packages, which can be identified by their `load_id`. A load -package typically contains data from all [resources](../../general-usage/glossary.md#resource) of a -particular [source](../../general-usage/glossary.md#source). The `load_id` of a particular package -is added to the top data tables (`_dlt_load_id` column) and to the `_dlt_loads` table with a status 0 (when the load process -is fully completed). +**mydata._dlt_loads** + +| load_id | schema_name | status | inserted_at | schema_version_hash | +| --- | --- | --- | --- | --- | +| 1234562350.98417 | quick_start | 0 | 2023-09-12 16:45:51.17865+00 | aOEbMXCa6yHWbBM56qhLlx209rHoe35X1ZbnQekd/58= | +| **1234563456.12345** | quick_start | 0 | 2023-09-12 16:46:03.10662+00 | aOEbMXCa6yHWbBM56qhLlx209rHoe35X1ZbnQekd/58= | The `_dlt_loads` table tracks complete loads and allows chaining transformations on top of them. Many destinations do not support distributed and long-running transactions (e.g. Amazon Redshift). -In that case, the user may see the partially loaded data. It is possible to filter such data out—any -row with a `load_id` that does not exist in `_dlt_loads` is not yet completed. The same procedure may be used to delete and identify +In that case, the user may see the partially loaded data. It is possible to filter such data out: any +row with a `load_id` that does not exist in `_dlt_loads` is not yet completed. The same procedure may be used to identify and delete data for packages that never got completed. For each load, you can test and [alert](../../running-in-production/alerting.md) on anomalies (e.g. no data, too much loaded to a table). There are also some useful load stats in the `Load info` tab -of the [Streamlit app](understanding-the-tables.md#show-tables-and-data-in-the-destination) +of the [Streamlit app](exploring-the-data.md#exploring-the-data) mentioned above. You can add [transformations](../transformations) and chain them together @@ -192,3 +229,86 @@ You can [save](../../running-in-production/running.md#inspect-and-save-the-load- complete lineage info for a particular `load_id` including a list of loaded files, error messages (if any), elapsed times, schema changes. This can be helpful, for example, when troubleshooting problems. + +## Staging dataset + +So far we've been using the `append` write disposition in our example pipeline. This means that +each time we run the pipeline, the data is appended to the existing tables. When you use [the +merge write disposition](../../general-usage/incremental-loading.md), `dlt` creates a staging database schema for +staging data. This schema is named `_staging` and contains the same tables as the +destination schema. When you run the pipeline, the data from the staging tables is loaded into the +destination tables in a single atomic transaction. + +Let's illustrate this with an example. We change our pipeline to use the `merge` write disposition: + +```py +import dlt + +@dlt.resource(primary_key="id", write_disposition="merge") +def users(): + yield [ + {'id': 1, 'name': 'Alice 2'}, + {'id': 2, 'name': 'Bob 2'} + ] + +pipeline = dlt.pipeline( + pipeline_name='quick_start', + destination='duckdb', + dataset_name='mydata' +) + +load_info = pipeline.run(users) +``` + +Running this pipeline will create a schema in the destination database with the name `mydata_staging`. +If you inspect the tables in this schema, you will find `mydata_staging.users` table identical to the +`mydata.users` table in the previous example. + +Here is what the tables may look like after running the pipeline: + +**mydata_staging.users** +| id | name | _dlt_id | _dlt_load_id | +| --- | --- | --- | --- | +| 1 | Alice 2 | wX3f5vn801W16A | 2345672350.98417 | +| 2 | Bob 2 | rX8ybgTeEmAmmA | 2345672350.98417 | + +**mydata.users** +| id | name | _dlt_id | _dlt_load_id | +| --- | --- | --- | --- | +| 1 | Alice 2 | wX3f5vn801W16A | 2345672350.98417 | +| 2 | Bob 2 | rX8ybgTeEmAmmA | 2345672350.98417 | +| 3 | Charlie | h8lehZEvT3fASQ | 1234563456.12345 | + +Notice that the `mydata.users` table now contains the data from both the previous pipeline run and +the current one. + +## Versioned datasets + +When you use the `full_refresh` option, `dlt` creates a versioned dataset. This means that each +time you run the pipeline, the data is loaded into a new dataset (a new database schema). +The dataset name is the same as the `dataset_name` you provided in the pipeline definition with a +datetime-based suffix. + +We modify our pipeline to use the `full_refresh` option to see how this works: + +```py +import dlt + +data = [ + {'id': 1, 'name': 'Alice'}, + {'id': 2, 'name': 'Bob'} +] + +pipeline = dlt.pipeline( + pipeline_name='quick_start', + destination='duckdb', + dataset_name='mydata', + full_refresh=True # <-- add this line +) +load_info = pipeline.run(data, table_name="users") +``` + +Every time you run this pipeline, a new schema will be created in the destination database with a +datetime-based suffix. The data will be loaded into tables in this schema. +For example, the first time you run the pipeline, the schema will be named +`mydata_20230912064403`, the second time it will be named `mydata_20230912064407`, and so on.