Updating the blog.
zem360 committed Nov 2, 2023
1 parent 2f5b3ad commit f8dc4e0
Showing 1 changed file with 99 additions and 48 deletions.
147 changes: 99 additions & 48 deletions docs/website/blog/2023-11-01-dlt-dagster.md
@@ -141,45 +141,47 @@ This will generate the default files for dagster that we will use as a starting

### Step 4: Add configurable resources and define the asset

- Define a **`DltResource`** class in **`resources/__init__.py`** as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.

```python
from dagster import ConfigurableResource
import dlt

class DltResource(ConfigurableResource):
    pipeline_name: str
    dataset_name: str
    destination: str
    table_name: str

    def create_pipeline(self, dlt_resource):

        # configure the pipeline with your destination details
        pipeline = dlt.pipeline(
            pipeline_name=self.pipeline_name,
            destination=self.destination,
            dataset_name=self.dataset_name,
        )

        # run the pipeline with your parameters
        load_info = pipeline.run(dlt_resource, table_name=self.table_name)

        return load_info
```

- Define the asset, **`issues_pipeline`**, in **`assets/__init__.py`**. This asset uses the configurable resource to create a dlt pipeline and ingests data into BigQuery.

```python
from dagster import asset, get_dagster_logger
from ..resources import DltResource
from ..dlt import github_issues_resource

@asset
def issues_pipeline(pipeline: DltResource):

    logger = get_dagster_logger()
    results = pipeline.create_pipeline(github_issues_resource)
    logger.info(results)
```

The defined asset (**issues_pipeline**) takes the configurable resource (**DltResource**) as input. Inside the asset, we call the resource's `create_pipeline` function and pass it the `dlt.resource` (**github_issues_resource**). The `create_pipeline` function normalizes the data and ingests it into BigQuery.

### Step 5: Handle Schema Evolution

@@ -190,7 +192,7 @@ The defined asset (**issues_pipeline**) takes as input the configurable resource
```python
from dagster import AssetExecutionContext
@asset
def issues_pipeline(context: AssetExecutionContext, pipeline: DltResource):
    ...
    md_content = ""
    for package in result.load_packages:
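        # The remainder of the asset is not shown in this hunk. Below is a minimal
        # sketch (an assumption, not the post's exact code): walk the schema updates
        # reported by dlt and attach them to the asset as markdown metadata.
        for table_name, table in package.schema_update.items():
            for column_name, column in table["columns"].items():
                md_content += f"\n{table_name}: column changed: {column_name} ({column['data_type']})"

    # MetadataValue would also need to be imported from dagster for this sketch.
    context.add_output_metadata(
        {"schema_update": MetadataValue.md(md_content)}
    )
```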
@@ -214,7 +216,12 @@

```python
defs = Definitions(
    assets=all_assets,
    jobs=[simple_pipeline],
    resources={
        "pipeline": DltResource(
            pipeline_name="github_issues",
            dataset_name="dagster_github_issues",
            destination="bigquery",
            table_name="github_issues",
        ),
    },
)
```
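
For a quick local check outside the web server (not something shown in the post), the same wiring can be exercised with Dagster's `materialize` helper. This is a minimal sketch; `my_dagster_project` is a placeholder for whatever project name you scaffolded earlier.

```python
# A sketch, not from the post: materialize the asset in-process.
# `my_dagster_project` is a placeholder for the scaffolded project package.
from dagster import materialize

from my_dagster_project.assets import issues_pipeline
from my_dagster_project.resources import DltResource

if __name__ == "__main__":
    result = materialize(
        [issues_pipeline],
        resources={
            "pipeline": DltResource(
                pipeline_name="github_issues",
                dataset_name="dagster_github_issues",
                destination="bigquery",
                table_name="github_issues",
            )
        },
    )
    assert result.success
```
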
@@ -253,7 +260,7 @@ One of the main strengths of `dlt` lies in its ability to extract, normalize, and
```sh
dagster project scaffold --name mongodb-dlt
```

- Follow the steps mentioned earlier and create **`assets`** and **`resources`** directories under **`mongodb-dlt/mongodb_dlt`**.
- Initialize a **`dlt`** MongoDB pipeline in the same directory:

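The init command itself falls outside this hunk; for dlt's verified MongoDB source with a BigQuery destination it would presumably be something like:

```sh
# assumed command; the exact invocation is not shown in this diff
dlt init mongodb bigquery
```
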
@@ -275,13 +282,15 @@ After running the command your directory structure should be as follows:
```
│   │   ├── __init__.py
│   │   └── helpers.py
│   ├── mongodb_pipeline.py
│   ├── requirements.txt
│   └── resources
│       └── __init__.py
├── mongodb_dlt_tests
│   ├── __init__.py
│   └── test_assets.py
├── pyproject.toml
├── setup.cfg
└── setup.py
```

### Step 2: Configuring MongoDB Atlas and Credentials
@@ -290,7 +299,34 @@ For this example, we are using MongoDB Atlas. Set up the account for MongoDB Atl

Next, create a `.env` file and add the BigQuery and MongoDB credentials to the file. The `.env` file should reside in the root directory.
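
As a rough sketch (the post does not spell these out here), the variables can follow dlt's environment-variable naming; the MongoDB URL key below is the one read later in the asset factory, and the BigQuery keys are placeholders for your service-account details:

```sh
# .env — illustrative placeholders only
SOURCES__MONGODB__CONNECTION__URL="mongodb+srv://<user>:<password>@<cluster>.mongodb.net"

DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID="<gcp-project-id>"
DESTINATION__BIGQUERY__CREDENTIALS__CLIENT_EMAIL="<service-account-email>"
DESTINATION__BIGQUERY__CREDENTIALS__PRIVATE_KEY="<service-account-private-key>"
```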

### Step 3: Adding the DltResource

Create a **`DltResource`** under the **resources** directory. Add the following code to its **`__init__.py`**:

```python
from dagster import ConfigurableResource

import dlt

class DltResource(ConfigurableResource):
    pipeline_name: str
    dataset_name: str
    destination: str

    def load_collection(self, resource_data, database):

        # configure the pipeline with your destination details
        pipeline = dlt.pipeline(
            pipeline_name=f"{database}_{self.pipeline_name}",
            destination=self.destination,
            dataset_name=f"{self.dataset_name}_{database}",
        )

        load_info = pipeline.run(resource_data, write_disposition="replace")

        return load_info
```
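
Note that `load_collection` parameterizes both the pipeline name and the dataset name with the database, so each MongoDB database is loaded through its own dlt pipeline and lands in its own BigQuery dataset.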

### Step 4: Defining an Asset Factory

In MongoDB, each database contains multiple collections. When writing a data pipeline, it is important to separate the data loading for each collection.

@@ -301,58 +337,73 @@ In the **`mongodb_pipeline.py`** file, locate the `load_select_collection_hint
In the **`__init__.py`** file under the **assets** directory, define the **`dlt_asset_factory`**:

```python
from ..mongodb import mongodb
from ..resources import DltResource

from dagster import (
    AssetOut,
    AssetSelection,
    OpExecutionContext,
    define_asset_job,
    get_dagster_logger,
    multi_asset,
)

import dlt
import os

URL = os.getenv('SOURCES__MONGODB__CONNECTION__URL')

DATABASE_COLLECTIONS = {
    "sample_mflix": [
        "comments",
        "embedded_movies",
    ],
}

def dlt_asset_factory(collection_list):
    multi_assets = []
    jobs = []

    for db, collection_name in collection_list.items():
        @multi_asset(
            name=db,
            group_name=db,
            outs={
                stream: AssetOut(key_prefix=[f'raw_{db}'])
                for stream in collection_name
            }
        )
        def collections_asset(context: OpExecutionContext, pipeline: DltResource):

            # Getting Data From MongoDB
            data = mongodb(URL, db).with_resources(*collection_name)

            logger = get_dagster_logger()
            results = pipeline.load_collection(data, db)
            logger.info(results)

            return tuple([None for _ in context.selected_output_names])

        multi_assets.append(collections_asset)

        asset_job = define_asset_job(f"{db}_assets", AssetSelection.groups(db))
        jobs.append(asset_job)

    return multi_assets, jobs

dlt_assets, dlt_jobs = dlt_asset_factory(DATABASE_COLLECTIONS)
```

### Step 5: Definitions and Running the Web Server

Add the definitions in the `__init__.py` in the root directory:

```python
from dagster import Definitions, EnvVar

from .assets import dlt_assets
from .resources import DltResource

defs = Definitions(
assets=dlt_assets,
resources={
"pipeline": DltResource(
pipeline_name = "mongo",
dataset_name = "dagster_mongo",
destination = "bigquery"
),
}
)
```

Run the `dagster dev` command to start the web server. Dagster converts each collection into a separate asset; materializing these assets ingests the data into BigQuery.
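
To launch it from the project root:

```sh
dagster dev
```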
