From 8e665ac4b0c4542dd106225477c0f98bbf438873 Mon Sep 17 00:00:00 2001 From: Dave Date: Fri, 22 Mar 2024 10:57:21 +0100 Subject: [PATCH 1/5] fixes a couple of typechecking errors in the docs --- docs/tools/lint_setup/template.py | 4 +++- .../docs/dlt-ecosystem/destinations/destination.md | 2 +- .../dlt-ecosystem/verified-sources/airtable.md | 2 +- .../docs/dlt-ecosystem/verified-sources/chess.md | 6 ++++-- .../dlt-ecosystem/verified-sources/filesystem.md | 3 ++- .../docs/dlt-ecosystem/verified-sources/jira.md | 3 ++- .../dlt-ecosystem/verified-sources/pipedrive.md | 7 ++++--- .../docs/general-usage/credentials/config_specs.md | 14 +++++++------- .../pseudonymizing_columns.md | 6 +++--- .../customising-pipelines/renaming_columns.md | 4 ++-- .../user_agent_device_data_enrichment.md | 8 ++++---- docs/website/docs/general-usage/resource.md | 4 +++- .../website/docs/general-usage/schema-contracts.md | 2 +- .../website/docs/general-usage/schema-evolution.md | 2 +- docs/website/docs/general-usage/schema.md | 4 ++-- .../deploy-gcp-cloud-function-as-webhook.md | 6 +++--- 16 files changed, 43 insertions(+), 34 deletions(-) diff --git a/docs/tools/lint_setup/template.py b/docs/tools/lint_setup/template.py index dcfada63f6..6b207ceb0b 100644 --- a/docs/tools/lint_setup/template.py +++ b/docs/tools/lint_setup/template.py @@ -8,8 +8,8 @@ import os import pendulum -from pendulum import DateTime from datetime import datetime # noqa: I251 +from pendulum import DateTime import dlt from dlt.common import json @@ -26,6 +26,7 @@ BaseConfiguration, ) from dlt.common.storages.configuration import FileSystemCredentials +from dlt.pipeline.exceptions import PipelineStepFailed # some universal variables pipeline: dlt.Pipeline = None # type: ignore[assignment] @@ -33,3 +34,4 @@ ex: Exception = None # type: ignore[assignment] load_info: LoadInfo = None # type: ignore[assignment] url: str = None # type: ignore[assignment] +my_resource: DltResource = None # type: ignore[assignment] \ No newline 
at end of file diff --git a/docs/website/docs/dlt-ecosystem/destinations/destination.md b/docs/website/docs/dlt-ecosystem/destinations/destination.md index 60753d90b5..c9a0bff022 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/destination.md +++ b/docs/website/docs/dlt-ecosystem/destinations/destination.md @@ -54,7 +54,7 @@ The full signature of the destination decorator plus its function is the followi loader_file_format="jsonl", name="my_custom_destination", naming_convention="direct", - max_nesting_level=0, + max_table_nesting=0, skip_dlt_columns_and_tables=True ) def my_destination(items: TDataItems, table: TTableSchema) -> None: diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/airtable.md b/docs/website/docs/dlt-ecosystem/verified-sources/airtable.md index bd04dbfcf3..f6b16ef944 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/airtable.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/airtable.md @@ -215,7 +215,7 @@ verified source. base_id = base_id, table_names = table_names ) - load_info = pipeline.run(airtables, write_deposition = "replace") + load_info = pipeline.run(airtables, write_disposition = "replace") ``` > You have the option to use table names or table IDs in the code above, in place of "Table1" and diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/chess.md b/docs/website/docs/dlt-ecosystem/verified-sources/chess.md index 6ae457d1e6..62776b5c53 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/chess.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/chess.md @@ -127,7 +127,9 @@ def players_profiles(players: List[str]) -> Iterator[TDataItem]: @dlt.defer def _get_profile(username: str) -> TDataItem: return get_path_with_retry(f"player/{username}") - ... + + for username in players: + yield _get_profile(username) ``` `players`: Is a list of player usernames for which you want to fetch profile data. 
@@ -161,7 +163,7 @@ def players_games( ) -> Iterator[Callable[[], List[TDataItem]]]: # gets a list of already checked(loaded) archives. checked_archives = dlt.current.resource_state().setdefault("archives", []) - ... + yield {} # return your retrieved data here ``` `players`: Is a list of player usernames for which you want to fetch games. diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md index bf30da8882..9e0d46c563 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md @@ -453,7 +453,8 @@ verified source. ) # pretty print the information on data that was loaded print(load_info) - print(listing)(pipeline.last_trace.last_normalize_info) + print(listing) + print(pipeline.last_trace.last_normalize_info) ``` 1. Cleanup after loading: diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/jira.md b/docs/website/docs/dlt-ecosystem/verified-sources/jira.md index 38dacb0541..26c4462c34 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/jira.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/jira.md @@ -173,7 +173,8 @@ The resource function searches issues using JQL queries and then loads them to t ```py @dlt.resource(write_disposition="replace") def issues(jql_queries: List[str]) -> Iterable[TDataItem]: - api_path = "rest/api/3/search" + api_path = "rest/api/3/search" + return {} # return the retrieved values here ``` `jql_queries`: Accepts a list of JQL queries. 
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md b/docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md index 3dc815d53b..1e570bfe7a 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md @@ -213,10 +213,11 @@ create and store a mapping of custom fields for different entities in the source ```py @dlt.resource(selected=False) def create_state(pipedrive_api_key: str) -> Iterator[Dict[str, Any]]: - def _get_pages_for_rename( - entity: str, fields_entity: str, pipedrive_api_key: str - ) -> Dict[str, Any]: + def _get_pages_for_rename( + entity: str, fields_entity: str, pipedrive_api_key: str + ) -> Dict[str, Any]: ... + yield _get_pages_for_rename("", "", "") ``` It processes each entity in ENTITY_MAPPINGS, updating the custom fields mapping if a related fields diff --git a/docs/website/docs/general-usage/credentials/config_specs.md b/docs/website/docs/general-usage/credentials/config_specs.md index e93e1c466a..e66939fc39 100644 --- a/docs/website/docs/general-usage/credentials/config_specs.md +++ b/docs/website/docs/general-usage/credentials/config_specs.md @@ -94,7 +94,7 @@ credentials = ConnectionStringCredentials() credentials.drivername = "postgresql" credentials.database = "my_database" credentials.username = "my_user" -credentials.password = "my_password" +credentials.password = "my_password" # type: ignore credentials.host = "localhost" credentials.port = 5432 @@ -118,12 +118,12 @@ It also allows for the addition of scopes and provides methods for client authen Usage: ```py -credentials = OAuth2Credentials( - client_id="CLIENT_ID", - client_secret="CLIENT_SECRET", - refresh_token="REFRESH_TOKEN", - scopes=["scope1", "scope2"] -) +credentials = OAuth2Credentials({ + "client_id": "CLIENT_ID", + "client_secret": "CLIENT_SECRET", + "refresh_token": "REFRESH_TOKEN", + "scopes": ["scope1", "scope2"] +}) # Authorize the client credentials.auth() 
diff --git a/docs/website/docs/general-usage/customising-pipelines/pseudonymizing_columns.md b/docs/website/docs/general-usage/customising-pipelines/pseudonymizing_columns.md index ba0b13636b..eff6f795ac 100644 --- a/docs/website/docs/general-usage/customising-pipelines/pseudonymizing_columns.md +++ b/docs/website/docs/general-usage/customising-pipelines/pseudonymizing_columns.md @@ -51,11 +51,11 @@ for row in dummy_source().dummy_data.add_map(pseudonymize_name): # 1. Create an instance of the source so you can edit it. data_source = dummy_source() # 2. Modify this source instance's resource -data_source = data_source.dummy_data.add_map(pseudonymize_name) +data_resource = data_source.dummy_data.add_map(pseudonymize_name) # 3. Inspect your result -for row in data_source: +for row in data_resource: print(row) pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data') -load_info = pipeline.run(data_source) +load_info = pipeline.run(data_resource) ``` diff --git a/docs/website/docs/general-usage/customising-pipelines/renaming_columns.md b/docs/website/docs/general-usage/customising-pipelines/renaming_columns.md index 04e4d33b13..4cbb4d7b32 100644 --- a/docs/website/docs/general-usage/customising-pipelines/renaming_columns.md +++ b/docs/website/docs/general-usage/customising-pipelines/renaming_columns.md @@ -44,10 +44,10 @@ def replace_umlauts_in_dict_keys(d): data_source = dummy_source() # 2. Modify this source instance's resource -data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) +data_resource = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys) # 3. 
Inspect your result -for row in data_source: +for row in data_resource: print(row) # {'Objekt_0': {'Groesse': 0, 'Aequivalenzpruefung': True}} diff --git a/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md b/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md index 6b07845689..3aadb2f982 100644 --- a/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md +++ b/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md @@ -127,7 +127,7 @@ The first step is to register on [SerpAPI](https://serpapi.com/) and obtain the 1. Create `fetch_average_price()` function as follows: ```py - import datetime + from datetime import datetime, timedelta import requests # Uncomment transformer function if it is to be used as a transformer, @@ -160,7 +160,7 @@ The first step is to register on [SerpAPI](https://serpapi.com/) and obtain the device_info = dlt.current.resource_state().setdefault("devices", {}) # Current timestamp for checking the last update - current_timestamp = datetime.datetime.now() + current_timestamp = datetime.now() # Print the current device information # print(device_info) # if you need to check state @@ -172,10 +172,10 @@ The first step is to register on [SerpAPI](https://serpapi.com/) and obtain the # Calculate the time since the last update last_updated = ( current_timestamp - - device_data.get('timestamp', datetime.datetime.min) + device_data.get('timestamp', datetime.min) ) # Check if the device is not in state or data is older than 180 days - if device not in device_info or last_updated > datetime.timedelta(days=180): + if device not in device_info or last_updated > timedelta(days=180): try: # Make an API request to fetch device prices response = requests.get("https://serpapi.com/search", params={ diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index e2e95d937f..67609b8989 
100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -63,6 +63,7 @@ accepts following arguments: ... # the `table_schema` method gets table schema generated by a resource + # TODO: needs fixing print(get_users().table_schema()) ``` @@ -154,6 +155,7 @@ def repo_events() -> Iterator[TDataItems]: # the `table_schema` method gets table schema generated by a resource and takes optional # data item to evaluate dynamic hints +# TODO: needs fixing print(repo_events().table_schema({"type": "WatchEvent", id:...})) ``` @@ -283,7 +285,7 @@ def get_orders(): yield o # users and orders will be iterated in parallel in two separate threads -pipeline.run(get_users(), get_orders()) +pipeline.run[(get_users(), get_orders()]) ``` Async generators are automatically extracted concurrently with other resources: diff --git a/docs/website/docs/general-usage/schema-contracts.md b/docs/website/docs/general-usage/schema-contracts.md index 1b5e67357a..c79d240520 100644 --- a/docs/website/docs/general-usage/schema-contracts.md +++ b/docs/website/docs/general-usage/schema-contracts.md @@ -124,7 +124,7 @@ As with any other exception coming from pipeline run, it will be re-raised via ` ```py try: pipeline.run() -except Exception as pip_ex: +except PipelineStepFailed as pip_ex: if pip_ex.step == "normalize": if isinstance(pip_ex.__context__.__context__, DataValidationError): ... 
diff --git a/docs/website/docs/general-usage/schema-evolution.md b/docs/website/docs/general-usage/schema-evolution.md index 377df0e47f..dd3aa0bf8a 100644 --- a/docs/website/docs/general-usage/schema-evolution.md +++ b/docs/website/docs/general-usage/schema-evolution.md @@ -163,7 +163,7 @@ data = [{ pipeline = dlt.pipeline("organizations_pipeline", destination="duckdb") # Adding not null constraint -pipeline.run(data, table_name="org", columns={"room": {"data_type": "integer", "nullable": False}}) +pipeline.run(data, table_name="org", columns={"room": {"data_type": "bigint", "nullable": False}}) ``` During pipeline execution a data validation error indicates that a removed column is being passed as null. diff --git a/docs/website/docs/general-usage/schema.md b/docs/website/docs/general-usage/schema.md index 164814010d..9b0d8ec622 100644 --- a/docs/website/docs/general-usage/schema.md +++ b/docs/website/docs/general-usage/schema.md @@ -317,7 +317,7 @@ def textual(nesting_level: int): schema.remove_type_detection("iso_timestamp") # convert UNIX timestamp (float, withing a year from NOW) into timestamp schema.add_type_detection("timestamp") - schema.compile_settings() + schema._compile_settings() - return dlt.resource(...) 
+ return dlt.resource([]) ``` diff --git a/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook.md b/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook.md index fc32aa2c30..29a0ae86f8 100644 --- a/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook.md +++ b/docs/website/docs/walkthroughs/deploy-a-pipeline/deploy-gcp-cloud-function-as-webhook.md @@ -17,10 +17,10 @@ You can setup GCP cloud function webhook using `dlt` as follows: ```py import dlt - import json import time from google.cloud import bigquery - + from dlt.common import json + def your_webhook(request): # Extract relevant data from the request payload data = request.get_json() @@ -40,7 +40,7 @@ You can setup GCP cloud function webhook using `dlt` as follows: 7. Set the function name as "your_webhook" in the Entry point field. 8. In the requirements.txt file, specify the necessary packages: - ```py + ```text # Function dependencies, for example: # package>=version dlt From a8ebc3b64511317011c301b1c8b500d4da559490 Mon Sep 17 00:00:00 2001 From: Dave Date: Fri, 22 Mar 2024 11:28:24 +0100 Subject: [PATCH 2/5] fix more snippets and enable mypy typechecking on embedded snippets --- docs/tools/check_embedded_snippets.py | 3 +-- docs/website/docs/dlt-ecosystem/verified-sources/chess.md | 2 +- docs/website/docs/general-usage/resource.md | 8 +++----- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/docs/tools/check_embedded_snippets.py b/docs/tools/check_embedded_snippets.py index da27c1aa19..9c74025cab 100644 --- a/docs/tools/check_embedded_snippets.py +++ b/docs/tools/check_embedded_snippets.py @@ -18,8 +18,7 @@ LINT_TEMPLATE = "./lint_setup/template.py" LINT_FILE = "./lint_setup/lint_me.py" -ENABLE_MYPY = False - +ENABLE_MYPY = True @dataclass class Snippet: diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/chess.md b/docs/website/docs/dlt-ecosystem/verified-sources/chess.md index 
62776b5c53..03699bc9ce 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/chess.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/chess.md @@ -160,7 +160,7 @@ specified otherwise. @dlt.resource(write_disposition="append") def players_games( players: List[str], start_month: str = None, end_month: str = None -) -> Iterator[Callable[[], List[TDataItem]]]: +) -> Iterator[TDataItems]: # gets a list of already checked(loaded) archives. checked_archives = dlt.current.resource_state().setdefault("archives", []) yield {} # return your retrieved data here diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index 67609b8989..66c4281d8d 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -63,8 +63,7 @@ accepts following arguments: ... # the `table_schema` method gets table schema generated by a resource - # TODO: needs fixing - print(get_users().table_schema()) + print(get_users().compute_table_schema()) ``` > 💡 You can pass dynamic hints which are functions that take the data item as input and return a @@ -155,8 +154,7 @@ def repo_events() -> Iterator[TDataItems]: # the `table_schema` method gets table schema generated by a resource and takes optional # data item to evaluate dynamic hints -# TODO: needs fixing -print(repo_events().table_schema({"type": "WatchEvent", id:...})) +print(repo_events().compute_table_schema({"type": "WatchEvent", id:...})) ``` In more advanced cases, you can dispatch data to different tables directly in the code of the @@ -285,7 +283,7 @@ def get_orders(): yield o # users and orders will be iterated in parallel in two separate threads -pipeline.run[(get_users(), get_orders()]) +pipeline.run([get_users(), get_orders()]) ``` Async generators are automatically extracted concurrently with other resources: From c6896c1f56332019a58eeaf984e51bc4f467f545 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 25 Mar 2024 13:00:52 +0100 
Subject: [PATCH 3/5] switch to glob for file discovery --- docs/tools/utils.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/docs/tools/utils.py b/docs/tools/utils.py index b7d401b893..7cbfe8fea3 100644 --- a/docs/tools/utils.py +++ b/docs/tools/utils.py @@ -1,5 +1,6 @@ from typing import List import os +import glob import dlt.cli.echo as fmt @@ -15,24 +16,20 @@ def collect_markdown_files(verbose: bool) -> List[str]: # collect docs pages markdown_files: List[str] = [] - for path, _, files in os.walk(DOCS_DIR): - if "api_reference" in path: + for filepath in glob.glob(f'{DOCS_DIR}/**/*.md', recursive=True): + if "api_reference" in filepath: continue - if "jaffle_shop" in path: + if "jaffle_shop" in filepath: continue - for file in files: - if file.endswith(".md"): - markdown_files.append(os.path.join(path, file)) - if verbose: - fmt.echo(f"Discovered {os.path.join(path, file)}") + markdown_files.append(filepath) + if verbose: + fmt.echo(f"Discovered {filepath}") # collect blog pages - for path, _, files in os.walk(BLOG_DIR): - for file in files: - if file.endswith(".md"): - markdown_files.append(os.path.join(path, file)) - if verbose: - fmt.echo(f"Discovered {os.path.join(path, file)}") + for filepath in glob.glob(f'{BLOG_DIR}/**/*.md', recursive=True): + markdown_files.append(filepath) + if verbose: + fmt.echo(f"Discovered {filepath}") if len(markdown_files) < 50: # sanity check fmt.error("Found too few files. 
Something went wrong.") From ad30f862a1f2f7234b739ecffa2ca33cb0f0421d Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 25 Mar 2024 13:14:34 +0100 Subject: [PATCH 4/5] additional typing checks for merged in blog posts --- docs/website/blog/2023-08-24-dlt-etlt.md | 6 ++--- docs/website/blog/2023-10-26-dlt-prefect.md | 6 ++--- docs/website/blog/2023-11-01-dlt-dagster.md | 26 +++++++++---------- ...1-22-dlt-webhooks-event-based-ingestion.md | 2 +- ...01-15-dlt-dbt-runner-on-cloud-functions.md | 2 +- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/docs/website/blog/2023-08-24-dlt-etlt.md b/docs/website/blog/2023-08-24-dlt-etlt.md index fb8215c9a0..a36b169a99 100644 --- a/docs/website/blog/2023-08-24-dlt-etlt.md +++ b/docs/website/blog/2023-08-24-dlt-etlt.md @@ -150,16 +150,16 @@ def pseudonymize_name(doc): # 1. Create an instance of the source so you can edit it. data_source = dummy_source() # 2. Modify this source instance's resource -data_source = data_source.dummy_data().add_map(pseudonymize_name) +data_resource = data_source.dummy_data().add_map(pseudonymize_name) # 3. 
Inspect your result -for row in data_source: +for row in data_resource: print(row) #{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'} #{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'} #{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'} pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data') -load_info = pipeline.run(data_source) +load_info = pipeline.run(data_resource) ``` diff --git a/docs/website/blog/2023-10-26-dlt-prefect.md b/docs/website/blog/2023-10-26-dlt-prefect.md index 6e9caa3fea..85fa47a5c8 100644 --- a/docs/website/blog/2023-10-26-dlt-prefect.md +++ b/docs/website/blog/2023-10-26-dlt-prefect.md @@ -132,7 +132,6 @@ from typing import List import dlt import pendulum -from pendulum import datetime from slack import slack_source @@ -144,7 +143,7 @@ def get_resources() -> List[str]: """Fetch a list of available dlt resources so we can fetch them one at a time""" # ... -def load_channel_history(channel: str, start_date: datetime) -> None: +def load_channel_history(channel: str, start_date: pendulum.Date) -> None: """Execute a pipeline that will load the given Slack channel incrementally beginning at the given start date.""" # ... @@ -201,7 +200,6 @@ from typing import List import dlt import pendulum -from pendulum import datetime from prefect import flow, task from slack import slack_source @@ -214,7 +212,7 @@ def get_resources() -> List[str]: ... @task -def load_channel_history(channel: str, start_date: datetime) -> None: +def load_channel_history(channel: str, start_date: pendulum.Date) -> None: ... 
@task diff --git a/docs/website/blog/2023-11-01-dlt-dagster.md b/docs/website/blog/2023-11-01-dlt-dagster.md index dc05a35bff..687e8444c4 100644 --- a/docs/website/blog/2023-11-01-dlt-dagster.md +++ b/docs/website/blog/2023-11-01-dlt-dagster.md @@ -141,13 +141,13 @@ This will generate the default files for Dagster that we will use as a starting ### Step 4: Add configurable resources and define the asset -- Define a `DltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset. +- Define a `DDltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset. ```py from dagster import ConfigurableResource import dlt -class DltResource(ConfigurableResource): +class DDltResource(ConfigurableResource): pipeline_name: str dataset_name: str destination: str @@ -169,18 +169,18 @@ class DltResource(ConfigurableResource): ```py from dagster import asset, get_dagster_logger -from ..resources import DltResource +from ..resources import DDltResource from ..dlt import github_issues_resource @asset -def issues_pipeline(pipeline: DltResource): +def issues_pipeline(pipeline: DDltResource): logger = get_dagster_logger() results = pipeline.create_pipeline(github_issues_resource, table_name='github_issues') logger.info(results) ``` -The defined asset (**issues_pipeline**) takes as input the configurable resource (**DltResource**). In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DltResource**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery. +The defined asset (**issues_pipeline**) takes as input the configurable resource (**DDltResource**). 
In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DDltResource**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery. ### Step 5: Handle Schema Evolution @@ -191,7 +191,7 @@ The defined asset (**issues_pipeline**) takes as input the configurable resource ```py from dagster import AssetExecutionContext @asset -def issues_pipeline(context: AssetExecutionContext, pipeline: DltResource): +def issues_pipeline(context: AssetExecutionContext, pipeline: DDltResource): ... md_content="" for package in result.load_packages: @@ -215,7 +215,7 @@ defs = Definitions( assets=all_assets, jobs=[simple_pipeline], resources={ - "pipeline": DltResource( + "pipeline": DDltResource( pipeline_name = "github_issues", dataset_name = "dagster_github_issues", destination = "bigquery", @@ -299,7 +299,7 @@ For this example, we are using MongoDB Atlas. Set up the account for MongoDB Atl Next, create a `.env` file and add the BigQuery and MongoDB credentials to the file. The `.env` file should reside in the root directory. -### Step 3: Adding the DltResource +### Step 3: Adding the DDltResource Create a `DltResouce` under the **resources** directory. 
Add the following code to the `__init__.py`: @@ -308,7 +308,7 @@ from dagster import ConfigurableResource import dlt -class DltResource(ConfigurableResource): +class DDltResource(ConfigurableResource): pipeline_name: str dataset_name: str destination: str @@ -337,7 +337,7 @@ In the `mongodb_pipeline.py` file, locate the `load_select_collection_hint_db` f ```py from ..mongodb import mongodb -from ..resources import DltResource +from ..resources import DDltResource import dlt import os @@ -363,7 +363,7 @@ def dlt_asset_factory(collection_list): for stream in collection_name} ) - def collections_asset(context: OpExecutionContext, pipeline: DltResource): + def collections_asset(context: OpExecutionContext, pipeline: DDltResource): # Getting Data From MongoDB data = mongodb(URL, db).with_resources(*collection_name) @@ -390,12 +390,12 @@ Add the definitions in the `__init__.py` in the root directory: from dagster import Definitions from .assets import dlt_assets -from .resources import DltResource +from .resources import DDltResource defs = Definitions( assets=dlt_assets, resources={ - "pipeline": DltResource( + "pipeline": DDltResource( pipeline_name = "mongo", dataset_name = "dagster_mongo", destination = "bigquery" diff --git a/docs/website/blog/2023-11-22-dlt-webhooks-event-based-ingestion.md b/docs/website/blog/2023-11-22-dlt-webhooks-event-based-ingestion.md index aa433dc883..94fb89790e 100644 --- a/docs/website/blog/2023-11-22-dlt-webhooks-event-based-ingestion.md +++ b/docs/website/blog/2023-11-22-dlt-webhooks-event-based-ingestion.md @@ -81,9 +81,9 @@ in-depth guide, please refer to the detailed documentation. 1. 
Set the environment to Python 3.10 and prepare to insert code into main.py: ```py import dlt - import json import time from google.cloud import bigquery + from dlt.common import json def github_webhook(request): # Extract relevant data from the request payload diff --git a/docs/website/blog/2024-01-15-dlt-dbt-runner-on-cloud-functions.md b/docs/website/blog/2024-01-15-dlt-dbt-runner-on-cloud-functions.md index e21154d98e..059dd97a06 100644 --- a/docs/website/blog/2024-01-15-dlt-dbt-runner-on-cloud-functions.md +++ b/docs/website/blog/2024-01-15-dlt-dbt-runner-on-cloud-functions.md @@ -194,9 +194,9 @@ To integrate dlt and dbt in cloud functions, use the dlt-dbt runner; here’s ho ```py import dlt import logging - import json from flask import jsonify from dlt.common.runtime.slack import send_slack_message + from dlt.common import json def run_pipeline(request): """ From 51ea6f10133c4cb7ce0bb9244ba8b962f3d8b07a Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 26 Mar 2024 11:47:09 +0100 Subject: [PATCH 5/5] fix snippets after devel merge --- docs/tools/check_embedded_snippets.py | 2 ++ docs/tools/lint_setup/template.py | 2 +- docs/tools/utils.py | 4 ++-- .../docs/general-usage/credentials/config_specs.md | 12 ++++++------ mypy.ini | 3 +++ 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/docs/tools/check_embedded_snippets.py b/docs/tools/check_embedded_snippets.py index 9c74025cab..96e1227745 100644 --- a/docs/tools/check_embedded_snippets.py +++ b/docs/tools/check_embedded_snippets.py @@ -20,6 +20,7 @@ ENABLE_MYPY = True + @dataclass class Snippet: index: int @@ -224,6 +225,7 @@ def typecheck_snippets(snippets: List[Snippet], verbose: bool) -> None: failed_count += 1 fmt.warning(f"Failed to type check {str(snippet)}") fmt.echo(result.stdout.strip()) + fmt.echo(result.stderr.strip()) if failed_count: fmt.error(f"Failed to type check {failed_count} snippets") diff --git a/docs/tools/lint_setup/template.py b/docs/tools/lint_setup/template.py index 
6b207ceb0b..c72c4dba62 100644 --- a/docs/tools/lint_setup/template.py +++ b/docs/tools/lint_setup/template.py @@ -34,4 +34,4 @@ ex: Exception = None # type: ignore[assignment] load_info: LoadInfo = None # type: ignore[assignment] url: str = None # type: ignore[assignment] -my_resource: DltResource = None # type: ignore[assignment] \ No newline at end of file +my_resource: DltResource = None # type: ignore[assignment] diff --git a/docs/tools/utils.py b/docs/tools/utils.py index 7cbfe8fea3..f71d68bd86 100644 --- a/docs/tools/utils.py +++ b/docs/tools/utils.py @@ -16,7 +16,7 @@ def collect_markdown_files(verbose: bool) -> List[str]: # collect docs pages markdown_files: List[str] = [] - for filepath in glob.glob(f'{DOCS_DIR}/**/*.md', recursive=True): + for filepath in glob.glob(f"{DOCS_DIR}/**/*.md", recursive=True): if "api_reference" in filepath: continue if "jaffle_shop" in filepath: @@ -26,7 +26,7 @@ def collect_markdown_files(verbose: bool) -> List[str]: fmt.echo(f"Discovered {filepath}") # collect blog pages - for filepath in glob.glob(f'{BLOG_DIR}/**/*.md', recursive=True): + for filepath in glob.glob(f"{BLOG_DIR}/**/*.md", recursive=True): markdown_files.append(filepath) if verbose: fmt.echo(f"Discovered {filepath}") diff --git a/docs/website/docs/general-usage/credentials/config_specs.md b/docs/website/docs/general-usage/credentials/config_specs.md index e66939fc39..30b401727d 100644 --- a/docs/website/docs/general-usage/credentials/config_specs.md +++ b/docs/website/docs/general-usage/credentials/config_specs.md @@ -118,12 +118,12 @@ It also allows for the addition of scopes and provides methods for client authen Usage: ```py -credentials = OAuth2Credentials({ - "client_id": "CLIENT_ID", - "client_secret": "CLIENT_SECRET", - "refresh_token": "REFRESH_TOKEN", - "scopes": ["scope1", "scope2"] -}) +credentials = OAuth2Credentials( + client_id="CLIENT_ID", + client_secret="CLIENT_SECRET", # type: ignore + refresh_token="REFRESH_TOKEN", # type: ignore + 
scopes=["scope1", "scope2"] +) # Authorize the client credentials.auth() diff --git a/mypy.ini b/mypy.ini index 829da1c6ce..089fde35aa 100644 --- a/mypy.ini +++ b/mypy.ini @@ -116,4 +116,7 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-dotenv.*] +ignore_missing_imports = True + +[mypy-pytz.*] ignore_missing_imports = True \ No newline at end of file