Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
AstrakhantsevaAA committed Oct 26, 2023
1 parent fa3f781 commit 3b11cc6
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
29 changes: 17 additions & 12 deletions docs/website/docs/examples/chess_production/code/chess-snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ def players_games(username: Any) -> Iterator[TDataItems]:
retry=retry_if_exception(retry_load(("extract", "load"))),
reraise=True,
)
def load_data_with_retry():
data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players)
def load_data_with_retry(data):
return pipeline.run(data)

# @@@DLT_SNIPPET_END markdown_retry
Expand All @@ -83,11 +82,12 @@ def load_data_with_retry():
pipeline_name="chess_pipeline",
destination="duckdb",
dataset_name="chess_data",
full_refresh=True,
)
max_players = 5
load_info = pipeline.run(
chess(chess_url="https://api.chess.com/pub/", max_players=max_players)
)
# get data for a few famous players
data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players)
load_info = pipeline.run(data)
print(load_info)
# @@@DLT_SNIPPET_END markdown_pipeline

Expand Down Expand Up @@ -143,8 +143,6 @@ def load_data_with_retry():
from dlt.common.runtime.slack import send_slack_message
from dlt.pipeline.helpers import retry_load

# get data for a few famous players
data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players)
try:
for attempt in Retrying(
stop=stop_after_attempt(5),
Expand All @@ -160,17 +158,17 @@ def load_data_with_retry():
# @@@DLT_SNIPPET_END markdown_retry_cm

# @@@DLT_SNIPPET_START markdown_retry_run
load_info_retry = load_data_with_retry()
load_info_retry = load_data_with_retry(data)
# @@@DLT_SNIPPET_END markdown_retry_run

# @@@DLT_SNIPPET_START markdown_sql_client
with pipeline.sql_client() as client:
with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
count = cursor.fetchone()[0]
if count == 0:
count_client = cursor.fetchone()[0]
if count_client == 0:
print("Warning: No data in players table")
else:
print(f"Players table contains {count} rows")
print(f"Players table contains {count_client} rows")
# @@@DLT_SNIPPET_END markdown_sql_client

# @@@DLT_SNIPPET_START markdown_norm_info
Expand All @@ -182,7 +180,6 @@ def load_data_with_retry():
print(f"Players table contains {count} rows")
# @@@DLT_SNIPPET_END markdown_norm_info


# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["players"] == max_players
Expand All @@ -192,3 +189,11 @@ def load_data_with_retry():

jobs_num = len(load_info.load_packages[0].jobs["completed_jobs"])
assert jobs_num == 4

packages_num = len(load_info_retry.load_packages)
assert packages_num == 1

jobs_num = len(load_info_retry.load_packages[0].jobs["completed_jobs"])
assert jobs_num == 3

assert count_client == max_players
41 changes: 26 additions & 15 deletions docs/website/docs/examples/chess_production/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ keywords: [incremental loading, example]
import Header from '../_examples-header.md';

<Header
intro="In this tutorial, you will learn how to investigate and track your loads."
intro="In this tutorial, you will learn how to investigate, track, retry and test your loads."
slug="chess_production"
run_file="chess" />

Expand All @@ -17,13 +17,12 @@ In this example, you'll find a Python script that interacts with the Chess API t

We'll learn how to:

- Examining packages after they have been loaded.
- Reloading load information, schema updates, and traces.
- Inspecting packages after they have been loaded.
- Loading back load information, schema updates, and traces.
- Triggering notifications in case of schema evolution.
- Using context managers to independently retry pipeline stages.
- Run basic tests utilizing sql_client and normalize_info.


## Init pipeline

<!--@@@DLT_SNIPPET_START ./code/chess-snippets.py::markdown_source-->
Expand Down Expand Up @@ -90,11 +89,12 @@ if __name__ == "__main__":
pipeline_name="chess_pipeline",
destination="duckdb",
dataset_name="chess_data",
full_refresh=True,
)
max_players = 5
load_info = pipeline.run(
chess(chess_url="https://api.chess.com/pub/", max_players=max_players)
)
# get data for a few famous players
data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players)
load_info = pipeline.run(data)
print(load_info)
```
<!--@@@DLT_SNIPPET_END ./code/chess-snippets.py::markdown_pipeline-->
Expand Down Expand Up @@ -158,6 +158,20 @@ if schema_updates:
```
<!--@@@DLT_SNIPPET_END ./code/chess-snippets.py::markdown_notify-->

You can configure Slack incoming hook via
`secrets.toml` or environment variables.

In `secrets.toml`:
```toml
[runtime]
slack_incoming_hook="https://hooks.slack.com/services/T04DHMAF13Q/B04E7B1MQ1H/TDHEI123WUEE"
```
ENV:

```python
RUNTIME__SLACK_INCOMING_HOOK="https://hooks.slack.com/services/T04DHMAF13Q/B04E7B1MQ1H/TDHEI123WUEE"
```

## Using context managers to retry pipeline stages separately

To use context managers to retry pipeline stages separately for the pipeline:
Expand All @@ -174,8 +188,6 @@ from tenacity import (
from dlt.common.runtime.slack import send_slack_message
from dlt.pipeline.helpers import retry_load

# get data for a few famous players
data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players)
try:
for attempt in Retrying(
stop=stop_after_attempt(5),
Expand Down Expand Up @@ -205,15 +217,14 @@ from dlt.pipeline.helpers import retry_load
retry=retry_if_exception(retry_load(("extract", "load"))),
reraise=True,
)
def load_data_with_retry():
data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players)
def load_data_with_retry(data):
return pipeline.run(data)
```
<!--@@@DLT_SNIPPET_END ./code/chess-snippets.py::markdown_retry-->

<!--@@@DLT_SNIPPET_START ./code/chess-snippets.py::markdown_retry_run-->
```py
load_info = load_data_with_retry()
load_info_retry = load_data_with_retry(data)
```
<!--@@@DLT_SNIPPET_END ./code/chess-snippets.py::markdown_retry_run-->

Expand All @@ -225,11 +236,11 @@ warning if there is no data, you can use the `execute_query` method:
```py
with pipeline.sql_client() as client:
with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
count = cursor.fetchone()[0]
if count == 0:
count_client = cursor.fetchone()[0]
if count_client == 0:
print("Warning: No data in players table")
else:
print(f"Players table contains {count} rows")
print(f"Players table contains {count_client} rows")
```
<!--@@@DLT_SNIPPET_END ./code/chess-snippets.py::markdown_sql_client-->

Expand Down

0 comments on commit 3b11cc6

Please sign in to comment.