Merge branch 'master' into devel
rudolfix authored Feb 26, 2024
2 parents 1dc8bdd + 2704ed2 commit 5a46b09
Showing 7 changed files with 270 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/get_docs_changes.yml
@@ -8,7 +8,7 @@ on:
value: ${{ jobs.get_docs_changes.outputs.changes_outside_docs }}

env:
EXCLUDED_FILE_PATTERNS: '^docs/|^README.md|^LICENSE\.txt|\.editorconfig|\.gitignore'
EXCLUDED_FILE_PATTERNS: '^docs/|^README.md|^CONTRIBUTING.md|^LICENSE\.txt|\.editorconfig|\.gitignore|get_docs_changes.yml'


jobs:
12 changes: 6 additions & 6 deletions CONTRIBUTING.md
@@ -38,12 +38,12 @@ When you're ready to contribute, follow these steps:

We use **devel** (which is our default GitHub branch) to prepare the next release of `dlt`. We accept all regular contributions there (including most of the bugfixes).

We use **master** branch for hot fixes (including documentation) that needs to be released out of normal schedule.
We use **master** branch for hot fixes (including documentation) that need to be released out of the normal schedule.

On the release day, the **devel** branch is merged into **master**. All releases of `dlt` happen only from **master**.

### Submitting a hotfix
We'll fix critical bugs and release `dlt` our of the schedule. Follow the regular procedure, but make your PR against **master** branch. Please ping us on Slack if you do it.
We'll fix critical bugs and release `dlt` out of the schedule. Follow the regular procedure, but make your PR against **master** branch. Please ping us on Slack if you do it.

### Testing with Github Actions
We enable our CI to run tests for contributions from forks. All the tests are run, but not all destinations are available due to credentials. Currently
@@ -71,7 +71,7 @@ To test local destinations (`duckdb` and `postgres`), run `make test-load-local`

### External Destinations

To test external destinations use `make test`. You will need following external resources
To test external destinations use `make test`. You will need the following external resources

1. `BigQuery` project
2. `Redshift` cluster
@@ -95,7 +95,7 @@ This section is intended for project maintainers who have the necessary permissi

Please read how we [version the library](README.md#adding-as-dependency) first.

The source of truth of the current version is is `pyproject.toml`, and we use `poetry` to manage it.
The source of truth for the current version is `pyproject.toml`, and we use `poetry` to manage it.

### Regular release

@@ -104,14 +104,14 @@ Before publishing a new release, make sure to bump the project's version accordi
1. Check out the **devel** branch.
2. Use `poetry version patch` to increase the **patch** version
3. Run `make build-library` to apply the changes to the project.
4. Create a new branch, and submit the PR to **devel**. Go through standard process to merge it.
4. Create a new branch, and submit the PR to **devel**. Go through the standard process to merge it.
5. Create a merge PR from `devel` to `master` and merge it with a merge commit.

### Hotfix release
1. Check out the **master** branch
2. Use `poetry version patch` to increase the **patch** version
3. Run `make build-library` to apply the changes to the project.
4. Create a new branch, and submit the PR to **master** and merge it.
4. Create a new branch, submit the PR to **master** and merge it.

### Pre-release
Occasionally we may release an alpha version directly from the **branch**.
84 changes: 84 additions & 0 deletions docs/website/blog/2024-02-21-pipelines-single-pane-of-glass.md
@@ -0,0 +1,84 @@
---
slug: single-pane-glass
title: "Single pane of glass for pipelines running on various orchestrators"
image: https://storage.googleapis.com/dlt-blog-images/single-pane-glass.png
authors:
name: Adrian Brudaru
title: Open source Data Engineer
url: https://github.com/adrianbr
image_url: https://avatars.githubusercontent.com/u/5762770?v=4
tags: [data observability, data pipeline observability]
---

# The challenge in discussion

In large organisations, there are often many data teams that serve different departments. These data teams usually cannot agree on where to run their infrastructure, and everyone ends up doing something different. For example:

- 40 generated GCP projects with various services used on each
- Native AWS services under no particular orchestrator
- That on-prem machine that’s the only gateway to some strange corporate data
- and of course that SaaS orchestrator from the marketing team
- together with the event tracking lambdas from product
- don’t forget the notebooks someone scheduled

So, what’s going on? Where is the data flowing? What data is it?

# The case at hand

At dltHub, we are data people, and we use data in our daily work.

One of our sources is our community Slack, which we use in two ways:

1. We are on Slack's free tier, where messages expire quickly. We refer to them in our GitHub issues and plan to use the technical answers for training our GPT helper. For these purposes, we archive the conversations daily. We run this pipeline on GitHub Actions ([docs](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-with-github-actions)), a serverless runner that does not have a short time limit like cloud functions.
2. We measure the growth rate of the dlt community - for this, it helps to understand when people join Slack. Because we are on the free tier, we cannot request this information from the API, but we can capture the event via a webhook. This runs serverless on Cloud Functions, set up as in this [documentation](https://dlthub.com/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook); a minimal sketch of such a webhook is shown below.
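
For illustration, here is a minimal sketch of such a webhook (assuming a Google Cloud Function using the `functions-framework` entry point; the pipeline, destination, and table names are placeholders rather than our exact setup):

```python
import dlt
import functions_framework


@functions_framework.http
def slack_join_webhook(request):
    # Parse the JSON payload that Slack posts to the webhook
    event = request.get_json(silent=True) or {}

    # Placeholder pipeline, destination, and table names - adjust to your setup
    pipeline = dlt.pipeline(
        pipeline_name="slack_joins",
        destination="bigquery",
        dataset_name="community_events",
    )
    # Append the single event as a row into the "joins" table
    pipeline.run([event], table_name="joins", write_disposition="append")
    return "OK"
```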

So already we have two different serverless run environments, each with its own “run reporting”.

Not fun to manage. So how do we achieve a single pane of glass?

### Alerts are better than monitoring

Since “checking” things can be tedious, we would rather skip the checking and simply be notified. For this, we can use Slack to send messages. Docs [here](https://dlthub.com/docs/running-in-production/running#using-slack-to-send-messages).

Here’s a gist of how to use it:

```python
from dlt.common.runtime.slack import send_slack_message

def run_pipeline_and_notify(pipeline, data):
    try:
        load_info = pipeline.run(data)
    except Exception as e:
        send_slack_message(
            pipeline.runtime_config.slack_incoming_hook,
            f"Pipeline {pipeline.pipeline_name} failed! \n Error: {str(e)}")
        raise
```

### Monitoring load metrics is cheaper than scanning entire data sets

As for monitoring, we could always run ad-hoc queries to count the number of loaded rows - but this would scan a lot of data and become costly at larger volumes.

A better way is to leverage the runtime metrics collected by the pipeline, such as row counts. You can find docs on how to do that [here](https://dlthub.com/docs/running-in-production/monitoring#data-monitoring).
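
As a minimal sketch (assuming a `pipeline` object like the ones above; `last_normalize_info.row_counts` is the trace attribute holding these metrics at the time of writing - check the linked docs for the exact API):

```python
# Run the pipeline, then read per-table row counts from the run trace -
# no queries against the destination are needed.
load_info = pipeline.run(data)

row_counts = pipeline.last_trace.last_normalize_info.row_counts
for table_name, row_count in row_counts.items():
    print(f"{table_name}: {row_count} rows")
```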

### If we care, governance is doable too

Now, not everything needs to be governed. But for the Slack pipelines we want to tag which columns contain personally identifiable information, so we can delete that information and stay compliant.

One simple way to stay compliant is to annotate your raw data schema and use views for the transformed data, so if you delete the data at source, it’s gone everywhere.

If you are materialising your transformed tables, you would need column-level lineage in the transform layer to facilitate the documentation and deletion of the data. [Here’s](https://dlthub.com/docs/blog/dlt-lineage-support) a write-up of how to capture that info. There are also other ways to grab a schema and annotate it; read more [here](https://dlthub.com/docs/general-usage/schema).
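
As a hedged example of “grabbing a schema” to annotate: after a run, the schema `dlt` inferred can be exported to YAML and reviewed or tagged by hand (a sketch assuming a `pipeline` object from the pipelines above; the file name is arbitrary):

```python
# Export the inferred schema so PII columns can be reviewed and annotated
# outside of the pipeline code.
schema_yaml = pipeline.default_schema.to_pretty_yaml()

with open("slack_raw_schema.yaml", "w") as f:
    f.write(schema_yaml)
```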

# In conclusion

There are many reasons why you’d end up running pipelines in different places, from organisational disagreements to skill-set differences to simple technical restrictions.

Having a single pane of glass is not just beneficial but essential for operational coherence.

While solutions exist for different parts of this problem, the data collection still needs to be standardised and supported across different locations.

By using a tool like dlt, standardisation is introduced with ingestion, enabling cross-orchestrator observability and monitoring.

### Want to discuss?

[Join our slack community](https://dlthub.com/community) to take part in the conversation.
45 changes: 41 additions & 4 deletions docs/website/docs/dlt-ecosystem/verified-sources/google_sheets.md
@@ -68,8 +68,6 @@ You need to create a GCP service account to get API credentials if you don't hav
You need to create a GCP account to get OAuth credentials if you don't have one. To create one,
follow these steps:

1. Ensure your email used for the GCP account has access to the GA4 property.

1. Open a GCP project in your GCP account.

1. Enable the Sheets API in the project.
@@ -305,7 +303,7 @@ For more information, read the [General Usage: Credentials.](../../general-usage
1. You're now ready to run the pipeline! To get started, run the following command:

```bash
python3 google_sheets_pipeline.py
python google_sheets_pipeline.py
```

1. Once the pipeline has finished running, you can verify that everything loaded correctly by using
@@ -322,7 +320,46 @@ For more information, read the guide on [how to run a pipeline](../../walkthroug

## Data types

The `dlt` normalizer uses the first row of data to infer types and attempts to coerce subsequent rows, creating variant columns if unsuccessful. This is standard behavior. It also recognizes date and time types using additional metadata from the first row.
The `dlt` normalizer uses the first row of data to infer types and attempts to coerce subsequent rows, creating variant columns if unsuccessful. This is standard behavior.
If `dlt` did not correctly infer the data type of a column, or you want to change the data type for other reasons,
then you can provide a type hint for the affected column in the resource.
Also, `dlt` no longer recognizes date and time types automatically, so you have to designate them yourself as `timestamp`.

Use the `apply_hints` method on the resource to achieve this.
Here's how you can do it:

```python
for resource in resources:
    resource.apply_hints(columns={
        "total_amount": {"data_type": "double"},
        "date": {"data_type": "timestamp"},
    })
```
In this example, the `total_amount` column is enforced to be of type `double` and `date` is enforced to be of type `timestamp`.
This ensures that all values in the `total_amount` column are treated as `double`, regardless of whether they are integers or decimals in the original Google Sheets data,
and that the `date` column is represented as timestamps rather than integers.

For a single resource (e.g. `Sheet1`), you can simply use:
```python
source.Sheet1.apply_hints(columns={
    "total_amount": {"data_type": "double"},
    "date": {"data_type": "timestamp"},
})
```

To get the names of the resources, you can use:
```python
print(source.resources.keys())
```

To read more about tables, columns, and datatypes, please refer to [our documentation here.](../../general-usage/schema#tables-and-columns)

:::caution
`dlt` will **not modify** tables after they are created.
If you change data types with hints after the first run, you need to **delete the dataset** or set `full_refresh=True`; a sketch follows below.
:::
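
For example, a full-refresh run could look like the following sketch (the pipeline, destination, and dataset names are placeholders; see the setup sections above for the exact source call):

```python
import dlt
from google_sheets import google_spreadsheet

# full_refresh=True writes each run into a fresh dataset (suffixed per run),
# so changed column hints take effect. Names below are placeholders.
pipeline = dlt.pipeline(
    pipeline_name="google_sheets_pipeline",
    destination="duckdb",
    dataset_name="sheets_data",
    full_refresh=True,
)

source = google_spreadsheet("your_spreadsheet_url_or_id")
source.Sheet1.apply_hints(columns={"date": {"data_type": "timestamp"}})

info = pipeline.run(source)
print(info)
```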

## Sources and resources

106 changes: 73 additions & 33 deletions docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md
@@ -255,6 +255,79 @@ def sql_table(

`write_disposition`: Can be "merge", "replace", or "append".

## Incremental Loading
Efficient data management often requires loading only new or updated data from your SQL databases, rather than reprocessing the entire dataset. This is where incremental loading comes into play.

Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing ID) to load only data newer than a specified initial value, enhancing efficiency by reducing processing time and resource use.


### Configuring Incremental Loading
1. **Choose a Cursor Column**: Identify a column in your SQL table that can serve as a reliable indicator of new or updated rows. Common choices include timestamp columns or auto-incrementing IDs.
1. **Set an Initial Value**: Choose a starting value for the cursor to begin loading data. This could be a specific timestamp or ID from which you wish to start loading data.
1. **Apply Incremental Configuration**: Enable incremental loading with your configuration's `incremental` argument.
1. **Deduplication**: When using incremental loading, the system automatically handles the deduplication of rows based on the primary key (if available) or row hash for tables without a primary key.

:::note
Incorporating incremental loading into your SQL data pipelines can significantly enhance performance by minimizing unnecessary data processing and transfer.
:::

#### Incremental Loading Example
1. Consider a table with a `last_modified` timestamp column. By setting this column as your cursor and specifying an
initial value, the loader generates a SQL query filtering rows with `last_modified` values greater than the specified initial value.

```python
import dlt
from datetime import datetime

from sql_database import sql_table

# Example: Incrementally loading a table based on a timestamp column
table = sql_table(
    table='your_table_name',
    incremental=dlt.sources.incremental(
        'last_modified',  # Cursor column name
        initial_value=datetime(2024, 1, 1)  # Initial cursor value
    )
)

# "pipeline" is a dlt.pipeline(...) instance created earlier
info = pipeline.extract(table, write_disposition="merge")
print(info)
```

1. To incrementally load the `family` table using the `sql_database` source method:

```python
source = sql_database().with_resources("family")
# Use the "updated" field as the incremental cursor, with an initial value of January 1, 2022, at midnight
source.family.apply_hints(
    incremental=dlt.sources.incremental("updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0))
)
# Run the pipeline
info = pipeline.run(source, write_disposition="merge")
print(info)
```
In this example, we load data from the `family` table, using the `updated` column for incremental loading. In the first run, the process loads all data starting from midnight (00:00:00) on January 1, 2022. Subsequent runs perform incremental loading, guided by the values in the `updated` field.

1. To incrementally load the `family` table using the `sql_table` resource:

```python
family = sql_table(
    table="family",
    incremental=dlt.sources.incremental(
        "updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0)
    ),
)
# Running the pipeline
info = pipeline.extract(family, write_disposition="merge")
print(info)
```

This process initially loads all data from the `family` table starting at midnight on January 1, 2022. For later runs, it uses the `updated` field for incremental loading as well.

:::info
* For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up.
* `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources (see the sketch below).
:::
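
A minimal sketch of that (the `id` column is a placeholder for a real primary key column in your table):

```python
# Adjust the write disposition and primary key after the resource is created;
# "id" is a placeholder column name used only for illustration.
source = sql_database().with_resources("family")
source.family.apply_hints(write_disposition="merge", primary_key="id")
```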

### Troubleshooting
If the expected WHERE clause for incremental loading is not being generated, pass the incremental configuration directly to the `sql_table` resource (as in the example above) rather than applying hints after the resource is created. This ensures the loader generates the correct query for incremental loading.

## Customization
### Create your own pipeline

@@ -342,39 +415,6 @@ To create your own pipeline, use source and resource methods from this verified
print(info)
```

1. To incrementally load the "family" table using the sql_database source method:

```python
source = sql_database().with_resources("family")
#using the "updated" field as an incremental field using initial value of January 1, 2022, at midnight
source.family.apply_hints(incremental=dlt.sources.incremental("updated"),initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0))
#running the pipeline
info = pipeline.run(source, write_disposition="merge")
print(info)
```
In this example, we load data from the `family` table, using the `updated` column for incremental loading. In the first run, the process loads all data starting from midnight (00:00:00) on January 1, 2022. Subsequent runs perform incremental loading, guided by the values in the `updated` field.

1. To incrementally load the "family" table using the 'sql_table' resource.

```python
family = sql_table(
table="family",
incremental=dlt.sources.incremental(
"updated", initial_value=pendulum.datetime(2022, 1, 1, 0, 0, 0)
),
)
# Running the pipeline
info = pipeline.extract(family, write_disposition="merge")
print(info)
```

This process initially loads all data from the `family` table starting at midnight on January 1, 2022. For later runs, it uses the `updated` field for incremental loading as well.

:::info
* For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up.
* `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appendend, or replaced resources.
:::

1. Remember to keep the pipeline name and destination dataset name consistent. The pipeline name is crucial for retrieving the [state](https://dlthub.com/docs/general-usage/state) from the last run, which is essential for incremental loading. Altering these names could initiate a "[full_refresh](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh)", interfering with the metadata tracking necessary for [incremental loads](https://dlthub.com/docs/general-usage/incremental-loading).

<!--@@@DLT_SNIPPET_START tuba::sql_database-->
57 changes: 57 additions & 0 deletions docs/website/docs/running-in-production/monitoring.md
@@ -60,3 +60,60 @@ charts and time-series charts that provide a baseline or a pattern that a person

For example, to monitor data loading, consider plotting "count of records by `loaded_at` date/hour",
"created at", "modified at", or other recency markers.

### Row counts
To find the number of rows loaded per table, use the following command:

```shell
dlt pipeline <pipeline_name> trace
```

This command displays the names of the tables that were loaded and the number of rows in each table. For example, the output might look like this:

```shell
Step normalize COMPLETED in 2.37 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- payments: 1329 row(s)
- tickets: 1492 row(s)
- orders: 2940 row(s)
- shipment: 2382 row(s)
- retailers: 1342 row(s)
```

To load this information back into the destination, you can use the following:
```python
import dlt

# Create a pipeline with the specified name, destination, and dataset (placeholder names)
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline", destination="duckdb", dataset_name="my_dataset"
)

# Run the pipeline (placeholder data - use your own source here)
pipeline.run([{"id": 1}], table_name="example")

# Get the trace of the last run of the pipeline
# The trace contains timing information on extract, normalize, and load steps
trace = pipeline.last_trace

# Load the trace information into a table named "_trace" in the destination
pipeline.run([trace], table_name="_trace")
```
This process loads several additional tables to the destination, which provide insights into
the extract, normalize, and load steps. Information on the number of rows loaded for each table,
along with the `load_id`, can be found in the `_trace__steps__extract_info__table_metrics` table.
The `load_id` is an epoch timestamp that indicates when the loading was completed. Here's a graphical
representation of the rows loaded with `load_id` for different tables:

![image](https://storage.googleapis.com/dlt-blog-images/docs_monitoring_count_of_rows_vs_load_id.jpg)
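
To read these metrics programmatically instead of charting them, one sketch is to query the trace table through the pipeline's SQL client (the table name comes from the paragraph above; the exact columns depend on the trace data that was loaded, so the query simply selects everything):

```python
# Query the trace table that stores per-table extract metrics.
with pipeline.sql_client() as client:
    rows = client.execute_sql(
        "SELECT * FROM _trace__steps__extract_info__table_metrics"
    )
    for row in rows:
        print(row)
```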

### Data load time
Data loading time for each table can be obtained by using the following command:

```shell
dlt pipeline <pipeline_name> load-package
```

The above information can also be obtained from the script as follows:

```python
info = pipeline.run(source, table_name="table_name", write_disposition='append')

print(info.load_packages[0])
```
> `load_packages[0]` will print the information of the first load package in the list of load packages.