docs: add typechecking to embedded snippets (#1130)
* fixes a couple of typechecking errors in the docs

* fix more snippets and enable mypy typechecking on embedded snippets

* switch to glob for file discovery

* additional typing checks for merged-in blog posts

* fix snippets after devel merge
sh-rp authored Mar 26, 2024
1 parent cc7533a commit 9566199
Showing 24 changed files with 76 additions and 70 deletions.
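
For orientation before the per-file diffs, here is a rough sketch of how the snippet checker fits together after this change, assuming docs snippets are fenced `py` code blocks and mypy is invoked as a subprocess. The helper names below are illustrative only; the real implementations live in `docs/tools/check_embedded_snippets.py` and `docs/tools/utils.py`, shown in the diffs that follow.

```py
# Illustrative sketch only: function names and the regex are hypothetical and
# do not match the actual helpers in docs/tools/check_embedded_snippets.py.
import glob
import re
import subprocess

SNIPPET_RE = re.compile(r"```py\n(.*?)```", re.DOTALL)

def collect_markdown_files(docs_dir: str) -> list[str]:
    """Discover docs pages recursively (the commit switches utils.py to glob for this)."""
    return [
        path
        for path in glob.glob(f"{docs_dir}/**/*.md", recursive=True)
        if "api_reference" not in path
    ]

def extract_snippets(markdown_file: str) -> list[str]:
    """Pull every fenced py block out of one markdown page."""
    with open(markdown_file, encoding="utf-8") as f:
        return SNIPPET_RE.findall(f.read())

def typecheck_snippet(snippet: str, lint_header: str) -> bool:
    """Prepend the shared lint header (imports and stub variables from
    lint_setup/template.py), then run mypy on the combined file."""
    with open("lint_me.py", "w", encoding="utf-8") as f:
        f.write(lint_header + "\n" + snippet)
    result = subprocess.run(["mypy", "lint_me.py"], capture_output=True, text=True)
    return result.returncode == 0
```

Concretely, the diffs below flip `ENABLE_MYPY` to `True`, surface mypy's stderr when a snippet fails, move markdown discovery to `glob`, and fix the snippets that did not typecheck.
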
3 changes: 2 additions & 1 deletion docs/tools/check_embedded_snippets.py
@@ -18,7 +18,7 @@
LINT_TEMPLATE = "./lint_setup/template.py"
LINT_FILE = "./lint_setup/lint_me.py"

ENABLE_MYPY = False
ENABLE_MYPY = True


@dataclass
@@ -225,6 +225,7 @@ def typecheck_snippets(snippets: List[Snippet], verbose: bool) -> None:
failed_count += 1
fmt.warning(f"Failed to type check {str(snippet)}")
fmt.echo(result.stdout.strip())
fmt.echo(result.stderr.strip())

if failed_count:
fmt.error(f"Failed to type check {failed_count} snippets")
4 changes: 3 additions & 1 deletion docs/tools/lint_setup/template.py
@@ -8,8 +8,8 @@
import os

import pendulum
from pendulum import DateTime
from datetime import datetime # noqa: I251
from pendulum import DateTime

import dlt
from dlt.common import json
@@ -26,10 +26,12 @@
BaseConfiguration,
)
from dlt.common.storages.configuration import FileSystemCredentials
from dlt.pipeline.exceptions import PipelineStepFailed

# some universal variables
pipeline: dlt.Pipeline = None # type: ignore[assignment]
p: dlt.Pipeline = None # type: ignore[assignment]
ex: Exception = None # type: ignore[assignment]
load_info: LoadInfo = None # type: ignore[assignment]
url: str = None # type: ignore[assignment]
my_resource: DltResource = None # type: ignore[assignment]
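
For context, those stub assignments are what let short documentation snippets reference names like `pipeline` or `load_info` without any setup code of their own: the template is prepended to each snippet before mypy runs. A minimal, hypothetical illustration (not part of this commit):

```py
import dlt

# What the prepended lint template effectively provides to every snippet:
pipeline: dlt.Pipeline = None  # type: ignore[assignment]

# A typical docs snippet body can then use `pipeline` directly and still pass
# mypy, even though the page never constructs a pipeline. (Running this would
# fail at runtime, of course; the stub exists only for typechecking.)
load_info = pipeline.run([{"id": 1, "name": "alice"}], table_name="items")
print(load_info)
```
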
25 changes: 11 additions & 14 deletions docs/tools/utils.py
@@ -1,5 +1,6 @@
from typing import List
import os
import glob

import dlt.cli.echo as fmt

@@ -15,24 +16,20 @@ def collect_markdown_files(verbose: bool) -> List[str]:

# collect docs pages
markdown_files: List[str] = []
for path, _, files in os.walk(DOCS_DIR):
if "api_reference" in path:
for filepath in glob.glob(f"{DOCS_DIR}/**/*.md", recursive=True):
if "api_reference" in filepath:
continue
if "jaffle_shop" in path:
if "jaffle_shop" in filepath:
continue
for file in files:
if file.endswith(".md"):
markdown_files.append(os.path.join(path, file))
if verbose:
fmt.echo(f"Discovered {os.path.join(path, file)}")
markdown_files.append(filepath)
if verbose:
fmt.echo(f"Discovered {filepath}")

# collect blog pages
for path, _, files in os.walk(BLOG_DIR):
for file in files:
if file.endswith(".md"):
markdown_files.append(os.path.join(path, file))
if verbose:
fmt.echo(f"Discovered {os.path.join(path, file)}")
for filepath in glob.glob(f"{BLOG_DIR}/**/*.md", recursive=True):
markdown_files.append(filepath)
if verbose:
fmt.echo(f"Discovered {filepath}")

if len(markdown_files) < 50: # sanity check
fmt.error("Found too few files. Something went wrong.")
6 changes: 3 additions & 3 deletions docs/website/blog/2023-08-24-dlt-etlt.md
@@ -150,16 +150,16 @@ def pseudonymize_name(doc):
# 1. Create an instance of the source so you can edit it.
data_source = dummy_source()
# 2. Modify this source instance's resource
data_source = data_source.dummy_data().add_map(pseudonymize_name)
data_resource = data_source.dummy_data().add_map(pseudonymize_name)
# 3. Inspect your result
for row in data_source:
for row in data_resource:
print(row)
#{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'}
#{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'}
#{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'}

pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data')
load_info = pipeline.run(data_source)
load_info = pipeline.run(data_resource)

```

6 changes: 2 additions & 4 deletions docs/website/blog/2023-10-26-dlt-prefect.md
@@ -132,7 +132,6 @@ from typing import List
import dlt
import pendulum
from pendulum import datetime
from slack import slack_source
@@ -144,7 +143,7 @@ def get_resources() -> List[str]:
"""Fetch a list of available dlt resources so we can fetch them one at a time"""
# ...
def load_channel_history(channel: str, start_date: datetime) -> None:
def load_channel_history(channel: str, start_date: Date) -> None:
"""Execute a pipeline that will load the given Slack channel incrementally beginning at the given start date."""
# ...
@@ -201,7 +200,6 @@ from typing import List
import dlt
import pendulum
from pendulum import datetime
from prefect import flow, task
from slack import slack_source
@@ -214,7 +212,7 @@ def get_resources() -> List[str]:
...
@task
def load_channel_history(channel: str, start_date: datetime) -> None:
def load_channel_history(channel: str, start_date: pendulum.Date) -> None:
...
@task
26 changes: 13 additions & 13 deletions docs/website/blog/2023-11-01-dlt-dagster.md
@@ -141,13 +141,13 @@ This will generate the default files for Dagster that we will use as a starting

### Step 4: Add configurable resources and define the asset

- Define a `DltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.
- Define a `DDltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.

```py
from dagster import ConfigurableResource
import dlt

class DltResource(ConfigurableResource):
class DDltResource(ConfigurableResource):
pipeline_name: str
dataset_name: str
destination: str
@@ -169,18 +169,18 @@ class DltResource(ConfigurableResource):

```py
from dagster import asset, get_dagster_logger
from ..resources import DltResource
from ..resources import DDltResource
from ..dlt import github_issues_resource

@asset
def issues_pipeline(pipeline: DltResource):
def issues_pipeline(pipeline: DDltResource):

logger = get_dagster_logger()
results = pipeline.create_pipeline(github_issues_resource, table_name='github_issues')
logger.info(results)
```

The defined asset (**issues_pipeline**) takes as input the configurable resource (**DltResource**). In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DltResource**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery.
The defined asset (**issues_pipeline**) takes as input the configurable resource (**DDltResource**). In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DDltResource**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery.

### Step 5: Handle Schema Evolution

@@ -191,7 +191,7 @@ The defined asset (**issues_pipeline**) takes as input the configurable resource
```py
from dagster import AssetExecutionContext
@asset
def issues_pipeline(context: AssetExecutionContext, pipeline: DltResource):
def issues_pipeline(context: AssetExecutionContext, pipeline: DDltResource):
...
md_content=""
for package in result.load_packages:
@@ -215,7 +215,7 @@ defs = Definitions(
assets=all_assets,
jobs=[simple_pipeline],
resources={
"pipeline": DltResource(
"pipeline": DDltResource(
pipeline_name = "github_issues",
dataset_name = "dagster_github_issues",
destination = "bigquery",
@@ -299,7 +299,7 @@ For this example, we are using MongoDB Atlas. Set up the account for MongoDB Atl
Next, create a `.env` file and add the BigQuery and MongoDB credentials to the file. The `.env` file should reside in the root directory.


### Step 3: Adding the DltResource
### Step 3: Adding the DDltResource

Create a `DDltResource` under the **resources** directory. Add the following code to the `__init__.py`:

Expand All @@ -308,7 +308,7 @@ from dagster import ConfigurableResource

import dlt

class DltResource(ConfigurableResource):
class DDltResource(ConfigurableResource):
pipeline_name: str
dataset_name: str
destination: str
@@ -337,7 +337,7 @@ In the `mongodb_pipeline.py` file, locate the `load_select_collection_hint_db` f

```py
from ..mongodb import mongodb
from ..resources import DltResource
from ..resources import DDltResource

import dlt
import os
@@ -363,7 +363,7 @@ def dlt_asset_factory(collection_list):
for stream in collection_name}

)
def collections_asset(context: OpExecutionContext, pipeline: DltResource):
def collections_asset(context: OpExecutionContext, pipeline: DDltResource):

# Getting Data From MongoDB
data = mongodb(URL, db).with_resources(*collection_name)
@@ -390,12 +390,12 @@ Add the definitions in the `__init__.py` in the root directory:
from dagster import Definitions

from .assets import dlt_assets
from .resources import DltResource
from .resources import DDltResource

defs = Definitions(
assets=dlt_assets,
resources={
"pipeline": DltResource(
"pipeline": DDltResource(
pipeline_name = "mongo",
dataset_name = "dagster_mongo",
destination = "bigquery"
@@ -81,9 +81,9 @@ in-depth guide, please refer to the detailed documentation.
1. Set the environment to Python 3.10 and prepare to insert code into main.py:
```py
import dlt
import json
import time
from google.cloud import bigquery
from dlt.common import json

def github_webhook(request):
# Extract relevant data from the request payload
@@ -194,9 +194,9 @@ To integrate dlt and dbt in cloud functions, use the dlt-dbt runner; here’s ho
```py
import dlt
import logging
import json
from flask import jsonify
from dlt.common.runtime.slack import send_slack_message
from dlt.common import json
def run_pipeline(request):
"""
@@ -54,7 +54,7 @@ The full signature of the destination decorator plus its function is the followi
loader_file_format="jsonl",
name="my_custom_destination",
naming_convention="direct",
max_nesting_level=0,
max_table_nesting=0,
skip_dlt_columns_and_tables=True
)
def my_destination(items: TDataItems, table: TTableSchema) -> None:
@@ -212,7 +212,7 @@ verified source.
base_id = base_id,
table_names = table_names
)
load_info = pipeline.run(airtables, write_deposition = "replace")
load_info = pipeline.run(airtables, write_disposition = "replace")
```

> You have the option to use table names or table IDs in the code above, in place of "Table1" and
8 changes: 5 additions & 3 deletions docs/website/docs/dlt-ecosystem/verified-sources/chess.md
@@ -124,7 +124,9 @@ def players_profiles(players: List[str]) -> Iterator[TDataItem]:
@dlt.defer
def _get_profile(username: str) -> TDataItem:
return get_path_with_retry(f"player/{username}")
...

for username in players:
yield _get_profile(username)
```

`players`: Is a list of player usernames for which you want to fetch profile data.
@@ -155,10 +157,10 @@ specified otherwise.
@dlt.resource(write_disposition="append")
def players_games(
players: List[str], start_month: str = None, end_month: str = None
) -> Iterator[Callable[[], List[TDataItem]]]:
) -> Iterator[TDataItems]:
# gets a list of already checked(loaded) archives.
checked_archives = dlt.current.resource_state().setdefault("archives", [])
...
yield {} # return your retrieved data here
```

`players`: Is a list of player usernames for which you want to fetch games.
@@ -451,7 +451,8 @@ verified source.
)
# pretty print the information on data that was loaded
print(load_info)
print(listing)(pipeline.last_trace.last_normalize_info)
print(listing)
print(pipeline.last_trace.last_normalize_info)
```

1. Cleanup after loading:
3 changes: 2 additions & 1 deletion docs/website/docs/dlt-ecosystem/verified-sources/jira.md
@@ -176,7 +176,8 @@ The resource function searches issues using JQL queries and then loads them to t
```py
@dlt.resource(write_disposition="replace")
def issues(jql_queries: List[str]) -> Iterable[TDataItem]:
api_path = "rest/api/3/search"
api_path = "rest/api/3/search"
return {} # return the retrieved values here
```

`jql_queries`: Accepts a list of JQL queries.
7 changes: 4 additions & 3 deletions docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md
@@ -210,10 +210,11 @@ create and store a mapping of custom fields for different entities in the source
```py
@dlt.resource(selected=False)
def create_state(pipedrive_api_key: str) -> Iterator[Dict[str, Any]]:
def _get_pages_for_rename(
entity: str, fields_entity: str, pipedrive_api_key: str
) -> Dict[str, Any]:
def _get_pages_for_rename(
entity: str, fields_entity: str, pipedrive_api_key: str
) -> Dict[str, Any]:
...
yield _get_pages_for_rename("", "", "")
```

It processes each entity in ENTITY_MAPPINGS, updating the custom fields mapping if a related fields
6 changes: 3 additions & 3 deletions docs/website/docs/general-usage/credentials/config_specs.md
@@ -94,7 +94,7 @@ credentials = ConnectionStringCredentials()
credentials.drivername = "postgresql"
credentials.database = "my_database"
credentials.username = "my_user"
credentials.password = "my_password"
credentials.password = "my_password" # type: ignore
credentials.host = "localhost"
credentials.port = 5432

@@ -120,8 +120,8 @@ Usage:
```py
credentials = OAuth2Credentials(
client_id="CLIENT_ID",
client_secret="CLIENT_SECRET",
refresh_token="REFRESH_TOKEN",
client_secret="CLIENT_SECRET", # type: ignore
refresh_token="REFRESH_TOKEN", # type: ignore
scopes=["scope1", "scope2"]
)

@@ -51,11 +51,11 @@ for row in dummy_source().dummy_data.add_map(pseudonymize_name):
# 1. Create an instance of the source so you can edit it.
data_source = dummy_source()
# 2. Modify this source instance's resource
data_source = data_source.dummy_data.add_map(pseudonymize_name)
data_resource = data_source.dummy_data.add_map(pseudonymize_name)
# 3. Inspect your result
for row in data_source:
for row in data_resource:
print(row)

pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data')
load_info = pipeline.run(data_source)
load_info = pipeline.run(data_resource)
```
@@ -44,10 +44,10 @@ def replace_umlauts_in_dict_keys(d):
data_source = dummy_source()

# 2. Modify this source instance's resource
data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys)
data_resource = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys)

# 3. Inspect your result
for row in data_source:
for row in data_resource:
print(row)

# {'Objekt_0': {'Groesse': 0, 'Aequivalenzpruefung': True}}