diff --git a/docs/website/blog/2023-08-24-dlt-etlt.md b/docs/website/blog/2023-08-24-dlt-etlt.md index fb8215c9a0..a36b169a99 100644 --- a/docs/website/blog/2023-08-24-dlt-etlt.md +++ b/docs/website/blog/2023-08-24-dlt-etlt.md @@ -150,16 +150,16 @@ def pseudonymize_name(doc): # 1. Create an instance of the source so you can edit it. data_source = dummy_source() # 2. Modify this source instance's resource -data_source = data_source.dummy_data().add_map(pseudonymize_name) +data_resource = data_source.dummy_data().add_map(pseudonymize_name) # 3. Inspect your result -for row in data_source: +for row in data_resource: print(row) #{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'} #{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'} #{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'} pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data') -load_info = pipeline.run(data_source) +load_info = pipeline.run(data_resource) ``` diff --git a/docs/website/blog/2023-10-26-dlt-prefect.md b/docs/website/blog/2023-10-26-dlt-prefect.md index 6e9caa3fea..85fa47a5c8 100644 --- a/docs/website/blog/2023-10-26-dlt-prefect.md +++ b/docs/website/blog/2023-10-26-dlt-prefect.md @@ -132,7 +132,6 @@ from typing import List import dlt import pendulum -from pendulum import datetime from slack import slack_source @@ -144,7 +143,7 @@ def get_resources() -> List[str]: """Fetch a list of available dlt resources so we can fetch them one at a time""" # ... -def load_channel_history(channel: str, start_date: datetime) -> None: +def load_channel_history(channel: str, start_date: Date) -> None: """Execute a pipeline that will load the given Slack channel incrementally beginning at the given start date.""" # ... @@ -201,7 +200,6 @@ from typing import List import dlt import pendulum -from pendulum import datetime from prefect import flow, task from slack import slack_source @@ -214,7 +212,7 @@ def get_resources() -> List[str]: ... @task -def load_channel_history(channel: str, start_date: datetime) -> None: +def load_channel_history(channel: str, start_date: pendulum.Date) -> None: ... @task diff --git a/docs/website/blog/2023-11-01-dlt-dagster.md b/docs/website/blog/2023-11-01-dlt-dagster.md index dc05a35bff..687e8444c4 100644 --- a/docs/website/blog/2023-11-01-dlt-dagster.md +++ b/docs/website/blog/2023-11-01-dlt-dagster.md @@ -141,13 +141,13 @@ This will generate the default files for Dagster that we will use as a starting ### Step 4: Add configurable resources and define the asset -- Define a `DltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset. +- Define a `DDltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset. ```py from dagster import ConfigurableResource import dlt -class DltResource(ConfigurableResource): +class DDltResource(ConfigurableResource): pipeline_name: str dataset_name: str destination: str @@ -169,18 +169,18 @@ class DltResource(ConfigurableResource): ```py from dagster import asset, get_dagster_logger -from ..resources import DltResource +from ..resources import DDltResource from ..dlt import github_issues_resource @asset -def issues_pipeline(pipeline: DltResource): +def issues_pipeline(pipeline: DDltResource): logger = get_dagster_logger() results = pipeline.create_pipeline(github_issues_resource, table_name='github_issues') logger.info(results) ``` -The defined asset (**issues_pipeline**) takes as input the configurable resource (**DltResource**). In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DltResource**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery. +The defined asset (**issues_pipeline**) takes as input the configurable resource (**DDltResource**). In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DDltResource**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery. ### Step 5: Handle Schema Evolution @@ -191,7 +191,7 @@ The defined asset (**issues_pipeline**) takes as input the configurable resource ```py from dagster import AssetExecutionContext @asset -def issues_pipeline(context: AssetExecutionContext, pipeline: DltResource): +def issues_pipeline(context: AssetExecutionContext, pipeline: DDltResource): ... md_content="" for package in result.load_packages: @@ -215,7 +215,7 @@ defs = Definitions( assets=all_assets, jobs=[simple_pipeline], resources={ - "pipeline": DltResource( + "pipeline": DDltResource( pipeline_name = "github_issues", dataset_name = "dagster_github_issues", destination = "bigquery", @@ -299,7 +299,7 @@ For this example, we are using MongoDB Atlas. Set up the account for MongoDB Atl Next, create a `.env` file and add the BigQuery and MongoDB credentials to the file. The `.env` file should reside in the root directory. -### Step 3: Adding the DltResource +### Step 3: Adding the DDltResource Create a `DltResouce` under the **resources** directory. Add the following code to the `__init__.py`: @@ -308,7 +308,7 @@ from dagster import ConfigurableResource import dlt -class DltResource(ConfigurableResource): +class DDltResource(ConfigurableResource): pipeline_name: str dataset_name: str destination: str @@ -337,7 +337,7 @@ In the `mongodb_pipeline.py` file, locate the `load_select_collection_hint_db` f ```py from ..mongodb import mongodb -from ..resources import DltResource +from ..resources import DDltResource import dlt import os @@ -363,7 +363,7 @@ def dlt_asset_factory(collection_list): for stream in collection_name} ) - def collections_asset(context: OpExecutionContext, pipeline: DltResource): + def collections_asset(context: OpExecutionContext, pipeline: DDltResource): # Getting Data From MongoDB data = mongodb(URL, db).with_resources(*collection_name) @@ -390,12 +390,12 @@ Add the definitions in the `__init__.py` in the root directory: from dagster import Definitions from .assets import dlt_assets -from .resources import DltResource +from .resources import DDltResource defs = Definitions( assets=dlt_assets, resources={ - "pipeline": DltResource( + "pipeline": DDltResource( pipeline_name = "mongo", dataset_name = "dagster_mongo", destination = "bigquery" diff --git a/docs/website/blog/2023-11-22-dlt-webhooks-event-based-ingestion.md b/docs/website/blog/2023-11-22-dlt-webhooks-event-based-ingestion.md index aa433dc883..94fb89790e 100644 --- a/docs/website/blog/2023-11-22-dlt-webhooks-event-based-ingestion.md +++ b/docs/website/blog/2023-11-22-dlt-webhooks-event-based-ingestion.md @@ -81,9 +81,9 @@ in-depth guide, please refer to the detailed documentation. 1. Set the environment to Python 3.10 and prepare to insert code into main.py: ```py import dlt - import json import time from google.cloud import bigquery + from dlt.common import json def github_webhook(request): # Extract relevant data from the request payload diff --git a/docs/website/blog/2024-01-15-dlt-dbt-runner-on-cloud-functions.md b/docs/website/blog/2024-01-15-dlt-dbt-runner-on-cloud-functions.md index e21154d98e..059dd97a06 100644 --- a/docs/website/blog/2024-01-15-dlt-dbt-runner-on-cloud-functions.md +++ b/docs/website/blog/2024-01-15-dlt-dbt-runner-on-cloud-functions.md @@ -194,9 +194,9 @@ To integrate dlt and dbt in cloud functions, use the dlt-dbt runner; here’s ho ```py import dlt import logging - import json from flask import jsonify from dlt.common.runtime.slack import send_slack_message + from dlt.common import json def run_pipeline(request): """