diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index eefb59352e..215477ec0a 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -69,8 +69,7 @@ def players_games(username: Any) -> Iterator[TDataItems]: retry=retry_if_exception(retry_load(("extract", "load"))), reraise=True, ) - def load_data_with_retry(): - data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + def load_data_with_retry(data): return pipeline.run(data) # @@@DLT_SNIPPET_END markdown_retry @@ -83,11 +82,12 @@ def load_data_with_retry(): pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data", + full_refresh=True, ) max_players = 5 - load_info = pipeline.run( - chess(chess_url="https://api.chess.com/pub/", max_players=max_players) - ) + # get data for a few famous players + data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + load_info = pipeline.run(data) print(load_info) # @@@DLT_SNIPPET_END markdown_pipeline @@ -143,8 +143,6 @@ def load_data_with_retry(): from dlt.common.runtime.slack import send_slack_message from dlt.pipeline.helpers import retry_load - # get data for a few famous players - data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) try: for attempt in Retrying( stop=stop_after_attempt(5), @@ -160,17 +158,17 @@ def load_data_with_retry(): # @@@DLT_SNIPPET_END markdown_retry_cm # @@@DLT_SNIPPET_START markdown_retry_run - load_info_retry = load_data_with_retry() + load_info_retry = load_data_with_retry(data) # @@@DLT_SNIPPET_END markdown_retry_run # @@@DLT_SNIPPET_START markdown_sql_client with pipeline.sql_client() as client: with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count = cursor.fetchone()[0] - if count == 0: + count_client = cursor.fetchone()[0] + if count_client == 0: print("Warning: No data in players table") else: - print(f"Players table contains {count} rows") + print(f"Players table contains {count_client} rows") # @@@DLT_SNIPPET_END markdown_sql_client # @@@DLT_SNIPPET_START markdown_norm_info @@ -182,7 +180,6 @@ def load_data_with_retry(): print(f"Players table contains {count} rows") # @@@DLT_SNIPPET_END markdown_norm_info - # check that stuff was loaded row_counts = pipeline.last_trace.last_normalize_info.row_counts assert row_counts["players"] == max_players @@ -192,3 +189,11 @@ def load_data_with_retry(): jobs_num = len(load_info.load_packages[0].jobs["completed_jobs"]) assert jobs_num == 4 + + packages_num = len(load_info_retry.load_packages) + assert packages_num == 1 + + jobs_num = len(load_info_retry.load_packages[0].jobs["completed_jobs"]) + assert jobs_num == 3 + + assert count_client == max_players diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index 4f25e91e5a..6dc308f0e8 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md @@ -7,7 +7,7 @@ keywords: [incremental loading, example] import Header from '../_examples-header.md';
@@ -17,13 +17,12 @@ In this example, you'll find a Python script that interacts with the Chess API t We'll learn how to: -- Examining packages after they have been loaded. -- Reloading load information, schema updates, and traces. +- Inspecting packages after they have been loaded. +- Loading back load information, schema updates, and traces. - Triggering notifications in case of schema evolution. - Using context managers to independently retry pipeline stages. - Run basic tests utilizing sql_client and normalize_info. - ## Init pipeline @@ -90,11 +89,12 @@ if __name__ == "__main__": pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data", + full_refresh=True, ) max_players = 5 - load_info = pipeline.run( - chess(chess_url="https://api.chess.com/pub/", max_players=max_players) - ) + # get data for a few famous players + data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + load_info = pipeline.run(data) print(load_info) ``` @@ -158,6 +158,20 @@ if schema_updates: ``` +You can configure Slack incoming hook via +`secrets.toml` or environment variables. + +In `secrets.toml`: +```toml +[runtime] +slack_incoming_hook="https://hooks.slack.com/services/T04DHMAF13Q/B04E7B1MQ1H/TDHEI123WUEE" +``` +ENV: + +```python +RUNTIME__SLACK_INCOMING_HOOK="https://hooks.slack.com/services/T04DHMAF13Q/B04E7B1MQ1H/TDHEI123WUEE" +``` + ## Using context managers to retry pipeline stages separately To use context managers to retry pipeline stages separately for the pipeline: @@ -174,8 +188,6 @@ from tenacity import ( from dlt.common.runtime.slack import send_slack_message from dlt.pipeline.helpers import retry_load -# get data for a few famous players -data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) try: for attempt in Retrying( stop=stop_after_attempt(5), @@ -205,15 +217,14 @@ from dlt.pipeline.helpers import retry_load retry=retry_if_exception(retry_load(("extract", "load"))), reraise=True, ) -def load_data_with_retry(): - data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) +def load_data_with_retry(data): return pipeline.run(data) ``` ```py -load_info = load_data_with_retry() +load_info_retry = load_data_with_retry(data) ``` @@ -225,11 +236,11 @@ warning if there is no data, you can use the `execute_query` method: ```py with pipeline.sql_client() as client: with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count = cursor.fetchone()[0] - if count == 0: + count_client = cursor.fetchone()[0] + if count_client == 0: print("Warning: No data in players table") else: - print(f"Players table contains {count} rows") + print(f"Players table contains {count_client} rows") ```