additional typing checks for merged in blog posts
sh-rp committed Mar 25, 2024
1 parent c6896c1 commit ad30f86
Showing 5 changed files with 20 additions and 22 deletions.
6 changes: 3 additions & 3 deletions docs/website/blog/2023-08-24-dlt-etlt.md
@@ -150,16 +150,16 @@ def pseudonymize_name(doc):
# 1. Create an instance of the source so you can edit it.
data_source = dummy_source()
# 2. Modify this source instance's resource
-data_source = data_source.dummy_data().add_map(pseudonymize_name)
+data_resource = data_source.dummy_data().add_map(pseudonymize_name)
# 3. Inspect your result
-for row in data_source:
+for row in data_resource:
print(row)
#{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'}
#{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'}
#{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'}

pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data')
-load_info = pipeline.run(data_source)
+load_info = pipeline.run(data_resource)

```

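For context on this hunk: `add_map(pseudonymize_name)` rewrites each row's `name` before loading. Below is a minimal sketch of a pseudonymizer consistent with the 64-character hex digests shown above, assuming a salted SHA-256; the salt value is illustrative, not the post's actual constant.

```py
import hashlib

def pseudonymize_name(doc):
    # Deterministic pseudonymization: equal names always map to the same
    # digest, so rows stay joinable without exposing the raw PII.
    salt = "illustrative_salt"  # assumption: the post uses its own constant salt
    doc["name"] = hashlib.sha256((doc["name"] + salt).encode()).hexdigest()
    return doc
```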
6 changes: 2 additions & 4 deletions docs/website/blog/2023-10-26-dlt-prefect.md
@@ -132,7 +132,6 @@ from typing import List
import dlt
import pendulum
-from pendulum import datetime
from slack import slack_source
@@ -144,7 +143,7 @@ def get_resources() -> List[str]:
"""Fetch a list of available dlt resources so we can fetch them one at a time"""
# ...
-def load_channel_history(channel: str, start_date: datetime) -> None:
+def load_channel_history(channel: str, start_date: Date) -> None:
"""Execute a pipeline that will load the given Slack channel incrementally beginning at the given start date."""
# ...
@@ -201,7 +200,6 @@ from typing import List
import dlt
import pendulum
-from pendulum import datetime
from prefect import flow, task
from slack import slack_source
@@ -214,7 +212,7 @@ def get_resources() -> List[str]:
...
@task
-def load_channel_history(channel: str, start_date: datetime) -> None:
+def load_channel_history(channel: str, start_date: pendulum.Date) -> None:
...
@task
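A note on why this annotation changed: `from pendulum import datetime` binds the name to pendulum's factory *function* (which builds `DateTime` instances), not to a class, so type checkers reject it as an annotation. A small sketch of the corrected style, assuming pendulum 2.x or later:

```py
import pendulum

def load_channel_history(channel: str, start_date: pendulum.Date) -> None:
    # pendulum.Date subclasses datetime.date, so it is a real class and a
    # valid annotation; pendulum.datetime(...) is only a DateTime constructor.
    print(f"loading #{channel} from {start_date.isoformat()}")

load_channel_history("general", pendulum.date(2024, 3, 25))  # pendulum.date() returns a Date
```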
26 changes: 13 additions & 13 deletions docs/website/blog/2023-11-01-dlt-dagster.md
@@ -141,13 +141,13 @@ This will generate the default files for Dagster that we will use as a starting

### Step 4: Add configurable resources and define the asset

-- Define a `DltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.
+- Define a `DDltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.

```py
from dagster import ConfigurableResource
import dlt

-class DltResource(ConfigurableResource):
+class DDltResource(ConfigurableResource):
pipeline_name: str
dataset_name: str
destination: str
@@ -169,18 +169,18 @@ class DltResource(ConfigurableResource):

```py
from dagster import asset, get_dagster_logger
-from ..resources import DltResource
+from ..resources import DDltResource
from ..dlt import github_issues_resource

@asset
-def issues_pipeline(pipeline: DltResource):
+def issues_pipeline(pipeline: DDltResource):

logger = get_dagster_logger()
results = pipeline.create_pipeline(github_issues_resource, table_name='github_issues')
logger.info(results)
```

-The defined asset (**issues_pipeline**) takes as input the configurable resource (**DltResource**). In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DltResource**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery.
+The defined asset (**issues_pipeline**) takes as input the configurable resource (**DDltResource**). In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DDltResource**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery.

### Step 5: Handle Schema Evolution

@@ -191,7 +191,7 @@ The defined asset (**issues_pipeline**) takes as input the configurable resource
```py
from dagster import AssetExecutionContext
@asset
-def issues_pipeline(context: AssetExecutionContext, pipeline: DltResource):
+def issues_pipeline(context: AssetExecutionContext, pipeline: DDltResource):
...
md_content=""
for package in result.load_packages:
@@ -215,7 +215,7 @@ defs = Definitions(
assets=all_assets,
jobs=[simple_pipeline],
resources={
"pipeline": DltResource(
"pipeline": DDltResource(
pipeline_name = "github_issues",
dataset_name = "dagster_github_issues",
destination = "bigquery",
@@ -299,7 +299,7 @@ For this example, we are using MongoDB Atlas. Set up the account for MongoDB Atl
Next, create a `.env` file and add the BigQuery and MongoDB credentials to the file. The `.env` file should reside in the root directory.


-### Step 3: Adding the DltResource
+### Step 3: Adding the DDltResource

Create a `DltResouce` under the **resources** directory. Add the following code to the `__init__.py`:

@@ -308,7 +308,7 @@ from dagster import ConfigurableResource

import dlt

-class DltResource(ConfigurableResource):
+class DDltResource(ConfigurableResource):
pipeline_name: str
dataset_name: str
destination: str
@@ -337,7 +337,7 @@ In the `mongodb_pipeline.py` file, locate the `load_select_collection_hint_db` f

```py
from ..mongodb import mongodb
-from ..resources import DltResource
+from ..resources import DDltResource

import dlt
import os
@@ -363,7 +363,7 @@ def dlt_asset_factory(collection_list):
for stream in collection_name}

)
-def collections_asset(context: OpExecutionContext, pipeline: DltResource):
+def collections_asset(context: OpExecutionContext, pipeline: DDltResource):

# Getting Data From MongoDB
data = mongodb(URL, db).with_resources(*collection_name)
@@ -390,12 +390,12 @@ Add the definitions in the `__init__.py` in the root directory:
from dagster import Definitions

from .assets import dlt_assets
-from .resources import DltResource
+from .resources import DDltResource

defs = Definitions(
assets=dlt_assets,
resources={
"pipeline": DltResource(
"pipeline": DDltResource(
pipeline_name = "mongo",
dataset_name = "dagster_mongo",
destination = "bigquery"
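To see the renamed resource in action, here is a sketch of materializing the GitHub asset once in a test, using `dagster.materialize` with the same wiring as the `Definitions` hunk above (the module paths are hypothetical):

```py
from dagster import materialize

from my_dagster_project.assets import issues_pipeline   # hypothetical module path
from my_dagster_project.resources import DDltResource   # hypothetical module path

# Bind the configurable resource exactly as in the Definitions entry above,
# then run the asset once in-process.
result = materialize(
    [issues_pipeline],
    resources={
        "pipeline": DDltResource(
            pipeline_name="github_issues",
            dataset_name="dagster_github_issues",
            destination="bigquery",
        )
    },
)
assert result.success
```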
@@ -81,9 +81,9 @@ in-depth guide, please refer to the detailed documentation.
1. Set the environment to Python 3.10 and prepare to insert code into main.py:
```py
import dlt
-import json
import time
from google.cloud import bigquery
+from dlt.common import json

def github_webhook(request):
# Extract relevant data from the request payload
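The swap from stdlib `json` to `from dlt.common import json` keeps the same `dumps`/`loads` surface but also handles types that commonly appear in pipeline payloads, such as datetimes, dates, decimals, and UUIDs, which stock `json.dumps` rejects. A quick sketch, assuming dlt's encoder emits ISO-8601 strings for datetimes:

```py
import datetime
from dlt.common import json  # drop-in for stdlib json with dlt's encoder

payload = {"id": 1, "received_at": datetime.datetime(2024, 3, 25, 12, 0)}
# Stdlib json.dumps(payload) raises TypeError on the datetime value;
# dlt's json serializes it (as an ISO-8601 string) without extra code.
print(json.dumps(payload))
```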
@@ -194,9 +194,9 @@ To integrate dlt and dbt in cloud functions, use the dlt-dbt runner; here’s ho
```py
import dlt
import logging
-import json
from flask import jsonify
from dlt.common.runtime.slack import send_slack_message
+from dlt.common import json
def run_pipeline(request):
"""
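For completeness, the imports above also pull in `send_slack_message` from `dlt.common.runtime.slack`. A hedged sketch of how it is typically invoked (the webhook URL is a placeholder, and the positional signature is an assumption):

```py
from dlt.common.runtime.slack import send_slack_message

# Assumption: the helper takes the Slack incoming-webhook URL and the message text.
SLACK_HOOK = "https://hooks.slack.com/services/..."  # placeholder, not a real hook
send_slack_message(SLACK_HOOK, "dlt + dbt cloud function run completed")
```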
