From f8dc4e060bebb950d01073dc1727670a2efd4eb2 Mon Sep 17 00:00:00 2001
From: Zaeem Athar
Date: Thu, 2 Nov 2023 15:30:41 +0100
Subject: [PATCH] Updating the blog.

---
 docs/website/blog/2023-11-01-dlt-dagster.md | 147 +++++++++++++-------
 1 file changed, 99 insertions(+), 48 deletions(-)

diff --git a/docs/website/blog/2023-11-01-dlt-dagster.md b/docs/website/blog/2023-11-01-dlt-dagster.md
index a8520661fc..1cd28a1b4e 100644
--- a/docs/website/blog/2023-11-01-dlt-dagster.md
+++ b/docs/website/blog/2023-11-01-dlt-dagster.md
@@ -141,45 +141,47 @@ This will generate the default files for dagster that we will use as a starting

### Step 4: Add configurable resources and define the asset

-- Define a **`DltPipeline`** class in **`resources/__init__.py`** as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.
+- Define a **`DltResource`** class in **`resources/__init__.py`** as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.

```python
from dagster import ConfigurableResource
import dlt

-class DltPipeline(ConfigurableResource):
-    def create_pipeline(self, pipeline_name ,dataset_name, destination, dlt_resource, table_name):
+class DltResource(ConfigurableResource):
+    pipeline_name: str
+    dataset_name: str
+    destination: str
+    table_name: str
+
+    def create_pipeline(self, dlt_resource):

        # configure the pipeline with your destination details
        pipeline = dlt.pipeline(
-            pipeline_name=pipeline_name, destination=destination, dataset_name=dataset_name
+            pipeline_name=self.pipeline_name, destination=self.destination, dataset_name=self.dataset_name
        )
+
        # run the pipeline with your parameters
-        load_info = pipeline.run(dlt_resource, table_name=table_name)
+        load_info = pipeline.run(dlt_resource, table_name=self.table_name)

        return load_info
```

-- Define the asset, **`issues_pipeline`**, in **`assets/__init__.py`**. This asset uses the configurable resource to create a Dlt pipeline and ingests data into BigQuery.
+- Define the asset, **`issues_pipeline`**, in **`assets/__init__.py`**. This asset uses the configurable resource to create a dlt pipeline and ingests data into BigQuery.

```python
from dagster import asset, get_dagster_logger
-from ..resources import DltPipeline
+from ..resources import DltResource
from ..dlt import github_issues_resource

@asset
-def issues_pipeline(pipeline: DltPipeline):
-    pipeline_name= 'github_issues'
-    dataset= 'dagster_github_issues'
-    destination= 'bigquery'
-    table= 'github_issues'
+def issues_pipeline(pipeline: DltResource):

    logger = get_dagster_logger()
-    results = pipeline.create_pipeline(pipeline_name, dataset, destination, github_issues_resource, table)
+    results = pipeline.create_pipeline(github_issues_resource)
    logger.info(results)
```

-The defined asset (**issues_pipeline**) takes as input the configurable resource (**DltPipeline**). In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DltPipeline**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery.
+The defined asset (**issues_pipeline**) takes the configurable resource (**DltResource**) as input. Inside the asset, we use an instance of the configurable resource to call the `create_pipeline` function, passing it the `dlt.resource` (**github_issues_resource**). `create_pipeline` then normalizes the data and ingests it into BigQuery.
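
The `github_issues_resource` imported above is an ordinary `dlt` resource defined elsewhere in the tutorial project and not modified by this patch. A minimal sketch of such a resource might look like the following; the repository name, pagination scheme, and the `GITHUB_TOKEN` environment variable are illustrative assumptions rather than the tutorial's exact code.

```python
import os

import dlt
import requests


@dlt.resource(name="github_issues", write_disposition="append")
def github_issues_resource(repo: str = "dlt-hub/dlt"):
    # Page through the GitHub REST issues endpoint and yield raw JSON records;
    # dlt normalizes the nested JSON into tables when the pipeline runs.
    url = f"https://api.github.com/repos/{repo}/issues"
    token = os.getenv("GITHUB_TOKEN")
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    params = {"state": "all", "per_page": 100, "page": 1}
    while True:
        response = requests.get(url, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        page = response.json()
        if not page:
            break
        yield page
        params["page"] += 1
```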

### Step 5: Handle Schema Evolution

@@ -190,7 +192,7 @@
```python
from dagster import AssetExecutionContext
@asset
-def issues_pipeline(context: AssetExecutionContext, pipeline: DltPipeline):
+def issues_pipeline(context: AssetExecutionContext, pipeline: DltResource):
    ...
    md_content=""
    for package in result.load_packages:
@@ -214,7 +216,12 @@ defs = Definitions(
    assets=all_assets,
    jobs=[simple_pipeline],
    resources={
-        "pipeline": DltPipeline(),
+        "pipeline": DltResource(
+            pipeline_name="github_issues",
+            dataset_name="dagster_github_issues",
+            destination="bigquery",
+            table_name="github_issues"
+        ),
    }
)
```
@@ -253,7 +260,7 @@ One of the main strengths of `dlt` lies in its ability to extract, normalize, an

dagster project scaffold --name mongodb-dlt
```

-- Follow the steps mentioned earlier and create an **`assets`** directory under **`mongodb-dlt/mongodb_dlt`**.
+- Follow the steps mentioned earlier and create **`assets`** and **`resources`** directories under **`mongodb-dlt/mongodb_dlt`**.

- Initialize a **`dlt`** MongoDB pipeline in the same directory:

```python
@@ -275,13 +282,15 @@ After running the command your directory structure should be as follows:
│   │   ├── __init__.py
│   │   └── helpers.py
│   ├── mongodb_pipeline.py
-│   └── requirements.txt
+│   ├── requirements.txt
+│   └── resources
+│       └── __init__.py
├── mongodb_dlt_tests
│   ├── __init__.py
│   └── test_assets.py
├── pyproject.toml
├── setup.cfg
-├── setup.py
+└── setup.py
```

### Step 2: Configuring MongoDB Atlas and Credentials
@@ -290,7 +299,34 @@ For this example, we are using MongoDB Atlas. Set up the account for MongoDB Atl

Next, create a `.env` file and add the BigQuery and MongoDB credentials to the file. The `.env` file should reside in the root directory.

-### Step 3: Defining an Asset Factory
+
+### Step 3: Adding the DltResource
+
+Create a **`DltResource`** under the **resources** directory. Add the following code to the **`__init__.py`**:
+
+```python
+from dagster import ConfigurableResource
+
+import dlt
+
+class DltResource(ConfigurableResource):
+    pipeline_name: str
+    dataset_name: str
+    destination: str
+
+    def load_collection(self, resource_data, database):
+
+        # configure the pipeline with your destination details
+        pipeline = dlt.pipeline(
+            pipeline_name=f"{database}_{self.pipeline_name}", destination=self.destination, dataset_name=f"{self.dataset_name}_{database}"
+        )
+
+        load_info = pipeline.run(resource_data, write_disposition="replace")
+
+        return load_info
+```
+
+### Step 4: Defining an Asset Factory

The structure of data in MongoDB is such that under each database you will find multiple collections. When writing a data pipeline it is important to separate the data loading for each collection.
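
To see what collection-level loading looks like before wiring it into Dagster, the short standalone sketch below selects two collections from one database with the `dlt` MongoDB source and loads each into its own table. The pipeline and dataset names are placeholders, and the import assumes the script is run from the scaffolded pipeline directory (next to `mongodb_pipeline.py`); the asset factory that follows does the same thing per database inside Dagster.

```python
import os

import dlt
from mongodb import mongodb  # the source package scaffolded by `dlt init mongodb bigquery`

# pick individual collections from one database; each is loaded into its own table
source = mongodb(
    os.getenv("SOURCES__MONGODB__CONNECTION__URL"), "sample_mflix"
).with_resources("comments", "embedded_movies")

pipeline = dlt.pipeline(
    pipeline_name="mongo_smoke_test",  # placeholder name
    destination="bigquery",
    dataset_name="sample_mflix_data",  # placeholder dataset
)
print(pipeline.run(source, write_disposition="replace"))
```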

@@ -301,58 +337,73 @@ In the **`mongodb_pipeline.py`** file, locate the `**load_select_collection_hint

In the **`__init__.py`** file under the **assets** directory, define the **`dlt_asset_factory`**:

```python
-url = os.getenv('SOURCES__MONGODB__CONNECTION__URL')
+from ..mongodb import mongodb
+from ..resources import DltResource
+
+import dlt
+import os
+
+URL = os.getenv('SOURCES__MONGODB__CONNECTION__URL')

-database_collection ={
-    "sample_mflix":[
+DATABASE_COLLECTIONS = {
+    "sample_mflix": [
        "comments",
-        "embedded_movies"
+        "embedded_movies",
    ],
}

def dlt_asset_factory(collection_list):
    multi_assets = []

-    jobs = []
-    for db, collection_name in database_collection.items():
+    for db, collection_name in collection_list.items():

        @multi_asset(
            name=db,
            group_name=db,
            outs={
-                stream: AssetOut(key_prefix=[f'raw_{db}'])
-                for stream in collection_name}
+                stream: AssetOut(key_prefix=[f'raw_{db}'])
+                for stream in collection_name}
        )
-        def load_select_collection(context: OpExecutionContext):
-            """Use the dlt mongodb source to reflect an entire database schema and load select tables from it.
-
-            This example sources data from a sample mongo database data from [mongodb-sample-dataset](https://github.com/neelabalan/mongodb-sample-dataset).
-            """
+        def collections_asset(context: OpExecutionContext, pipeline: DltResource):

-            pipeline = dlt.pipeline(
-                pipeline_name="local_mongo",
-                destination='bigquery',
-                dataset_name="mongo_select_hint",
-            )
+            # Getting Data From MongoDB
+            data = mongodb(URL, db).with_resources(*collection_name)

            logger = get_dagster_logger()
-            data = mongodb(url, db).with_resources(*collection_name)
+            results = pipeline.load_collection(data, db)
+            logger.info(results)

-            load_info = pipeline.run(data, write_disposition="replace")
-            return tuple([None for _ in context.selected_output_names])
+            return tuple([None for _ in context.selected_output_names])

-        multi_assets.append(load_select_collection)
+        multi_assets.append(collections_asset)

-        asset_job = define_asset_job(f"{db}_assets", AssetSelection.groups(db))
+    return multi_assets

-        jobs.append(asset_job)
-    return multi_assets, jobs
-
-dlt_assets, dlt_jobs = dlt_asset_factory(database_collection)
+
+dlt_assets = dlt_asset_factory(DATABASE_COLLECTIONS)
```

-### Step 4: Running the Dagster Web Server
+### Step 5: Definitions and Run the Web Server
+
+Add the definitions in the `__init__.py` in the root of the **`mongodb_dlt`** package:
+
+```python
+from dagster import Definitions, EnvVar
+
+from .assets import dlt_assets
+from .resources import DltResource
+
+defs = Definitions(
+    assets=dlt_assets,
+    resources={
+        "pipeline": DltResource(
+            pipeline_name="mongo",
+            dataset_name="dagster_mongo",
+            destination="bigquery"
+        ),
+    }
+)
+```

We can run the `dagster dev` command to start the web server. We can see that each collection is converted into a separate asset by Dagster. We can materialize our assets to ingest the data into BigQuery.
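
A quick way to sanity-check the MongoDB assets outside the UI is Dagster's `materialize` helper, which runs assets in-process. The sketch below assumes the package layout shown above (`mongodb_dlt`), the `pipeline` resource key from the Definitions, and that the same credentials are available in the environment.

```python
from dagster import materialize

from mongodb_dlt.assets import dlt_assets
from mongodb_dlt.resources import DltResource

if __name__ == "__main__":
    # materialize every generated collection asset, using the same resource
    # configuration as the Definitions object above
    result = materialize(
        dlt_assets,
        resources={
            "pipeline": DltResource(
                pipeline_name="mongo",
                dataset_name="dagster_mongo",
                destination="bigquery",
            ),
        },
    )
    assert result.success
```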