docs: add typechecking to embedded snippets #1130

Merged: 7 commits, Mar 26, 2024
3 changes: 2 additions & 1 deletion docs/tools/check_embedded_snippets.py
@@ -18,7 +18,7 @@
LINT_TEMPLATE = "./lint_setup/template.py"
LINT_FILE = "./lint_setup/lint_me.py"

ENABLE_MYPY = False
ENABLE_MYPY = True


@dataclass
@@ -225,6 +225,7 @@ def typecheck_snippets(snippets: List[Snippet], verbose: bool) -> None:
failed_count += 1
fmt.warning(f"Failed to type check {str(snippet)}")
fmt.echo(result.stdout.strip())
fmt.echo(result.stderr.strip())

if failed_count:
fmt.error(f"Failed to type check {failed_count} snippets")
4 changes: 3 additions & 1 deletion docs/tools/lint_setup/template.py
@@ -8,8 +8,8 @@
import os

import pendulum
from pendulum import DateTime
from datetime import datetime # noqa: I251
from pendulum import DateTime

import dlt
from dlt.common import json
@@ -26,10 +26,12 @@
BaseConfiguration,
)
from dlt.common.storages.configuration import FileSystemCredentials
from dlt.pipeline.exceptions import PipelineStepFailed

# some universal variables
pipeline: dlt.Pipeline = None # type: ignore[assignment]
p: dlt.Pipeline = None # type: ignore[assignment]
ex: Exception = None # type: ignore[assignment]
load_info: LoadInfo = None # type: ignore[assignment]
url: str = None # type: ignore[assignment]
my_resource: DltResource = None # type: ignore[assignment]
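These placeholder assignments exist so that snippets referencing names such as `pipeline`, `load_info`, or `my_resource` without defining them still pass linting and typechecking. A hedged sketch of how the template is presumably combined with each snippet (the real tool writes to the `LINT_FILE` constant shown in `check_embedded_snippets.py` above; this concatenation is only illustrative):

```py
# illustrative only: prepend the typed header to a docs snippet before checking it
with open("./lint_setup/template.py", encoding="utf-8") as f:
    header = f.read()

snippet = 'load_info = pipeline.run(my_resource)\nprint(load_info)\n'

with open("./lint_setup/lint_me.py", "w", encoding="utf-8") as f:
    f.write(header + "\n" + snippet)  # mypy now sees fully typed names
```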
25 changes: 11 additions & 14 deletions docs/tools/utils.py
@@ -1,5 +1,6 @@
from typing import List
import os
import glob

import dlt.cli.echo as fmt

@@ -15,24 +16,20 @@ def collect_markdown_files(verbose: bool) -> List[str]:

# collect docs pages
markdown_files: List[str] = []
for path, _, files in os.walk(DOCS_DIR):
if "api_reference" in path:
for filepath in glob.glob(f"{DOCS_DIR}/**/*.md", recursive=True):
if "api_reference" in filepath:
continue
if "jaffle_shop" in path:
if "jaffle_shop" in filepath:
continue
for file in files:
if file.endswith(".md"):
markdown_files.append(os.path.join(path, file))
if verbose:
fmt.echo(f"Discovered {os.path.join(path, file)}")
markdown_files.append(filepath)
if verbose:
fmt.echo(f"Discovered {filepath}")

# collect blog pages
for path, _, files in os.walk(BLOG_DIR):
for file in files:
if file.endswith(".md"):
markdown_files.append(os.path.join(path, file))
if verbose:
fmt.echo(f"Discovered {os.path.join(path, file)}")
for filepath in glob.glob(f"{BLOG_DIR}/**/*.md", recursive=True):
markdown_files.append(filepath)
if verbose:
fmt.echo(f"Discovered {filepath}")

if len(markdown_files) < 50: # sanity check
fmt.error("Found too few files. Something went wrong.")
6 changes: 3 additions & 3 deletions docs/website/blog/2023-08-24-dlt-etlt.md
@@ -150,16 +150,16 @@ def pseudonymize_name(doc):
# 1. Create an instance of the source so you can edit it.
data_source = dummy_source()
# 2. Modify this source instance's resource
data_source = data_source.dummy_data().add_map(pseudonymize_name)
data_resource = data_source.dummy_data().add_map(pseudonymize_name)
# 3. Inspect your result
for row in data_source:
for row in data_resource:
print(row)
#{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'}
#{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'}
#{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'}

pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data')
load_info = pipeline.run(data_source)
load_info = pipeline.run(data_resource)

```
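A hedged note on the rename to `data_resource`: `dummy_data().add_map(...)` returns a resource rather than the whole source, so reusing the `data_source` name would be misleading and would no longer pass the new snippet typecheck. Roughly:

```py
data_source = dummy_source()              # a source
data_resource = data_source.dummy_data()  # a single resource selected from it
data_resource = data_resource.add_map(pseudonymize_name)  # add_map returns the resource
```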

6 changes: 2 additions & 4 deletions docs/website/blog/2023-10-26-dlt-prefect.md
@@ -132,7 +132,6 @@ from typing import List

import dlt
import pendulum
from pendulum import datetime

from slack import slack_source

@@ -144,7 +143,7 @@ def get_resources() -> List[str]:
"""Fetch a list of available dlt resources so we can fetch them one at a time"""
# ...

def load_channel_history(channel: str, start_date: datetime) -> None:
def load_channel_history(channel: str, start_date: Date) -> None:
"""Execute a pipeline that will load the given Slack channel incrementally beginning at the given start date."""
# ...

@@ -201,7 +200,6 @@ from typing import List

import dlt
import pendulum
from pendulum import datetime
from prefect import flow, task
from slack import slack_source

@@ -214,7 +212,7 @@ def get_resources() -> List[str]:
...

@task
def load_channel_history(channel: str, start_date: datetime) -> None:
def load_channel_history(channel: str, start_date: pendulum.Date) -> None:
...

@task
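A hedged aside on why `from pendulum import datetime` is dropped in both snippets: `pendulum.datetime` is a factory function rather than a class, so it cannot be used as a type annotation once the snippets are typechecked; a concrete pendulum type such as `pendulum.Date` works instead. A minimal sketch (the function body is assumed):

```py
import pendulum

def load_channel_history(channel: str, start_date: pendulum.Date) -> None:
    # sketch only: the real body loads the Slack channel incrementally
    print(f"loading #{channel} since {start_date.isoformat()}")

load_channel_history("general", pendulum.Date(2023, 10, 1))
```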
26 changes: 13 additions & 13 deletions docs/website/blog/2023-11-01-dlt-dagster.md
@@ -141,13 +141,13 @@ This will generate the default files for Dagster that we will use as a starting

### Step 4: Add configurable resources and define the asset

- Define a `DltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.
- Define a `DDltResource` class in `resources/__init__.py` as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.

```py
from dagster import ConfigurableResource
import dlt

class DltResource(ConfigurableResource):
class DDltResource(ConfigurableResource):
pipeline_name: str
dataset_name: str
destination: str
@@ -169,18 +169,18 @@

```py
from dagster import asset, get_dagster_logger
from ..resources import DltResource
from ..resources import DDltResource
from ..dlt import github_issues_resource

@asset
def issues_pipeline(pipeline: DltResource):
def issues_pipeline(pipeline: DDltResource):

logger = get_dagster_logger()
results = pipeline.create_pipeline(github_issues_resource, table_name='github_issues')
logger.info(results)
```

The defined asset (**issues_pipeline**) takes as input the configurable resource (**DltResource**). In the asset, we use the configurable resource to create a dlt pipeline by using an instance of the configurable resource (**DltResource**) to call the `create_pipeline` function. The `dlt.resource` (**github_issues_resource**) is passed to the `create_pipeline` function. The `create_pipeline` function normalizes the data and ingests it into BigQuery.
The defined asset (**issues_pipeline**) takes the configurable resource (**DDltResource**) as input. Inside the asset, an instance of **DDltResource** calls the `create_pipeline` function, passing in the `dlt.resource` (**github_issues_resource**). The `create_pipeline` function normalizes the data and ingests it into BigQuery.

### Step 5: Handle Schema Evolution

@@ -191,7 +191,7 @@
```py
from dagster import AssetExecutionContext
@asset
def issues_pipeline(context: AssetExecutionContext, pipeline: DltResource):
def issues_pipeline(context: AssetExecutionContext, pipeline: DDltResource):
...
md_content=""
for package in result.load_packages:
@@ -215,7 +215,7 @@ defs = Definitions(
assets=all_assets,
jobs=[simple_pipeline],
resources={
"pipeline": DltResource(
"pipeline": DDltResource(
pipeline_name = "github_issues",
dataset_name = "dagster_github_issues",
destination = "bigquery",
@@ -299,7 +299,7 @@ For this example, we are using MongoDB Atlas. Set up the account for MongoDB Atl
Next, create a `.env` file and add the BigQuery and MongoDB credentials to the file. The `.env` file should reside in the root directory.


### Step 3: Adding the DltResource
### Step 3: Adding the DDltResource

Create a `DDltResource` under the **resources** directory. Add the following code to the `__init__.py`:


import dlt

class DltResource(ConfigurableResource):
class DDltResource(ConfigurableResource):
pipeline_name: str
dataset_name: str
destination: str
@@ -337,7 +337,7 @@ In the `mongodb_pipeline.py` file, locate the `load_select_collection_hint_db` f

```py
from ..mongodb import mongodb
from ..resources import DltResource
from ..resources import DDltResource

import dlt
import os
@@ -363,7 +363,7 @@ def dlt_asset_factory(collection_list):
for stream in collection_name}

)
def collections_asset(context: OpExecutionContext, pipeline: DltResource):
def collections_asset(context: OpExecutionContext, pipeline: DDltResource):

# Getting Data From MongoDB
data = mongodb(URL, db).with_resources(*collection_name)
@@ -390,12 +390,12 @@ Add the definitions in the `__init__.py` in the root directory:
from dagster import Definitions

from .assets import dlt_assets
from .resources import DltResource
from .resources import DDltResource

defs = Definitions(
assets=dlt_assets,
resources={
"pipeline": DltResource(
"pipeline": DDltResource(
pipeline_name = "mongo",
dataset_name = "dagster_mongo",
destination = "bigquery"
@@ -81,9 +81,9 @@ in-depth guide, please refer to the detailed documentation.
1. Set the environment to Python 3.10 and prepare to insert code into main.py:
```py
import dlt
import json
import time
from google.cloud import bigquery
from dlt.common import json

def github_webhook(request):
# Extract relevant data from the request payload
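# Note on the swap to `from dlt.common import json` above: dlt's json module
# exposes the same dumps()/loads() interface used here, but it also serializes
# values the stdlib json rejects (datetimes, decimals), which webhook payloads
# and dlt objects often contain; the rest of the snippet stays unchanged.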
@@ -194,9 +194,9 @@ To integrate dlt and dbt in cloud functions, use the dlt-dbt runner; here's ho
```py
import dlt
import logging
import json
from flask import jsonify
from dlt.common.runtime.slack import send_slack_message
from dlt.common import json

def run_pipeline(request):
"""
@@ -54,7 +54,7 @@ The full signature of the destination decorator plus its function is the followi
loader_file_format="jsonl",
name="my_custom_destination",
naming_convention="direct",
max_nesting_level=0,
max_table_nesting=0,
skip_dlt_columns_and_tables=True
)
def my_destination(items: TDataItems, table: TTableSchema) -> None:
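# Usage sketch (illustrative names, not from the docs page): a destination
# declared this way is passed to a pipeline like any built-in destination, e.g.
#   pipeline = dlt.pipeline("my_pipeline", destination=my_destination)
#   pipeline.run([{"id": 1}], table_name="items")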
@@ -215,7 +215,7 @@ verified source.
base_id = base_id,
table_names = table_names
)
load_info = pipeline.run(airtables, write_deposition = "replace")
load_info = pipeline.run(airtables, write_disposition = "replace")
```

> You have the option to use table names or table IDs in the code above, in place of "Table1" and
8 changes: 5 additions & 3 deletions docs/website/docs/dlt-ecosystem/verified-sources/chess.md
@@ -127,7 +127,9 @@ def players_profiles(players: List[str]) -> Iterator[TDataItem]:
@dlt.defer
def _get_profile(username: str) -> TDataItem:
return get_path_with_retry(f"player/{username}")
...

for username in players:
yield _get_profile(username)
```

`players`: Is a list of player usernames for which you want to fetch profile data.
@@ -158,10 +160,10 @@ specified otherwise.
@dlt.resource(write_disposition="append")
def players_games(
players: List[str], start_month: str = None, end_month: str = None
) -> Iterator[Callable[[], List[TDataItem]]]:
) -> Iterator[TDataItems]:
# gets a list of already checked(loaded) archives.
checked_archives = dlt.current.resource_state().setdefault("archives", [])
...
yield {} # return your retrieved data here
```

`players`: Is a list of player usernames for which you want to fetch games.
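A hedged sketch of the incremental pattern above, with imports as in the snippet (`dlt`, `List`, `Iterator`, `TDataItems`): `dlt.current.resource_state()` persists the list of already-processed archives between runs, so each run only yields games from archives it has not seen before (the archive path below is illustrative):

```py
@dlt.resource(write_disposition="append")
def players_games(players: List[str]) -> Iterator[TDataItems]:
    # state survives across pipeline runs for this resource
    checked_archives = dlt.current.resource_state().setdefault("archives", [])
    for player in players:
        archive = f"player/{player}/games/2024/01"  # illustrative archive path
        if archive in checked_archives:
            continue  # already loaded in an earlier run
        checked_archives.append(archive)
        yield {"player": player, "archive": archive}  # yield the fetched games here
```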
@@ -453,7 +453,8 @@ verified source.
)
# pretty print the information on data that was loaded
print(load_info)
print(listing)(pipeline.last_trace.last_normalize_info)
print(listing)
print(pipeline.last_trace.last_normalize_info)
```

1. Cleanup after loading:
3 changes: 2 additions & 1 deletion docs/website/docs/dlt-ecosystem/verified-sources/jira.md
@@ -173,7 +173,8 @@ The resource function searches issues using JQL queries and then loads them to t
```py
@dlt.resource(write_disposition="replace")
def issues(jql_queries: List[str]) -> Iterable[TDataItem]:
api_path = "rest/api/3/search"
api_path = "rest/api/3/search"
return {} # return the retrieved values here
```

`jql_queries`: Accepts a list of JQL queries.
7 changes: 4 additions & 3 deletions docs/website/docs/dlt-ecosystem/verified-sources/pipedrive.md
@@ -213,10 +213,11 @@ create and store a mapping of custom fields for different entities in the source
```py
@dlt.resource(selected=False)
def create_state(pipedrive_api_key: str) -> Iterator[Dict[str, Any]]:
def _get_pages_for_rename(
entity: str, fields_entity: str, pipedrive_api_key: str
) -> Dict[str, Any]:
def _get_pages_for_rename(
entity: str, fields_entity: str, pipedrive_api_key: str
) -> Dict[str, Any]:
...
yield _get_pages_for_rename("", "", "")
```

It processes each entity in ENTITY_MAPPINGS, updating the custom fields mapping if a related fields
6 changes: 3 additions & 3 deletions docs/website/docs/general-usage/credentials/config_specs.md
@@ -94,7 +94,7 @@ credentials = ConnectionStringCredentials()
credentials.drivername = "postgresql"
credentials.database = "my_database"
credentials.username = "my_user"
credentials.password = "my_password"
credentials.password = "my_password" # type: ignore
credentials.host = "localhost"
credentials.port = 5432
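# Note on the added "# type: ignore": with snippet typechecking enabled,
# assigning a plain string to `password` (declared with dlt's secret value type)
# is a mypy error; the ignore keeps the example short. Real projects usually
# supply secrets via configuration or environment variables instead.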

@@ -120,8 +120,8 @@ Usage:
```py
credentials = OAuth2Credentials(
client_id="CLIENT_ID",
client_secret="CLIENT_SECRET",
refresh_token="REFRESH_TOKEN",
client_secret="CLIENT_SECRET", # type: ignore
refresh_token="REFRESH_TOKEN", # type: ignore
scopes=["scope1", "scope2"]
)

@@ -51,11 +51,11 @@ for row in dummy_source().dummy_data.add_map(pseudonymize_name):
# 1. Create an instance of the source so you can edit it.
data_source = dummy_source()
# 2. Modify this source instance's resource
data_source = data_source.dummy_data.add_map(pseudonymize_name)
data_resource = data_source.dummy_data.add_map(pseudonymize_name)
# 3. Inspect your result
for row in data_source:
for row in data_resource:
print(row)

pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data')
load_info = pipeline.run(data_source)
load_info = pipeline.run(data_resource)
```
@@ -44,10 +44,10 @@ def replace_umlauts_in_dict_keys(d):
data_source = dummy_source()

# 2. Modify this source instance's resource
data_source = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys)
data_resource = data_source.dummy_data().add_map(replace_umlauts_in_dict_keys)

# 3. Inspect your result
for row in data_source:
for row in data_resource:
print(row)

# {'Objekt_0': {'Groesse': 0, 'Aequivalenzpruefung': True}}