diff --git a/.github/workflows/get_docs_changes.yml b/.github/workflows/get_docs_changes.yml index 3c68b83420..d0fd936e00 100644 --- a/.github/workflows/get_docs_changes.yml +++ b/.github/workflows/get_docs_changes.yml @@ -8,7 +8,7 @@ on: value: ${{ jobs.get_docs_changes.outputs.changes_outside_docs }} env: - EXCLUDED_FILE_PATTERNS: '^docs/|^README.md|^LICENSE\.txt|\.editorconfig|\.gitignore' + EXCLUDED_FILE_PATTERNS: '^docs/|^README.md|^CONTRIBUTING.md|^LICENSE\.txt|\.editorconfig|\.gitignore|get_docs_changes.yml' jobs: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 27156461de..afd0a00d4a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -38,12 +38,12 @@ When you're ready to contribute, follow these steps: We use **devel** (which is our default Github branch) to prepare a next release of `dlt`. We accept all regular contributions there (including most of the bugfixes). -We use **master** branch for hot fixes (including documentation) that needs to be released out of normal schedule. +We use the **master** branch for hot fixes (including documentation) that need to be released out of the normal schedule. On the release day, **devel** branch is merged into **master**. All releases of `dlt` happen only from the **master**. ### Submitting a hotfix -We'll fix critical bugs and release `dlt` our of the schedule. Follow the regular procedure, but make your PR against **master** branch. Please ping us on Slack if you do it. +We'll fix critical bugs and release `dlt` outside the normal schedule. Follow the regular procedure, but make your PR against the **master** branch. Please ping us on Slack if you do it. ### Testing with Github Actions We enable our CI to run tests for contributions from forks. All the tests are run, but not all destinations are available due to credentials. Currently @@ -71,7 +71,7 @@ To test local destinations (`duckdb` and `postgres`), run `make test-load-local` ### External Destinations -To test external destinations use `make test`. You will need following external resources +To test external destinations, use `make test`. You will need the following external resources: 1. `BigQuery` project 2. `Redshift` cluster @@ -95,7 +95,7 @@ This section is intended for project maintainers who have the necessary permissi Please read how we [version the library](README.md#adding-as-dependency) first. -The source of truth of the current version is is `pyproject.toml`, and we use `poetry` to manage it. +The source of truth for the current version is `pyproject.toml`, and we use `poetry` to manage it. ### Regular release @@ -104,14 +104,14 @@ Before publishing a new release, make sure to bump the project's version accordi 1. Check out the **devel** branch. 2. Use `poetry version patch` to increase the **patch** version 3. Run `make build-library` to apply the changes to the project. -4. Create a new branch, and submit the PR to **devel**. Go through standard process to merge it. +4. Create a new branch, and submit the PR to **devel**. Go through the standard process to merge it. 5. Create a merge PR from `devel` to `master` and merge it with a merge commit. ### Hotfix release 1. Check out the **master** branch 2. Use `poetry version patch` to increase the **patch** version 3. Run `make build-library` to apply the changes to the project. -4. Create a new branch, and submit the PR to **master** and merge it. +4. Create a new branch, submit the PR to **master**, and merge it. ### Pre-release Occasionally we may release an alpha version directly from the **branch**. 
diff --git a/docs/website/blog/2024-02-21-pipelines-single-pane-of-glass.md b/docs/website/blog/2024-02-21-pipelines-single-pane-of-glass.md new file mode 100644 index 0000000000..553284bc6f --- /dev/null +++ b/docs/website/blog/2024-02-21-pipelines-single-pane-of-glass.md @@ -0,0 +1,84 @@ +--- +slug: single-pane-glass +title: "Single pane of glass for pipelines running on various orchestrators" +image: https://storage.googleapis.com/dlt-blog-images/single-pane-glass.png +authors: + name: Adrian Brudaru + title: Open source Data Engineer + url: https://github.com/adrianbr + image_url: https://avatars.githubusercontent.com/u/5762770?v=4 +tags: [data observability, data pipeline observability] +--- + +# The challenge in discussion + +In large organisations, there are often many data teams that serve different departments. These data teams usually cannot agree where to run their infrastructure, and everyone ends up doing something else. For example: + +- 40 generated GCP projects with various services used on each +- Native AWS services under no particular orchestrator +- That on-prem machine that’s the only gateway to some strange corporate data +- and of course that SaaS orchestrator from the marketing team +- together with the event tracking lambdas from product +- don’t forget the notebooks someone scheduled + +So, what’s going on? Where is the data flowing? What data is it? + +# The case at hand + +At dltHub, we are data people, and use data in our daily work. + +One of our sources is our community Slack, which we use in two ways: + +1. We are on the Slack free tier, where messages expire quickly. We refer to them in our GitHub issues and plan to use the technical answers for training our GPT helper. For these purposes, we archive the conversations daily. We run this pipeline on GitHub Actions ([docs](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-with-github-actions)), which is a serverless runner that does not have the short time limit of cloud functions. +2. We measure the growth rate of the dlt community - for this, it helps to understand when people join Slack. Because we are on the free tier, we cannot request this information from the API, but we can capture the event via a webhook. This runs serverless on cloud functions, set up as in this [documentation](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook). + +So already we have two different serverless run environments, each with its own “run reporting”. + +Not fun to manage. So how do we achieve a single pane of glass? + +### Alerts are better than monitoring + +Since “checking” things can be tedious, we’d rather forget about it and simply be notified. For this, we can use Slack to send messages. Docs [here](https://dlthub.com/docs/running-in-production/running#using-slack-to-send-messages). + +Here’s a gist of how to use it: + +```python +from dlt.common.runtime.slack import send_slack_message + +def run_pipeline_and_notify(pipeline, data): + try: + load_info = pipeline.run(data) + except Exception as e: + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + f"Pipeline {pipeline.pipeline_name} failed! \n Error: {str(e)}") + raise +``` + +### Monitoring load metrics is cheaper than scanning entire data sets + +As for monitoring, we could always run some queries to count the number of loaded rows ad hoc - but this would scan a lot of data and become costly at larger volumes. + +A better way is to leverage the runtime metrics collected by the pipeline, such as row counts - see the sketch below. 
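+For example, here’s a minimal sketch of that idea - it posts per-table row counts to Slack after a run. The pipeline setup below is purely illustrative, and it assumes that the trace of the last run exposes the normalize-step row counts as `last_normalize_info.row_counts` (a mapping of table name to row count), as in recent `dlt` versions:
+
+```python
+import dlt
+from dlt.common.runtime.slack import send_slack_message
+
+# illustrative pipeline and data - replace with your own source and destination
+pipeline = dlt.pipeline(pipeline_name="slack_archive", destination="duckdb", dataset_name="slack_data")
+load_info = pipeline.run([{"user": "adrian", "message": "hello"}], table_name="messages")
+
+# the trace of the last run collects per-step metrics;
+# row counts come from the normalize step (attribute names assume a recent dlt version)
+row_counts = pipeline.last_trace.last_normalize_info.row_counts
+
+send_slack_message(
+    pipeline.runtime_config.slack_incoming_hook,
+    "\n".join(f"{table}: {count} row(s)" for table, count in row_counts.items()),
+)
+```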
You can find docs on how to do that [here](https://dlthub.com/docs/running-in-production/monitoring#data-monitoring). + +### If we care, governance is doable too + +Now, not everything needs to be governed. But for the Slack pipelines, we want to tag which columns have personally identifiable information, so we can delete that information and stay compliant. + +One simple way to stay compliant is to annotate your raw data schema and use views for the transformed data, so if you delete the data at source, it’s gone everywhere. + +If you are materialising your transformed tables, you would need to have column-level lineage in the transform layer to facilitate the documentation and deletion of the data. [Here’s](https://dlthub.com/docs/blog/dlt-lineage-support) a write-up of how to capture that info. There are also other ways to grab a schema and annotate it; read more [here](https://dlthub.com/docs/general-usage/schema). + +# In conclusion + +There are many reasons why you’d end up running pipelines in different places, from organisational disagreements to skillset differences, or simply technical restrictions. + +Having a single pane of glass is not just beneficial but essential for operational coherence. + +While solutions exist for different parts of this problem, the data collection still needs to be standardised and supported across different locations. + +By using a tool like dlt, standardisation is introduced with ingestion, enabling cross-orchestrator observability and monitoring. + +### Want to discuss? + +[Join our Slack community](https://dlthub.com/community) to take part in the conversation. \ No newline at end of file diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/google_sheets.md b/docs/website/docs/dlt-ecosystem/verified-sources/google_sheets.md index 830ba909ee..2a5d4b03ab 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/google_sheets.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/google_sheets.md @@ -68,8 +68,6 @@ You need to create a GCP service account to get API credentials if you don't hav You need to create a GCP account to get OAuth credentials if you don't have one. To create one, follow these steps: -1. Ensure your email used for the GCP account has access to the GA4 property. - 1. Open a GCP project in your GCP account. 1. Enable the Sheets API in the project. @@ -305,7 +303,7 @@ For more information, read the [General Usage: Credentials.](../../general-usage 1. You're now ready to run the pipeline! To get started, run the following command: ```bash - python3 google_sheets_pipeline.py + python google_sheets_pipeline.py ``` 1. Once the pipeline has finished running, you can verify that everything loaded correctly by using @@ -322,7 +320,46 @@ For more information, read the guide on [how to run a pipeline](../../walkthroug ## Data types -The `dlt` normalizer uses the first row of data to infer types and attempts to coerce subsequent rows, creating variant columns if unsuccessful. This is standard behavior. It also recognizes date and time types using additional metadata from the first row. +The `dlt` normalizer uses the first row of data to infer types and attempts to coerce subsequent rows, creating variant columns if unsuccessful. This is standard behavior. +If `dlt` did not correctly determine the data type of a column, or you want to change the data type for other reasons, +then you can provide a type hint for the affected column in the resource. 
+Also, `dlt` has recently stopped recognizing date and time types automatically, so you have to designate them yourself as `timestamp`. + +Use the `apply_hints` method on the resource to achieve this. +Here's how you can do it: + +```python +for resource in resources: + resource.apply_hints(columns={ + "total_amount": {"data_type": "double"}, + "date": {"data_type": "timestamp"}, + }) +``` +In this example, the `total_amount` column is enforced to be of type `double` and `date` is enforced to be of type `timestamp`. +This will ensure that all values in the `total_amount` column are treated as `double`, regardless of whether they are integers or decimals in the original Google Sheets data. +And values in the `date` column will be represented as dates, not integers. + +For a single resource (e.g. `Sheet1`), you can simply use: +```python +source.Sheet1.apply_hints(columns={ + "total_amount": {"data_type": "double"}, + "date": {"data_type": "timestamp"}, +}) +``` + +To get the names of the resources, you can use: +```python +print(source.resources.keys()) +``` + +To read more about tables, columns, and data types, please refer to [our documentation here](../../general-usage/schema#tables-and-columns). + +:::caution +`dlt` will **not modify** tables after they are created. +So if you change data types with hints, +you need to **delete the dataset** +or set `full_refresh=True`. +::: ## Sources and resources diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md index dcd816f3a0..3f0532e9d2 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md @@ -255,6 +255,79 @@ def sql_table( `write_disposition`: Can be "merge", "replace", or "append". +## Incremental Loading +Efficient data management often requires loading only new or updated data from your SQL databases, rather than reprocessing the entire dataset. This is where incremental loading comes into play. + +Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing ID) to load only data newer than a specified initial value, enhancing efficiency by reducing processing time and resource use. + + +### Configuring Incremental Loading +1. **Choose a Cursor Column**: Identify a column in your SQL table that can serve as a reliable indicator of new or updated rows. Common choices include timestamp columns or auto-incrementing IDs. +1. **Set an Initial Value**: Choose a starting value for the cursor to begin loading data. This could be a specific timestamp or ID from which you wish to start loading data. +1. **Apply Incremental Configuration**: Enable incremental loading by setting the `incremental` argument in your resource configuration. +1. **Deduplication**: When using incremental loading, the system automatically handles the deduplication of rows based on the primary key (if available) or row hash for tables without a primary key. + +:::note +Incorporating incremental loading into your SQL data pipelines can significantly enhance performance by minimizing unnecessary data processing and transfer. +::: + +#### Incremental Loading Example +1. Consider a table with a `last_modified` timestamp column. By setting this column as your cursor and specifying an + initial value, the loader generates a SQL query that filters rows with `last_modified` values greater than the specified initial value. 
+ + ```python + from sql_database import sql_table + from datetime import datetime + + # Example: Incrementally loading a table based on a timestamp column + table = sql_table( + table='your_table_name', + incremental=dlt.sources.incremental( + 'last_modified', # Cursor column name + initial_value=datetime(2024, 1, 1) # Initial cursor value + ) + ) + + info = pipeline.extract(table, write_disposition="merge") + print(info) + ``` + +1. To incrementally load the "family" table using the `sql_database` source method: + + ```python + source = sql_database().with_resources("family") + # use the "updated" field as the cursor, with an initial value of January 1, 2022, at midnight + source.family.apply_hints(incremental=dlt.sources.incremental("updated", initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0))) + # run the pipeline + info = pipeline.run(source, write_disposition="merge") + print(info) + ``` + In this example, we load data from the `family` table, using the `updated` column for incremental loading. In the first run, the process loads all data starting from midnight (00:00:00) on January 1, 2022. Subsequent runs perform incremental loading, guided by the values in the `updated` field. + +1. To incrementally load the "family" table using the `sql_table` resource: + + ```python + family = sql_table( + table="family", + incremental=dlt.sources.incremental( + "updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0) + ), + ) + # Running the pipeline + info = pipeline.extract(family, write_disposition="merge") + print(info) + ``` + + This process initially loads all data from the `family` table starting at midnight on January 1, 2022. For later runs, it uses the `updated` field for incremental loading as well. + + :::info + * For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up. + * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources. + ::: + +### Troubleshooting +If you encounter issues where the expected WHERE clause for incremental loading is not generated, ensure your configuration aligns with the `sql_table` resource rather than applying hints post-resource creation. This ensures the loader generates the correct query for incremental loading. + ## Customization ### Create your own pipeline @@ -342,39 +415,6 @@ To create your own pipeline, use source and resource methods from this verified print(info) ``` -1. To incrementally load the "family" table using the sql_database source method: - - ```python - source = sql_database().with_resources("family") - #using the "updated" field as an incremental field using initial value of January 1, 2022, at midnight - source.family.apply_hints(incremental=dlt.sources.incremental("updated"),initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0)) - #running the pipeline - info = pipeline.run(source, write_disposition="merge") - print(info) - ``` - In this example, we load data from the `family` table, using the `updated` column for incremental loading. In the first run, the process loads all data starting from midnight (00:00:00) on January 1, 2022. Subsequent runs perform incremental loading, guided by the values in the `updated` field. - -1. To incrementally load the "family" table using the 'sql_table' resource. 
- - ```python - family = sql_table( - table="family", - incremental=dlt.sources.incremental( - "updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0) - ), - ) - # Running the pipeline - info = pipeline.extract(family, write_disposition="merge") - print(info) - ``` - - This process initially loads all data from the `family` table starting at midnight on January 1, 2022. For later runs, it uses the `updated` field for incremental loading as well. - - :::info - * For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up. - * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appendend, or replaced resources. - ::: - 1. Remember to keep the pipeline name and destination dataset name consistent. The pipeline name is crucial for retrieving the [state](https://dlthub.com/docs/general-usage/state) from the last run, which is essential for incremental loading. Altering these names could initiate a "[full_refresh](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh)", interfering with the metadata tracking necessary for [incremental loads](https://dlthub.com/docs/general-usage/incremental-loading). diff --git a/docs/website/docs/running-in-production/monitoring.md b/docs/website/docs/running-in-production/monitoring.md index c9b427fd4e..8532bac36b 100644 --- a/docs/website/docs/running-in-production/monitoring.md +++ b/docs/website/docs/running-in-production/monitoring.md @@ -60,3 +60,60 @@ charts and time-series charts that provide a baseline or a pattern that a person For example, to monitor data loading, consider plotting "count of records by `loaded_at` date/hour", "created at", "modified at", or other recency markers. + +### Row counts +To find the number of rows loaded per table, use the following command: + +```shell +dlt pipeline <pipeline_name> trace +``` + +This command will display the names of the tables that were loaded and the number of rows in each table. +For example, the output of the normalize step might look like this: + +```shell +Step normalize COMPLETED in 2.37 seconds. +Normalized data for the following tables: +- _dlt_pipeline_state: 1 row(s) +- payments: 1329 row(s) +- tickets: 1492 row(s) +- orders: 2940 row(s) +- shipment: 2382 row(s) +- retailers: 1342 row(s) +``` + +To load this information back into the destination, you can use the following: +```python +import dlt + +# Create a pipeline with the specified name, destination, and dataset +# (the names below are illustrative - use your own pipeline's settings) +pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="my_dataset") +# Run the pipeline before reading its trace + +# Get the trace of the last run of the pipeline +# The trace contains timing information on extract, normalize, and load steps +trace = pipeline.last_trace + +# Load the trace information into a table named "_trace" in the destination +pipeline.run([trace], table_name="_trace") +``` +This process loads several additional tables into the destination, which provide insights into +the extract, normalize, and load steps. Information on the number of rows loaded for each table, +along with the `load_id`, can be found in the `_trace__steps__extract_info__table_metrics` table. +The `load_id` is an epoch timestamp that indicates when the loading was completed. 
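+Since a `load_id` is just an epoch timestamp stored as a string, you can convert it into a readable datetime before charting. A minimal sketch (the `load_id` value below is a hypothetical example):
+
+```python
+from datetime import datetime, timezone
+
+# a load_id as found in the trace tables - hypothetical example value
+load_id = "1708521600.123456"
+loaded_at = datetime.fromtimestamp(float(load_id), tz=timezone.utc)
+print(loaded_at.isoformat())  # e.g. 2024-02-21T13:20:00.123456+00:00
+```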
Here's a graphical +representation of the rows loaded with `load_id` for different tables: + +![image](https://storage.googleapis.com/dlt-blog-images/docs_monitoring_count_of_rows_vs_load_id.jpg) + +### Data load time +Data loading time for each table can be obtained by using the following command: + +```shell +dlt pipeline <pipeline_name> load-package +``` + +The above information can also be obtained in a script as follows: + +```python +info = pipeline.run(source, table_name="table_name", write_disposition='append') + +print(info.load_packages[0]) +``` +> `load_packages[0]` will print the information of the first load package in the list of load packages. \ No newline at end of file diff --git a/docs/website/docs/tutorial/grouping-resources.md b/docs/website/docs/tutorial/grouping-resources.md index 164063b20e..a54ba97fe3 100644 --- a/docs/website/docs/tutorial/grouping-resources.md +++ b/docs/website/docs/tutorial/grouping-resources.md @@ -43,7 +43,7 @@ def github_source(): return [get_issues, get_comments] -`github_source()` groups resources into a [source](../general-usage/source). A dlt source is a logical gropping of resources. You use it to group resources that belong together, for example, to load data from the same API. Loading data from a source can be run in a single pipeline. Here's how our updated script looks like: +`github_source()` groups resources into a [source](../general-usage/source). A dlt source is a logical grouping of resources. You use it to group resources that belong together, for example, to load data from the same API. Loading data from a source can be run in a single pipeline. Here's what our updated script looks like: ```py import dlt @@ -115,7 +115,7 @@ print(load_info) ### Dynamic resources -You've noticed that there's a lot of code duplication in the `get_issues` and `get_comments` functions. We can reduce that by extracting the common fetching code into a separate function and use it in both resources. Even better, we can use `dlt.resource` as a function and pass it the `fetch_github_data()` generator function directly. Here's refactored code: +You've noticed that there's a lot of code duplication in the `get_issues` and `get_comments` functions. We can reduce that by extracting the common fetching code into a separate function and use it in both resources. Even better, we can use `dlt.resource` as a function and pass it the `fetch_github_data()` generator function directly. Here's the refactored code: ```python import dlt @@ -194,7 +194,7 @@ def github_source(access_token): ... ``` -Here, we added `access_token` parameter and now we can used it to pass the access token token to the request: +Here, we added the `access_token` parameter and now we can use it to pass the access token to the request: ```python load_info = pipeline.run(github_source(access_token="ghp_XXXXX")) ``` @@ -212,7 +212,7 @@ def github_source( ... ``` -When you add `dlt.secrets.value` as a default value for an argument, `dlt` will try to load and inject this value from a different configuration sources in the following order: +When you add `dlt.secrets.value` as a default value for an argument, `dlt` will try to load and inject this value from different configuration sources in the following order: 1. Special environment variables. 2. `secrets.toml` file. @@ -276,7 +276,7 @@ load_info = pipeline.run(github_source()) ## Configurable sources -The next step is to make our dlt GitHub source reusable so it is able to load data from any GitHub repo. 
We'll do that by changing both `github_source()` and `fetch_github_data()` functions to accept the repo name as a parameter: +The next step is to make our dlt GitHub source reusable so it can load data from any GitHub repo. We'll do that by changing both `github_source()` and `fetch_github_data()` functions to accept the repo name as a parameter: ```python import dlt @@ -336,11 +336,11 @@ That's it! Now you have a reusable source that can load data from any GitHub rep ## What’s next -Congratulations on completing the tutorial! You've come a long way since the [getting started](../getting-started) guide. By now, you've mastered loading data from various GitHub API endpoints, organizing resources into sources, managing secrets securely, and creatig reusable sources. You can use these skills to build your own pipelines and load data from any source. +Congratulations on completing the tutorial! You've come a long way since the [getting started](../getting-started) guide. By now, you've mastered loading data from various GitHub API endpoints, organizing resources into sources, managing secrets securely, and creating reusable sources. You can use these skills to build your own pipelines and load data from any source. Interested in learning more? Here are some suggestions: 1. You've been running your pipelines locally. Learn how to [deploy and run them in the cloud](../walkthroughs/deploy-a-pipeline/). -2. Dive deeper into how dlts works by reading the [Using dlt](../general-usage) section. Some highlights: +2. Dive deeper into how dlt works by reading the [Using dlt](../general-usage) section. Some highlights: - [Connect the transformers to the resources](../general-usage/resource#feeding-data-from-one-resource-into-another) to load additional data or enrich it. - [Create your resources dynamically from data](../general-usage/source#create-resources-dynamically). - [Transform your data before loading](../general-usage/resource#customize-resources) and see some [examples of customizations like column renames and anonymization](../general-usage/customising-pipelines/renaming_columns). @@ -348,4 +348,4 @@ Interested in learning more? Here are some suggestions: - [Run in production: inspecting, tracing, retry policies and cleaning up](../running-in-production/running). - [Run resources in parallel, optimize buffers and local storage](../reference/performance.md) 3. Check out our [how-to guides](../walkthroughs) to get answers to some common questions. -4. Explore [Examples](../examples) section to see how dlt can be used in real-world scenarios \ No newline at end of file +4. Explore the [Examples](../examples) section to see how dlt can be used in real-world scenarios