
Commit

Fixing Formatting.
zem360 committed Nov 8, 2023
1 parent 6de5bbb commit 10ea7d8
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions docs/website/blog/2023-11-01-dlt-dagster.md
---
slug: dlt-dagster
title: "Orchestrating unstructured data pipeline with dagster and dlt."
title: "Orchestrating unstructured data pipeline with Dagster and dlt."
image: https://d1ice69yfovmhk.cloudfront.net/images/dlt-dagster_overview.jpg
authors:
name: Zaeem Athar
tags: [Dagster, dlt, Asset Factory, Unstructured Data]
---
:::info
TL;DR: In this blog post, we'll build data pipelines using [dlt](https://dlthub.com/) and orchestrate them using [Dagster](https://dagster.io/).
:::

`dlt` is an open-source Python library that allows you to declaratively load messy data sources into well-structured tables or datasets, through automatic schema inference and evolution. It simplifies building data pipelines by providing functionality to support the entire extract and load process.
It does so in a scalable way, enabling you to run it on micro workers or in highly parallelized setups.

To start with `dlt`, you can install it using pip: `pip install dlt`. Afterward, import `dlt` in your Python script and start building your data pipeline. There's no need to start any backends or containers.
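As a quick illustration, a minimal pipeline is only a few lines of Python. The pipeline, dataset, and table names below are ours, and the local DuckDB destination assumes the extra is installed with `pip install "dlt[duckdb]"`:

```python
import dlt

# Load a small list of dicts into a local DuckDB database -- no backend needed.
pipeline = dlt.pipeline(
    pipeline_name="quickstart",
    destination="duckdb",
    dataset_name="example_data",
)

load_info = pipeline.run(
    [{"id": 1, "name": "first"}, {"id": 2, "name": "second"}],
    table_name="items",
)
print(load_info)
```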

## Project Overview:

In this example, we will ingest GitHub issue data from a repository and store the data in BigQuery. We will use `dlt` to create a data pipeline and orchestrate it using Dagster.

We will start by creating a simple data pipeline using `dlt` and then orchestrate it using Dagster. Finally, we will add more features to this pipeline by using `dlt`'s schema evolution and Dagster's asset metadata to give users more insight into their data pipeline.

The project code is available on [GitHub](https://github.com/dlt-hub/dlt-dagster-demo/tree/main).

![Project Overview](https://d1ice69yfovmhk.cloudfront.net/images/dlt-dagster_overview.jpg)

Since we will be ingesting data into BigQuery, we first need to create service account credentials for BigQuery. You can find more info on setting up a service account in the `dlt` [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/bigquery).

Once we have the credentials, we are ready to begin. Let’s first install Dagster and `dlt`. The commands below will install both.

```bash
pip install dlt
pip install dagster dagster-webserver
```

## Simple dlt Pipeline:

As a first step, we will create the GitHub issues pipeline using `dlt`.

```bash
dlt init github_issues bigquery
```

```python
if __name__ == "__main__":
    print(load_info)
```
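A minimal sketch of the kind of script this sets up (a `dlt` resource that pages through the GitHub issues API plus a pipeline that loads the results into BigQuery) could look like the following; the resource name, repository URL, and dataset name are illustrative, not the exact demo code:

```python
import dlt
from dlt.sources.helpers import requests  # drop-in replacement for requests with retries


@dlt.resource(table_name="github_issues", write_disposition="replace")
def get_issues(url="https://api.github.com/repos/dlt-hub/dlt/issues"):
    # Follow GitHub's Link-header pagination and yield each page of issues.
    while True:
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()
        if "next" not in response.links:
            break
        url = response.links["next"]["url"]


pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="bigquery",
    dataset_name="github_issues_data",
)

if __name__ == "__main__":
    load_info = pipeline.run(get_issues)
    print(load_info)
```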

The above code creates a simple **github_issues** pipeline that gets the issues data from the defined repository and loads it into BigQuery. The `dlt.resource` decorator yields the data, while `dlt.pipeline` normalizes the nested data and loads it into the defined destination. To read more about the technical details, refer to the `dlt` [docs](https://dlthub.com/docs/intro).

To run the pipeline, execute the commands below:

```bash
pip install -r requirements.txt
python github_issues.py
```

We now have a running pipeline and are ready to orchestrate it using Dagster.

## Orchestrating using Dagster:

We will need to adjust our pipeline a bit to orchestrate it using Dagster.

### Step 1: Create a Dagster project

```bash
cd dagster_github_issues
dagster project scaffold --name github-issues
```

This will generate the default files for Dagster that we will use as a starting point for our data pipeline.

### Step 2: Set up the directory structure

- Inside the `github-issues/github_issues` directory create the following folders: `assets`, `resources`, and `dlt`.


### Step 3: Add dlt Resources and environment variables

- Copy the previously created **github_issues_resource** code into `dlt/__init__.py` under the `dlt` folder. Remove the `dlt.secrets.value` parameter, as we'll pass the credentials through a `.env` file.
- Create a `.env` file in the root directory. This is the directory where the `pyproject.toml` file exists. Copy the credentials into the `.env` file and follow the correct naming convention. For more info on setting up the `.env` file, have a look at the [docs](https://dlthub.com/docs/walkthroughs/add_credentials#reading-credentials-from-environment-variables).

### Step 4: Add configurable resources and define the asset

- Define a `DltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.

```python
from dagster import ConfigurableResource
class DltResource(ConfigurableResource):
return load_info
```
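A fuller sketch of such a configurable resource, with illustrative field names and a hypothetical `create_pipeline` helper method, could look like this:

```python
import dlt
from dagster import ConfigurableResource


class DltResource(ConfigurableResource):
    pipeline_name: str
    dataset_name: str
    destination: str

    def create_pipeline(self, dlt_resource, table_name=None):
        # Configure the dlt pipeline from the fields set on the Dagster resource.
        pipeline = dlt.pipeline(
            pipeline_name=self.pipeline_name,
            dataset_name=self.dataset_name,
            destination=self.destination,
        )
        # Run the pipeline and return the load info so the asset can log it.
        load_info = pipeline.run(dlt_resource, table_name=table_name)
        return load_info
```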

- Define the asset, `issues_pipeline`, in `assets/__init__.py`. This asset uses the configurable resource to create a dlt pipeline and ingests data into BigQuery.

```python
from dagster import asset, get_dagster_logger
```

The defined asset (**issues_pipeline**) takes the configurable resource as input and uses it to create a dlt pipeline that loads the data into BigQuery.
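A minimal sketch of such an asset, reusing the `github_issues_resource` from the `dlt` folder and the hypothetical `create_pipeline` method shown above, could look like this:

```python
from dagster import asset, get_dagster_logger

from ..dlt import github_issues_resource
from ..resources import DltResource


@asset
def issues_pipeline(pipeline: DltResource):
    # Run the dlt pipeline through the configurable resource and log the result.
    logger = get_dagster_logger()
    load_info = pipeline.create_pipeline(github_issues_resource, table_name="github_issues")
    logger.info(load_info)
```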

### Step 5: Handle Schema Evolution

`dlt` provides a schema evolution feature that monitors changes in the defined table schema. Suppose GitHub adds a new column or changes the datatype of an existing one; such a small change can break pipelines and transformations. The schema evolution feature works amazingly well with Dagster.

- Add the schema evolution code to the asset to make our pipelines more resilient to changes.
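A sketch of what this can look like is shown below: the asset inspects the `load_info` returned by `dlt` and reports any schema updates as asset metadata. The exact formatting and names are illustrative:

```python
from dagster import MetadataValue, OpExecutionContext, asset

from ..dlt import github_issues_resource
from ..resources import DltResource


@asset
def issues_pipeline(context: OpExecutionContext, pipeline: DltResource):
    load_info = pipeline.create_pipeline(github_issues_resource, table_name="github_issues")

    # Collect the tables and columns that dlt added or changed during this load.
    md_content = ""
    for package in load_info.load_packages:
        for table_name, table in package.schema_update.items():
            for column_name, column in table["columns"].items():
                md_content += f"\n- `{table_name}.{column_name}` ({column['data_type']})"

    # Surface the schema updates in the Dagster UI as asset metadata.
    context.add_output_metadata(
        metadata={"schema_updates": MetadataValue.md(md_content or "No schema changes.")}
    )
```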

```python
defs = Definitions(
)
```
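The asset and the configurable resource are then wired together in a `Definitions` object like the one opened above. A fuller sketch, with an illustrative module layout and resource values, might look like this:

```python
from dagster import Definitions, load_assets_from_modules

from . import assets
from .resources import DltResource

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    resources={
        # The key must match the asset's resource parameter name ("pipeline").
        "pipeline": DltResource(
            pipeline_name="github_issues",
            dataset_name="dagster_github_issues",
            destination="bigquery",
        ),
    },
)
```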

### Step 7: Run the Web Server and materialize the asset

- In the root directory (**github-issues**), run the `dagster dev` command to start the web server and materialize the asset.

![GitHub Asset](https://d1ice69yfovmhk.cloudfront.net/images/dlt-dagster_asset.png)

### Step 8: View the populated Metadata and ingested data in BigQuery

Once the asset has been successfully materialized, go to the Assets tab at the top and select the **Issues_pipeline**. In the Metadata section, you can see the Tables, Columns, and Data Types that have been updated. In this case, the changes are related to internal `dlt` tables.

Let's finally have a look in BigQuery to view the ingested data.

![Data Loaded in Bigquery](https://d1ice69yfovmhk.cloudfront.net/images/dlt-dagster_bigquery_data.png)

**github_issues** is the parent table that contains the data from the root level of the JSON returned by the GitHub API. The subsequent table **github_issues_assignees** is a child table that was nested in the original JSON. `dlt` normalizes nested data by populating it in separate tables and creates relationships between the tables. To learn more about how `dlt` creates these relationships, refer to the [docs](https://dlthub.com/docs/general-usage/destination-tables#child-and-parent-tables).
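As a small illustration of that normalization (the example rows and the local DuckDB destination are ours, not part of the demo), loading rows that contain a nested list produces a parent table plus a linked child table:

```python
import dlt

# Rows with a nested "assignees" list, similar in shape to GitHub issue data.
issues = [
    {"id": 1, "title": "Fix flaky test", "assignees": [{"login": "alice"}, {"login": "bob"}]},
    {"id": 2, "title": "Update docs", "assignees": [{"login": "carol"}]},
]

pipeline = dlt.pipeline(
    pipeline_name="normalize_demo",
    destination="duckdb",  # assumes: pip install "dlt[duckdb]"
    dataset_name="demo",
)

# dlt keeps the root-level fields in the parent table and moves the nested list
# into a child table, linking the two with _dlt_id / _dlt_parent_id columns.
load_info = pipeline.run(issues, table_name="github_issues")
print(load_info)
```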

## Orchestrating a verified dlt source using Dagster:

`dlt` provides a list of verified sources that can be initialized to fast-track the pipeline-building process. You can find a list of sources provided in the `dlt` [docs](https://dlthub.com/docs/dlt-ecosystem/verified-sources/).

One of the main strengths of `dlt` lies in its ability to extract, normalize, and ingest unstructured and semi-structured data from various sources. One of the most commonly used verified sources is MongoDB. Let’s quickly look at how we can orchestrate a MongoDB source using Dagster.

### Step 1: Setting up a Dagster project:

- Start by creating a new Dagster project scaffold:

Next, create a `.env` file and add the BigQuery and MongoDB credentials to the file.

### Step 3: Adding the DltResource

Create a `DltResource` under the **resources** directory. Add the following code to the `__init__.py`:

```python
from dagster import ConfigurableResource
class DltResource(ConfigurableResource):
```

The structure of data in MongoDB is such that under each database you will find multiple collections. When writing a data pipeline, it is important to separate the data loading for each collection.

Dagster provides the `@multi_asset` declaration, which allows us to convert each collection under a database into a separate asset. This makes our pipeline easier to debug in case of failure and keeps the collections independent of each other.

In the `mongodb_pipeline.py` file, locate the `load_select_collection_hint_db` function. We will use this function to create the asset factory.

In the `__init__.py` file under the **assets** directory, define the `dlt_asset_factory`:

```python
from ..mongodb import mongodb
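
# NOTE: what follows is a minimal sketch of such a factory, not the exact demo
# code. The database and collection names, asset keys, and the
# DltResource.create_pipeline helper are illustrative assumptions.
from dagster import AssetOut, get_dagster_logger, multi_asset

# Illustrative mapping of database -> collections to load.
DATABASE_COLLECTIONS = {
    "sample_mflix": ["comments", "embedded_movies"],
}


def _build_collections_asset(db_name, collections):
    @multi_asset(
        name=f"dlt_{db_name}",
        outs={collection: AssetOut(key_prefix=[f"mongo_{db_name}"]) for collection in collections},
        required_resource_keys={"pipeline"},
    )
    def collections_asset(context):
        logger = get_dagster_logger()
        # Select only the wanted collections from the verified MongoDB source;
        # the connection URL and database are resolved from the .env file / dlt config.
        data = mongodb().with_resources(*collections)
        load_info = context.resources.pipeline.create_pipeline(data)
        logger.info(load_info)
        # One value per declared output; the values themselves are not used downstream.
        return tuple(None for _ in collections)

    return collections_asset


def dlt_asset_factory(database_collections):
    # Build one multi-asset per database, with one output per collection.
    return [
        _build_collections_asset(db_name, tuple(collections))
        for db_name, collections in database_collections.items()
    ]


dlt_assets = dlt_asset_factory(DATABASE_COLLECTIONS)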
```

The resulting data in BigQuery:

![Data Ingestion in BigQuery from MongoDB](https://d1ice69yfovmhk.cloudfront.net/images/dlt-dagster_mongo_bigquery.png)

## Conclusion:

In this demo, we looked at how to orchestrate `dlt` pipelines using Dagster. We started off by creating a simple `dlt` pipeline and then converted it into a Dagster asset and resource before orchestrating it.

We also looked at how we can orchestrate the dlt MongoDB verified source using Dagster. We utilized the Dagster `@multi_asset` feature to create a `dlt_asset_factory`, which converts each collection under a database into a separate asset, allowing us to build more robust data pipelines.

Both `dlt` and Dagster can easily be run on local machines. By combining the two, we can build data pipelines at great speed and rigorously test them before shipping them to production.
