From 0860155cd7508c9bc33eeb5ad5fe8bef18b89aec Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Mon, 23 Oct 2023 16:51:36 +0200 Subject: [PATCH 01/15] added chess example --- .../examples/chess_production/__init__.py | 0 .../chess_production/code/.dlt/config.toml | 2 + .../chess_production/code/.dlt/secrets.toml | 6 + .../chess_production/code/__init__.py | 0 .../chess_production/code/chess-snippets.py | 66 ++++++ .../docs/examples/chess_production/index.md | 215 ++++++++++++++++++ docs/website/sidebars.js | 1 + 7 files changed, 290 insertions(+) create mode 100644 docs/website/docs/examples/chess_production/__init__.py create mode 100644 docs/website/docs/examples/chess_production/code/.dlt/config.toml create mode 100644 docs/website/docs/examples/chess_production/code/.dlt/secrets.toml create mode 100644 docs/website/docs/examples/chess_production/code/__init__.py create mode 100644 docs/website/docs/examples/chess_production/code/chess-snippets.py create mode 100644 docs/website/docs/examples/chess_production/index.md diff --git a/docs/website/docs/examples/chess_production/__init__.py b/docs/website/docs/examples/chess_production/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/website/docs/examples/chess_production/code/.dlt/config.toml b/docs/website/docs/examples/chess_production/code/.dlt/config.toml new file mode 100644 index 0000000000..be627e6c11 --- /dev/null +++ b/docs/website/docs/examples/chess_production/code/.dlt/config.toml @@ -0,0 +1,2 @@ +# @@@DLT_SNIPPET_START example +# @@@DLT_SNIPPET_END example diff --git a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml new file mode 100644 index 0000000000..caf8d523c4 --- /dev/null +++ b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml @@ -0,0 +1,6 @@ +# @@@DLT_SNIPPET_START example +[sources.zendesk.credentials] +password = "" +subdomain = "" +email = "" +# @@@DLT_SNIPPET_END example diff --git a/docs/website/docs/examples/chess_production/code/__init__.py b/docs/website/docs/examples/chess_production/code/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py new file mode 100644 index 0000000000..64d5679167 --- /dev/null +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -0,0 +1,66 @@ +from tests.utils import skipifgithubfork + + +@skipifgithubfork +def incremental_snippet() -> None: + + # @@@DLT_SNIPPET_START example + # @@@DLT_SNIPPET_START markdown_source + import threading + from typing import Any, Iterator + + import dlt + + from dlt.common import sleep + from dlt.common.typing import StrAny, TDataItems + from dlt.sources.helpers.requests import client + + @dlt.source + def chess(chess_url: str = dlt.config.value, title: str = "GM", max_players: int = 2, year: int = 2022, month: int = 10) -> Any: + + def _get_data_with_retry(path: str) -> StrAny: + r = client.get(f"{chess_url}{path}") + return r.json() # type: ignore + + @dlt.resource(write_disposition="replace") + def players() -> Iterator[TDataItems]: + # return players one by one, you could also return a list that would be faster but we want to pass players item by item to the transformer + yield from _get_data_with_retry(f"titled/{title}")["players"][:max_players] + + # this resource takes data from players and returns profiles + # it uses `defer` 
decorator to enable parallel run in thread pool. defer requires return at the end so we convert yield into return (we return one item anyway)
+        # you can still have yielding transformers, look for the test named `test_evolve_schema`
+        @dlt.transformer(data_from=players, write_disposition="replace")
+        @dlt.defer
+        def players_profiles(username: Any) -> TDataItems:
+            print(f"getting {username} profile via thread {threading.current_thread().name}")
+            sleep(1) # add some latency to show parallel runs
+            return _get_data_with_retry(f"player/{username}")
+
+        # this resource takes data from players and returns games for the last month if not specified otherwise
+        @dlt.transformer(data_from=players, write_disposition="append")
+        def players_games(username: Any) -> Iterator[TDataItems]:
+            # https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM}
+            path = f"player/{username}/games/{year:04d}/{month:02d}"
+            yield _get_data_with_retry(path)["games"]
+
+        return players(), players_profiles, players_games
+
+    # @@@DLT_SNIPPET_END markdown_source
+    # @@@DLT_SNIPPET_START markdown_pipeline
+    __name__ = "__main__" # @@@DLT_REMOVE
+    if __name__ == "__main__":
+        # create dlt pipeline
+        pipeline = dlt.pipeline(
+            pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data"
+        )
+        max_players = 5
+        load_info = pipeline.run(chess(chess_url="https://api.chess.com/pub/", max_players=max_players))
+        print(load_info)
+    # @@@DLT_SNIPPET_END markdown_pipeline
+    # @@@DLT_SNIPPET_END example
+
+    # check that stuff was loaded
+    row_counts = pipeline.last_trace.last_normalize_info.row_counts
+    assert row_counts["players"] == max_players
+
diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md
new file mode 100644
index 0000000000..0ddada39f5
--- /dev/null
+++ b/docs/website/docs/examples/chess_production/index.md
@@ -0,0 +1,215 @@
+---
+title: Run chess pipeline in production
+description: Learn how to run the chess pipeline in production
+keywords: [incremental loading, example]
+---
+
+import Header from '../_examples-header.md';
+
+
+ +## Run chess pipeline in production + +In this example, you'll find a Python script that interacts with the Chess API to extract players and game data. + +We'll learn how to: + +- inspect packages post load +- load back load info, schema updates, and traces +- send notifications if schema evolved +- use context managers to retry pipeline stages separately +- run simple tests with sql_client (table counts, warn if no data) +- same as above but with normalize_info + + +## Init pipeline + + +```py +import threading +from typing import Any, Iterator + +import dlt + +from dlt.common import sleep +from dlt.common.typing import StrAny, TDataItems +from dlt.sources.helpers.requests import client + +@dlt.source +def chess(chess_url: str = dlt.config.value, title: str = "GM", max_players: int = 2, year: int = 2022, month: int = 10) -> Any: + + def _get_data_with_retry(path: str) -> StrAny: + r = client.get(f"{chess_url}{path}") + return r.json() # type: ignore + + @dlt.resource(write_disposition="replace") + def players() -> Iterator[TDataItems]: + # return players one by one, you could also return a list that would be faster but we want to pass players item by item to the transformer + yield from _get_data_with_retry(f"titled/{title}")["players"][:max_players] + + # this resource takes data from players and returns profiles + # it uses `defer` decorator to enable parallel run in thread pool. defer requires return at the end so we convert yield into return (we return one item anyway) + # you can still have yielding transformers, look for the test named `test_evolve_schema` + @dlt.transformer(data_from=players, write_disposition="replace") + @dlt.defer + def players_profiles(username: Any) -> TDataItems: + print(f"getting {username} profile via thread {threading.current_thread().name}") + sleep(1) # add some latency to show parallel runs + return _get_data_with_retry(f"player/{username}") + + # this resource takes data from players and returns games for the last month if not specified otherwise + @dlt.transformer(data_from=players, write_disposition="append") + def players_games(username: Any) -> Iterator[TDataItems]: + # https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM} + path = f"player/{username}/games/{year:04d}/{month:02d}" + yield _get_data_with_retry(path)["games"] + + return players(), players_profiles, players_games +``` + + +[Chess: Setup Guide.](https://dlthub.com/docs/dlt-ecosystem/verified-sources/chess) + +## Inspecting packages post load + +To inspect a load process after running a pipeline, you can use the dlt command-line interface. Here are some commands you can use: + +To get information about the pipeline: + +``` +dlt pipeline chess_pipeline info +``` + +To see the most recent load package info: + +``` +dlt pipeline chess_pipeline load-package +``` + +To see package info with a given load id: + +``` +dlt pipeline chess_pipeline load-package +``` + +To see the schema changes introduced in the package: + +``` +dlt pipeline -v chess_pipeline load-package +``` + +To see the trace of the most recent data load: + +``` +dlt pipeline chess_pipeline trace +``` + +To check for failed jobs in a load package: + +``` +dlt pipeline chess_pipeline failed-jobs +``` + +For more details, you can refer to the [documentation.](https://dlthub.com/docs/walkthroughs/run-a-pipeline) + +## Loading back load info, schema updates, and traces + +To load back the `load_info`, schema updates, and traces for the chess_pipeline, you can use the dlt library in your Python script. 
Here's how you can do it: + +Load `load_info` into the destination: + +```python +# we reuse the pipeline instance below and load to the same dataset as data +pipeline.run([load_info], table_name="_load_info") +``` + +Save the runtime trace to the destination: +```python +# save trace to destination, sensitive data will be removed +pipeline.run([pipeline.last_trace], table_name="_trace") +``` + +Save the new tables and column schemas to the destination: +```python +# save just the new tables +table_updates = [p.asdict()["tables"] for p in load_info.load_packages] +pipeline.run(table_updates, table_name="_new_tables") +``` +For more details, you can refer to the [documentation.](https://dlthub.com/docs/running-in-production/running) + +## Sending notifications if schema evolved + +To send notifications if the schema has evolved for the chess_pipeline, you can use the dlt library in your Python script. Here's how you can do it: + +- Check for schema updates: + ```python + schema_updates = [p.asdict()["schema_update"] for p in load_info.load_packages] + ``` +- Send notifications if there are schema updates: + ```python + if schema_updates: + # send notification + send_notification("Schema has evolved for chess_pipeline") + ``` + +In the above code, send_notification is a placeholder for the function you would use to send notifications. This could be an email, a message to a Slack channel, or any other form of notification. + +Please note that you would need to implement the send_notification function according to your requirements. + +## Using context managers to retry pipeline stages separately + +To use context managers to retry pipeline stages separately for the chess_pipeline, you can use the tenacity library. Here's how you can do it: + +```python +from tenacity import stop_after_attempt, retry_if_exception, Retrying, retry +from dlt.common.runtime.slack import send_slack_message +from dlt.pipeline.helpers import retry_load + +if __name__ == "__main__" : + pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination='duckdb', dataset_name="games_data") + # get data for a few famous players + data = chess(['magnuscarlsen', 'rpragchess'], start_month="2022/11", end_month="2022/12") + try: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1.5, min=4, max=10), retry=retry_if_exception(retry_load(())), reraise=True): + with attempt: + load_info = pipeline.run(data) + send_slack_message(pipeline.runtime_config.slack_incoming_hook, "HOORAY 😄") + except Exception: + # we get here after all the retries + send_slack_message(pipeline.runtime_config.slack_incoming_hook, "BOOO 🤯") + raise +``` +In the above code, the Retrying context manager from tenacity is used to retry the run method of the pipeline if it raises an exception. The retry_load helper function is used to specify that only the load stage should be retried. If the run method succeeds, a success message is sent to a Slack channel. If all retries fail, an error message is sent to the Slack channel and the exception is re-raised. + +## Running simple tests with sql_client (table counts, warn if no data) +To run simple tests with sql_client, such as checking table counts and warning if there is no data, you can use the sql_client's execute_query method. 
Here's an example: + +```python +pipeline = dlt.pipeline(destination="duckdb", dataset_name="chess_data") +with pipeline.sql_client() as client: + with client.execute_query("SELECT COUNT(*) FROM player") as cursor: + count = cursor.fetchone()[0] + if count == 0: + print("Warning: No data in player table") + else: + print(f"Player table contains {count} rows") +``` +In the above code, we first create a pipeline instance. +Then, we use the sql_client context manager to execute a SQL query that counts the number of rows in the player table. If the count is zero, a warning is printed. Otherwise, the number of rows is printed. + +## Same as above but with normalize_info +To run simple tests with normalize_info, such as checking table counts and warning if there is no data, you can use the normalize_info's row_counts attribute. Here's an example: +```python +pipeline = dlt.pipeline(destination="duckdb", dataset_name="chess_data") +load_info = pipeline.run(data) +normalize_info = pipeline.last_trace.last_normalize_info + +count = normalize_info.row_counts.get("player", 0) +if count == 0: + print("Warning: No data in player table") +else: + print(f"Player table contains {count} rows") +``` \ No newline at end of file diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index f88ccbdf1a..f52b64abd0 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -224,6 +224,7 @@ const sidebars = { items: [ 'examples/transformers/index', 'examples/incremental_loading/index', + 'examples/chess_production/index', 'examples/connector_x_arrow/index', ], }, From be16010d466713e0765629ddfcfc112851980657 Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Tue, 24 Oct 2023 16:20:59 +0200 Subject: [PATCH 02/15] update chess example --- .../chess_production/code/.dlt/secrets.toml | 6 - .../chess_production/code/chess-snippets.py | 154 +++++++++- .../docs/examples/chess_production/index.md | 269 ++++++++++-------- 3 files changed, 292 insertions(+), 137 deletions(-) delete mode 100644 docs/website/docs/examples/chess_production/code/.dlt/secrets.toml diff --git a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml deleted file mode 100644 index caf8d523c4..0000000000 --- a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml +++ /dev/null @@ -1,6 +0,0 @@ -# @@@DLT_SNIPPET_START example -[sources.zendesk.credentials] -password = "" -subdomain = "" -email = "" -# @@@DLT_SNIPPET_END example diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index 64d5679167..eefb59352e 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -1,43 +1,53 @@ from tests.utils import skipifgithubfork +__source_name__ = "chess_production" + @skipifgithubfork def incremental_snippet() -> None: - - # @@@DLT_SNIPPET_START example # @@@DLT_SNIPPET_START markdown_source import threading from typing import Any, Iterator import dlt - from dlt.common import sleep + from dlt.common.runtime.slack import send_slack_message from dlt.common.typing import StrAny, TDataItems from dlt.sources.helpers.requests import client @dlt.source - def chess(chess_url: str = dlt.config.value, title: str = "GM", max_players: int = 2, year: int = 2022, month: int = 10) -> Any: - + def chess( + chess_url: str = dlt.config.value, + title: str = "GM", + 
max_players: int = 2, + year: int = 2022, + month: int = 10, + ) -> Any: def _get_data_with_retry(path: str) -> StrAny: r = client.get(f"{chess_url}{path}") return r.json() # type: ignore @dlt.resource(write_disposition="replace") def players() -> Iterator[TDataItems]: - # return players one by one, you could also return a list that would be faster but we want to pass players item by item to the transformer + # return players one by one, you could also return a list + # that would be faster but we want to pass players item by item to the transformer yield from _get_data_with_retry(f"titled/{title}")["players"][:max_players] # this resource takes data from players and returns profiles - # it uses `defer` decorator to enable parallel run in thread pool. defer requires return at the end so we convert yield into return (we return one item anyway) + # it uses `defer` decorator to enable parallel run in thread pool. + # defer requires return at the end so we convert yield into return (we return one item anyway) # you can still have yielding transformers, look for the test named `test_evolve_schema` @dlt.transformer(data_from=players, write_disposition="replace") @dlt.defer def players_profiles(username: Any) -> TDataItems: - print(f"getting {username} profile via thread {threading.current_thread().name}") + print( + f"getting {username} profile via thread {threading.current_thread().name}" + ) sleep(1) # add some latency to show parallel runs return _get_data_with_retry(f"player/{username}") - # this resource takes data from players and returns games for the last month if not specified otherwise + # this resource takes data from players and returns games for the last month + # if not specified otherwise @dlt.transformer(data_from=players, write_disposition="append") def players_games(username: Any) -> Iterator[TDataItems]: # https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM} @@ -47,20 +57,138 @@ def players_games(username: Any) -> Iterator[TDataItems]: return players(), players_profiles, players_games # @@@DLT_SNIPPET_END markdown_source + + # @@@DLT_SNIPPET_START markdown_retry + from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential + + from dlt.pipeline.helpers import retry_load + + @retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1.5, min=4, max=10), + retry=retry_if_exception(retry_load(("extract", "load"))), + reraise=True, + ) + def load_data_with_retry(): + data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + return pipeline.run(data) + + # @@@DLT_SNIPPET_END markdown_retry + # @@@DLT_SNIPPET_START markdown_pipeline - __name__ = "__main__" # @@@DLT_REMOVE + __name__ = "__main__" # @@@DLT_REMOVE if __name__ == "__main__": # create dlt pipeline pipeline = dlt.pipeline( - pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data" + pipeline_name="chess_pipeline", + destination="duckdb", + dataset_name="chess_data", ) max_players = 5 - load_info = pipeline.run(chess(chess_url="https://api.chess.com/pub/", max_players=max_players)) + load_info = pipeline.run( + chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + ) print(load_info) # @@@DLT_SNIPPET_END markdown_pipeline - # @@@DLT_SNIPPET_END example + + # @@@DLT_SNIPPET_START markdown_inspect + # see when load was started + print(f"Pipeline was started: {load_info.started_at}") + # print the information on the first load package and all jobs inside + print(f"First load package info: 
{load_info.load_packages[0]}") + # print the information on the first completed job in first load package + print( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" + ) + + # @@@DLT_SNIPPET_END markdown_inspect + + # @@@DLT_SNIPPET_START markdown_load_back + # we reuse the pipeline instance below and load to the same dataset as data + pipeline.run([load_info], table_name="_load_info") + # save trace to destination, sensitive data will be removed + pipeline.run([pipeline.last_trace], table_name="_trace") + + # print all the new tables/columns in + for package in load_info.load_packages: + for table_name, table in package.schema_update.items(): + print(f"Table {table_name}: {table.get('description')}") + for column_name, column in table["columns"].items(): + print(f"\tcolumn {column_name}: {column['data_type']}") + + # save the new tables and column schemas to the destination: + table_updates = [p.asdict()["tables"] for p in load_info.load_packages] + pipeline.run(table_updates, table_name="_new_tables") + # @@@DLT_SNIPPET_END markdown_load_back + + # @@@DLT_SNIPPET_START markdown_notify + # check for schema updates: + schema_updates = [p.schema_update for p in load_info.load_packages] + # send notifications if there are schema updates + if schema_updates: + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" + ) + # @@@DLT_SNIPPET_END markdown_notify + + # @@@DLT_SNIPPET_START markdown_retry_cm + from tenacity import ( + Retrying, + retry_if_exception, + stop_after_attempt, + wait_exponential, + ) + + from dlt.common.runtime.slack import send_slack_message + from dlt.pipeline.helpers import retry_load + + # get data for a few famous players + data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + try: + for attempt in Retrying( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1.5, min=4, max=10), + retry=retry_if_exception(retry_load(())), + reraise=True, + ): + with attempt: + pipeline.run(data) + except Exception: + # we get here after all the retries + raise + # @@@DLT_SNIPPET_END markdown_retry_cm + + # @@@DLT_SNIPPET_START markdown_retry_run + load_info_retry = load_data_with_retry() + # @@@DLT_SNIPPET_END markdown_retry_run + + # @@@DLT_SNIPPET_START markdown_sql_client + with pipeline.sql_client() as client: + with client.execute_query("SELECT COUNT(*) FROM players") as cursor: + count = cursor.fetchone()[0] + if count == 0: + print("Warning: No data in players table") + else: + print(f"Players table contains {count} rows") + # @@@DLT_SNIPPET_END markdown_sql_client + + # @@@DLT_SNIPPET_START markdown_norm_info + normalize_info = pipeline.last_trace.last_normalize_info + count = normalize_info.row_counts.get("players", 0) + if count == 0: + print("Warning: No data in players table") + else: + print(f"Players table contains {count} rows") + # @@@DLT_SNIPPET_END markdown_norm_info + # check that stuff was loaded row_counts = pipeline.last_trace.last_normalize_info.row_counts assert row_counts["players"] == max_players + packages_num = len(load_info.load_packages) + assert packages_num == 1 + + jobs_num = len(load_info.load_packages[0].jobs["completed_jobs"]) + assert jobs_num == 4 diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index 0ddada39f5..4f25e91e5a 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md 
@@ -7,7 +7,7 @@ keywords: [incremental loading, example]
 
 import Header from '../_examples-header.md';
 
@@ -17,12 +17,11 @@ In this example, you'll find a Python script that interacts with the Chess API t We'll learn how to: -- inspect packages post load -- load back load info, schema updates, and traces -- send notifications if schema evolved -- use context managers to retry pipeline stages separately -- run simple tests with sql_client (table counts, warn if no data) -- same as above but with normalize_info +- Examining packages after they have been loaded. +- Reloading load information, schema updates, and traces. +- Triggering notifications in case of schema evolution. +- Using context managers to independently retry pipeline stages. +- Run basic tests utilizing sql_client and normalize_info. ## Init pipeline @@ -33,34 +32,44 @@ import threading from typing import Any, Iterator import dlt - from dlt.common import sleep +from dlt.common.runtime.slack import send_slack_message from dlt.common.typing import StrAny, TDataItems from dlt.sources.helpers.requests import client @dlt.source -def chess(chess_url: str = dlt.config.value, title: str = "GM", max_players: int = 2, year: int = 2022, month: int = 10) -> Any: - +def chess( + chess_url: str = dlt.config.value, + title: str = "GM", + max_players: int = 2, + year: int = 2022, + month: int = 10, +) -> Any: def _get_data_with_retry(path: str) -> StrAny: r = client.get(f"{chess_url}{path}") return r.json() # type: ignore @dlt.resource(write_disposition="replace") def players() -> Iterator[TDataItems]: - # return players one by one, you could also return a list that would be faster but we want to pass players item by item to the transformer + # return players one by one, you could also return a list + # that would be faster but we want to pass players item by item to the transformer yield from _get_data_with_retry(f"titled/{title}")["players"][:max_players] # this resource takes data from players and returns profiles - # it uses `defer` decorator to enable parallel run in thread pool. defer requires return at the end so we convert yield into return (we return one item anyway) + # it uses `defer` decorator to enable parallel run in thread pool. + # defer requires return at the end so we convert yield into return (we return one item anyway) # you can still have yielding transformers, look for the test named `test_evolve_schema` @dlt.transformer(data_from=players, write_disposition="replace") @dlt.defer def players_profiles(username: Any) -> TDataItems: - print(f"getting {username} profile via thread {threading.current_thread().name}") + print( + f"getting {username} profile via thread {threading.current_thread().name}" + ) sleep(1) # add some latency to show parallel runs return _get_data_with_retry(f"player/{username}") - # this resource takes data from players and returns games for the last month if not specified otherwise + # this resource takes data from players and returns games for the last month + # if not specified otherwise @dlt.transformer(data_from=players, write_disposition="append") def players_games(username: Any) -> Iterator[TDataItems]: # https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM} @@ -71,145 +80,169 @@ def chess(chess_url: str = dlt.config.value, title: str = "GM", max_players: int ``` -[Chess: Setup Guide.](https://dlthub.com/docs/dlt-ecosystem/verified-sources/chess) - -## Inspecting packages post load - -To inspect a load process after running a pipeline, you can use the dlt command-line interface. 
Here are some commands you can use: - -To get information about the pipeline: - -``` -dlt pipeline chess_pipeline info -``` - -To see the most recent load package info: - -``` -dlt pipeline chess_pipeline load-package -``` - -To see package info with a given load id: +Run the pipeline: + +```py +if __name__ == "__main__": + # create dlt pipeline + pipeline = dlt.pipeline( + pipeline_name="chess_pipeline", + destination="duckdb", + dataset_name="chess_data", + ) + max_players = 5 + load_info = pipeline.run( + chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + ) + print(load_info) +``` + + +## Inspecting packages + +To inspect a load process after running a pipeline: + + +```py +# see when load was started +print(f"Pipeline was started: {load_info.started_at}") +# print the information on the first load package and all jobs inside +print(f"First load package info: {load_info.load_packages[0]}") +# print the information on the first completed job in first load package +print( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" +) ``` -dlt pipeline chess_pipeline load-package -``` - -To see the schema changes introduced in the package: - -``` -dlt pipeline -v chess_pipeline load-package -``` - -To see the trace of the most recent data load: - -``` -dlt pipeline chess_pipeline trace -``` - -To check for failed jobs in a load package: - -``` -dlt pipeline chess_pipeline failed-jobs -``` + -For more details, you can refer to the [documentation.](https://dlthub.com/docs/walkthroughs/run-a-pipeline) ## Loading back load info, schema updates, and traces -To load back the `load_info`, schema updates, and traces for the chess_pipeline, you can use the dlt library in your Python script. Here's how you can do it: +To load back the `load_info`, schema updates, and traces for the pipeline: -Load `load_info` into the destination: - -```python + +```py # we reuse the pipeline instance below and load to the same dataset as data pipeline.run([load_info], table_name="_load_info") -``` - -Save the runtime trace to the destination: -```python # save trace to destination, sensitive data will be removed pipeline.run([pipeline.last_trace], table_name="_trace") -``` -Save the new tables and column schemas to the destination: -```python -# save just the new tables +# print all the new tables/columns in +for package in load_info.load_packages: + for table_name, table in package.schema_update.items(): + print(f"Table {table_name}: {table.get('description')}") + for column_name, column in table["columns"].items(): + print(f"\tcolumn {column_name}: {column['data_type']}") + +# save the new tables and column schemas to the destination: table_updates = [p.asdict()["tables"] for p in load_info.load_packages] pipeline.run(table_updates, table_name="_new_tables") ``` -For more details, you can refer to the [documentation.](https://dlthub.com/docs/running-in-production/running) + ## Sending notifications if schema evolved -To send notifications if the schema has evolved for the chess_pipeline, you can use the dlt library in your Python script. 
Here's how you can do it: - -- Check for schema updates: - ```python - schema_updates = [p.asdict()["schema_update"] for p in load_info.load_packages] - ``` -- Send notifications if there are schema updates: - ```python - if schema_updates: - # send notification - send_notification("Schema has evolved for chess_pipeline") - ``` +To send notifications if the schema has evolved for the pipeline: -In the above code, send_notification is a placeholder for the function you would use to send notifications. This could be an email, a message to a Slack channel, or any other form of notification. - -Please note that you would need to implement the send_notification function according to your requirements. + +```py +# check for schema updates: +schema_updates = [p.schema_update for p in load_info.load_packages] +# send notifications if there are schema updates +if schema_updates: + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" + ) +``` + ## Using context managers to retry pipeline stages separately -To use context managers to retry pipeline stages separately for the chess_pipeline, you can use the tenacity library. Here's how you can do it: +To use context managers to retry pipeline stages separately for the pipeline: + + +```py +from tenacity import ( + Retrying, + retry_if_exception, + stop_after_attempt, + wait_exponential, +) -```python -from tenacity import stop_after_attempt, retry_if_exception, Retrying, retry from dlt.common.runtime.slack import send_slack_message from dlt.pipeline.helpers import retry_load -if __name__ == "__main__" : - pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination='duckdb', dataset_name="games_data") - # get data for a few famous players - data = chess(['magnuscarlsen', 'rpragchess'], start_month="2022/11", end_month="2022/12") - try: - for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1.5, min=4, max=10), retry=retry_if_exception(retry_load(())), reraise=True): - with attempt: - load_info = pipeline.run(data) - send_slack_message(pipeline.runtime_config.slack_incoming_hook, "HOORAY 😄") - except Exception: - # we get here after all the retries - send_slack_message(pipeline.runtime_config.slack_incoming_hook, "BOOO 🤯") - raise +# get data for a few famous players +data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) +try: + for attempt in Retrying( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1.5, min=4, max=10), + retry=retry_if_exception(retry_load(())), + reraise=True, + ): + with attempt: + pipeline.run(data) +except Exception: + # we get here after all the retries + raise +``` + + +You can also use tenacity to decorate functions: + + +```py +from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential + +from dlt.pipeline.helpers import retry_load + +@retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1.5, min=4, max=10), + retry=retry_if_exception(retry_load(("extract", "load"))), + reraise=True, +) +def load_data_with_retry(): + data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + return pipeline.run(data) +``` + + + +```py +load_info = load_data_with_retry() ``` -In the above code, the Retrying context manager from tenacity is used to retry the run method of the pipeline if it raises an exception. The retry_load helper function is used to specify that only the load stage should be retried. 
If the run method succeeds, a success message is sent to a Slack channel. If all retries fail, an error message is sent to the Slack channel and the exception is re-raised. + ## Running simple tests with sql_client (table counts, warn if no data) -To run simple tests with sql_client, such as checking table counts and warning if there is no data, you can use the sql_client's execute_query method. Here's an example: +To run simple tests with `sql_client`, such as checking table counts and +warning if there is no data, you can use the `execute_query` method: -```python -pipeline = dlt.pipeline(destination="duckdb", dataset_name="chess_data") + +```py with pipeline.sql_client() as client: - with client.execute_query("SELECT COUNT(*) FROM player") as cursor: + with client.execute_query("SELECT COUNT(*) FROM players") as cursor: count = cursor.fetchone()[0] if count == 0: - print("Warning: No data in player table") + print("Warning: No data in players table") else: - print(f"Player table contains {count} rows") + print(f"Players table contains {count} rows") ``` -In the above code, we first create a pipeline instance. -Then, we use the sql_client context manager to execute a SQL query that counts the number of rows in the player table. If the count is zero, a warning is printed. Otherwise, the number of rows is printed. - -## Same as above but with normalize_info -To run simple tests with normalize_info, such as checking table counts and warning if there is no data, you can use the normalize_info's row_counts attribute. Here's an example: -```python -pipeline = dlt.pipeline(destination="duckdb", dataset_name="chess_data") -load_info = pipeline.run(data) -normalize_info = pipeline.last_trace.last_normalize_info + + +To run simple tests with `normalize_info`, such as checking table counts and +warning if there is no data, you can use the `row_counts` attribute: -count = normalize_info.row_counts.get("player", 0) + +```py +normalize_info = pipeline.last_trace.last_normalize_info +count = normalize_info.row_counts.get("players", 0) if count == 0: - print("Warning: No data in player table") + print("Warning: No data in players table") else: - print(f"Player table contains {count} rows") -``` \ No newline at end of file + print(f"Players table contains {count} rows") +``` + \ No newline at end of file From eebc5e8827ddb1f53c945be566939cdd92cef46e Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Tue, 24 Oct 2023 16:25:01 +0200 Subject: [PATCH 03/15] add secrets --- .../docs/examples/chess_production/code/.dlt/secrets.toml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 docs/website/docs/examples/chess_production/code/.dlt/secrets.toml diff --git a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml new file mode 100644 index 0000000000..3e49175ed7 --- /dev/null +++ b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml @@ -0,0 +1,2 @@ +[runtime] +slack_incoming_hook="" \ No newline at end of file From b608a33d84b018cdf3cc407b07aedf725a4009f5 Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Mon, 23 Oct 2023 16:51:36 +0200 Subject: [PATCH 04/15] added chess example --- .../docs/examples/chess_production/code/.dlt/secrets.toml | 2 +- docs/website/sidebars.js | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml index 3e49175ed7..c714ff1f9c 
100644 --- a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml +++ b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml @@ -1,2 +1,2 @@ [runtime] -slack_incoming_hook="" \ No newline at end of file +slack_incoming_hook="" diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index f52b64abd0..109f423a5c 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -226,6 +226,7 @@ const sidebars = { 'examples/incremental_loading/index', 'examples/chess_production/index', 'examples/connector_x_arrow/index', + 'examples/chess_production/index', ], }, { From 1930b9e0cd23ccc24ca60e58aaf280097d07cb7d Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Tue, 24 Oct 2023 16:45:08 +0200 Subject: [PATCH 05/15] del extra page from sidebar --- docs/website/sidebars.js | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index 109f423a5c..856eb279e2 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -224,7 +224,6 @@ const sidebars = { items: [ 'examples/transformers/index', 'examples/incremental_loading/index', - 'examples/chess_production/index', 'examples/connector_x_arrow/index', 'examples/chess_production/index', ], From 7369bd6205d35249fab9813566b2a089ee57c1de Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Thu, 26 Oct 2023 15:35:32 +0200 Subject: [PATCH 06/15] update --- .../chess_production/code/chess-snippets.py | 29 +++++++------ .../docs/examples/chess_production/index.md | 41 ++++++++++++------- 2 files changed, 43 insertions(+), 27 deletions(-) diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index eefb59352e..215477ec0a 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -69,8 +69,7 @@ def players_games(username: Any) -> Iterator[TDataItems]: retry=retry_if_exception(retry_load(("extract", "load"))), reraise=True, ) - def load_data_with_retry(): - data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + def load_data_with_retry(data): return pipeline.run(data) # @@@DLT_SNIPPET_END markdown_retry @@ -83,11 +82,12 @@ def load_data_with_retry(): pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data", + full_refresh=True, ) max_players = 5 - load_info = pipeline.run( - chess(chess_url="https://api.chess.com/pub/", max_players=max_players) - ) + # get data for a few famous players + data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + load_info = pipeline.run(data) print(load_info) # @@@DLT_SNIPPET_END markdown_pipeline @@ -143,8 +143,6 @@ def load_data_with_retry(): from dlt.common.runtime.slack import send_slack_message from dlt.pipeline.helpers import retry_load - # get data for a few famous players - data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) try: for attempt in Retrying( stop=stop_after_attempt(5), @@ -160,17 +158,17 @@ def load_data_with_retry(): # @@@DLT_SNIPPET_END markdown_retry_cm # @@@DLT_SNIPPET_START markdown_retry_run - load_info_retry = load_data_with_retry() + load_info_retry = load_data_with_retry(data) # @@@DLT_SNIPPET_END markdown_retry_run # @@@DLT_SNIPPET_START markdown_sql_client with pipeline.sql_client() as client: with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count = cursor.fetchone()[0] - if count == 0: + 
count_client = cursor.fetchone()[0] + if count_client == 0: print("Warning: No data in players table") else: - print(f"Players table contains {count} rows") + print(f"Players table contains {count_client} rows") # @@@DLT_SNIPPET_END markdown_sql_client # @@@DLT_SNIPPET_START markdown_norm_info @@ -182,7 +180,6 @@ def load_data_with_retry(): print(f"Players table contains {count} rows") # @@@DLT_SNIPPET_END markdown_norm_info - # check that stuff was loaded row_counts = pipeline.last_trace.last_normalize_info.row_counts assert row_counts["players"] == max_players @@ -192,3 +189,11 @@ def load_data_with_retry(): jobs_num = len(load_info.load_packages[0].jobs["completed_jobs"]) assert jobs_num == 4 + + packages_num = len(load_info_retry.load_packages) + assert packages_num == 1 + + jobs_num = len(load_info_retry.load_packages[0].jobs["completed_jobs"]) + assert jobs_num == 3 + + assert count_client == max_players diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index 4f25e91e5a..6dc308f0e8 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md @@ -7,7 +7,7 @@ keywords: [incremental loading, example] import Header from '../_examples-header.md';
@@ -17,13 +17,12 @@ In this example, you'll find a Python script that interacts with the Chess API t We'll learn how to: -- Examining packages after they have been loaded. -- Reloading load information, schema updates, and traces. +- Inspecting packages after they have been loaded. +- Loading back load information, schema updates, and traces. - Triggering notifications in case of schema evolution. - Using context managers to independently retry pipeline stages. - Run basic tests utilizing sql_client and normalize_info. - ## Init pipeline @@ -90,11 +89,12 @@ if __name__ == "__main__": pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data", + full_refresh=True, ) max_players = 5 - load_info = pipeline.run( - chess(chess_url="https://api.chess.com/pub/", max_players=max_players) - ) + # get data for a few famous players + data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + load_info = pipeline.run(data) print(load_info) ``` @@ -158,6 +158,20 @@ if schema_updates: ``` +You can configure Slack incoming hook via +`secrets.toml` or environment variables. + +In `secrets.toml`: +```toml +[runtime] +slack_incoming_hook="https://hooks.slack.com/services/T04DHMAF13Q/B04E7B1MQ1H/TDHEI123WUEE" +``` +ENV: + +```python +RUNTIME__SLACK_INCOMING_HOOK="https://hooks.slack.com/services/T04DHMAF13Q/B04E7B1MQ1H/TDHEI123WUEE" +``` + ## Using context managers to retry pipeline stages separately To use context managers to retry pipeline stages separately for the pipeline: @@ -174,8 +188,6 @@ from tenacity import ( from dlt.common.runtime.slack import send_slack_message from dlt.pipeline.helpers import retry_load -# get data for a few famous players -data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) try: for attempt in Retrying( stop=stop_after_attempt(5), @@ -205,15 +217,14 @@ from dlt.pipeline.helpers import retry_load retry=retry_if_exception(retry_load(("extract", "load"))), reraise=True, ) -def load_data_with_retry(): - data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) +def load_data_with_retry(data): return pipeline.run(data) ``` ```py -load_info = load_data_with_retry() +load_info_retry = load_data_with_retry(data) ``` @@ -225,11 +236,11 @@ warning if there is no data, you can use the `execute_query` method: ```py with pipeline.sql_client() as client: with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count = cursor.fetchone()[0] - if count == 0: + count_client = cursor.fetchone()[0] + if count_client == 0: print("Warning: No data in players table") else: - print(f"Players table contains {count} rows") + print(f"Players table contains {count_client} rows") ``` From 0c5645aeb19a53df7bd1259748bc897613d51672 Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Thu, 26 Oct 2023 15:41:35 +0200 Subject: [PATCH 07/15] add example tags --- .../docs/examples/chess_production/code/.dlt/secrets.toml | 2 ++ .../docs/examples/chess_production/code/chess-snippets.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml index c714ff1f9c..978f2c1bec 100644 --- a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml +++ b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml @@ -1,2 +1,4 @@ +# @@@DLT_SNIPPET_START example [runtime] slack_incoming_hook="" +# @@@DLT_SNIPPET_END example diff --git 
a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index 215477ec0a..b4463bacf6 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -5,6 +5,7 @@ @skipifgithubfork def incremental_snippet() -> None: + # @@@DLT_SNIPPET_START example # @@@DLT_SNIPPET_START markdown_source import threading from typing import Any, Iterator @@ -179,6 +180,7 @@ def load_data_with_retry(data): else: print(f"Players table contains {count} rows") # @@@DLT_SNIPPET_END markdown_norm_info + # @@@DLT_SNIPPET_END example # check that stuff was loaded row_counts = pipeline.last_trace.last_normalize_info.row_counts From 7286ef30af217c393fa711f392e554984a3dd26e Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Thu, 26 Oct 2023 15:56:55 +0200 Subject: [PATCH 08/15] add example scripts --- .../chess_production/.dlt/config.toml | 0 .../chess_production/.dlt/secrets.toml | 2 + docs/examples/chess_production/__init__.py | 0 docs/examples/chess_production/chess.py | 153 ++++++++++++++++++ 4 files changed, 155 insertions(+) create mode 100644 docs/examples/chess_production/.dlt/config.toml create mode 100644 docs/examples/chess_production/.dlt/secrets.toml create mode 100644 docs/examples/chess_production/__init__.py create mode 100644 docs/examples/chess_production/chess.py diff --git a/docs/examples/chess_production/.dlt/config.toml b/docs/examples/chess_production/.dlt/config.toml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/examples/chess_production/.dlt/secrets.toml b/docs/examples/chess_production/.dlt/secrets.toml new file mode 100644 index 0000000000..3e49175ed7 --- /dev/null +++ b/docs/examples/chess_production/.dlt/secrets.toml @@ -0,0 +1,2 @@ +[runtime] +slack_incoming_hook="" \ No newline at end of file diff --git a/docs/examples/chess_production/__init__.py b/docs/examples/chess_production/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/examples/chess_production/chess.py b/docs/examples/chess_production/chess.py new file mode 100644 index 0000000000..46c2d404b5 --- /dev/null +++ b/docs/examples/chess_production/chess.py @@ -0,0 +1,153 @@ +import threading +from typing import Any, Iterator + +import dlt +from dlt.common import sleep +from dlt.common.runtime.slack import send_slack_message +from dlt.common.typing import StrAny, TDataItems +from dlt.sources.helpers.requests import client + +@dlt.source +def chess( + chess_url: str = dlt.config.value, + title: str = "GM", + max_players: int = 2, + year: int = 2022, + month: int = 10, +) -> Any: + def _get_data_with_retry(path: str) -> StrAny: + r = client.get(f"{chess_url}{path}") + return r.json() # type: ignore + + @dlt.resource(write_disposition="replace") + def players() -> Iterator[TDataItems]: + # return players one by one, you could also return a list + # that would be faster but we want to pass players item by item to the transformer + yield from _get_data_with_retry(f"titled/{title}")["players"][:max_players] + + # this resource takes data from players and returns profiles + # it uses `defer` decorator to enable parallel run in thread pool. 
+ # defer requires return at the end so we convert yield into return (we return one item anyway) + # you can still have yielding transformers, look for the test named `test_evolve_schema` + @dlt.transformer(data_from=players, write_disposition="replace") + @dlt.defer + def players_profiles(username: Any) -> TDataItems: + print( + f"getting {username} profile via thread {threading.current_thread().name}" + ) + sleep(1) # add some latency to show parallel runs + return _get_data_with_retry(f"player/{username}") + + # this resource takes data from players and returns games for the last month + # if not specified otherwise + @dlt.transformer(data_from=players, write_disposition="append") + def players_games(username: Any) -> Iterator[TDataItems]: + # https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM} + path = f"player/{username}/games/{year:04d}/{month:02d}" + yield _get_data_with_retry(path)["games"] + + return players(), players_profiles, players_games + + +from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential + +from dlt.pipeline.helpers import retry_load + +@retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1.5, min=4, max=10), + retry=retry_if_exception(retry_load(("extract", "load"))), + reraise=True, +) +def load_data_with_retry(data): + return pipeline.run(data) + + +if __name__ == "__main__": + # create dlt pipeline + pipeline = dlt.pipeline( + pipeline_name="chess_pipeline", + destination="duckdb", + dataset_name="chess_data", + full_refresh=True, + ) + max_players = 5 + # get data for a few famous players + data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) + load_info = pipeline.run(data) + print(load_info) + + # see when load was started + print(f"Pipeline was started: {load_info.started_at}") + # print the information on the first load package and all jobs inside + print(f"First load package info: {load_info.load_packages[0]}") + # print the information on the first completed job in first load package + print( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" + ) + + + # we reuse the pipeline instance below and load to the same dataset as data + pipeline.run([load_info], table_name="_load_info") + # save trace to destination, sensitive data will be removed + pipeline.run([pipeline.last_trace], table_name="_trace") + + # print all the new tables/columns in + for package in load_info.load_packages: + for table_name, table in package.schema_update.items(): + print(f"Table {table_name}: {table.get('description')}") + for column_name, column in table["columns"].items(): + print(f"\tcolumn {column_name}: {column['data_type']}") + + # save the new tables and column schemas to the destination: + table_updates = [p.asdict()["tables"] for p in load_info.load_packages] + pipeline.run(table_updates, table_name="_new_tables") + + # check for schema updates: + schema_updates = [p.schema_update for p in load_info.load_packages] + # send notifications if there are schema updates + if schema_updates: + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" 
+ ) + + from tenacity import ( + Retrying, + retry_if_exception, + stop_after_attempt, + wait_exponential, + ) + + from dlt.common.runtime.slack import send_slack_message + from dlt.pipeline.helpers import retry_load + + try: + for attempt in Retrying( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1.5, min=4, max=10), + retry=retry_if_exception(retry_load(())), + reraise=True, + ): + with attempt: + pipeline.run(data) + except Exception: + # we get here after all the retries + raise + + load_info_retry = load_data_with_retry(data) + + with pipeline.sql_client() as client: + with client.execute_query("SELECT COUNT(*) FROM players") as cursor: + count_client = cursor.fetchone()[0] + if count_client == 0: + print("Warning: No data in players table") + else: + print(f"Players table contains {count_client} rows") + + normalize_info = pipeline.last_trace.last_normalize_info + count = normalize_info.row_counts.get("players", 0) + if count == 0: + print("Warning: No data in players table") + else: + print(f"Players table contains {count} rows") \ No newline at end of file From 21c837b9903c2266d05600b0f9a216923ccb6da3 Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Mon, 30 Oct 2023 18:55:16 +0100 Subject: [PATCH 09/15] merge to one big function --- docs/examples/chess_production/chess.py | 184 ++++++------ .../chess_production/code/chess-snippets.py | 227 +++++++-------- .../docs/examples/chess_production/index.md | 262 +++++++----------- 3 files changed, 304 insertions(+), 369 deletions(-) diff --git a/docs/examples/chess_production/chess.py b/docs/examples/chess_production/chess.py index 46c2d404b5..d49c39fe38 100644 --- a/docs/examples/chess_production/chess.py +++ b/docs/examples/chess_production/chess.py @@ -49,79 +49,20 @@ def players_games(username: Any) -> Iterator[TDataItems]: return players(), players_profiles, players_games -from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential - -from dlt.pipeline.helpers import retry_load - -@retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1.5, min=4, max=10), - retry=retry_if_exception(retry_load(("extract", "load"))), - reraise=True, +from tenacity import ( + Retrying, + retry_if_exception, + stop_after_attempt, + wait_exponential, ) -def load_data_with_retry(data): - return pipeline.run(data) - - -if __name__ == "__main__": - # create dlt pipeline - pipeline = dlt.pipeline( - pipeline_name="chess_pipeline", - destination="duckdb", - dataset_name="chess_data", - full_refresh=True, - ) - max_players = 5 - # get data for a few famous players - data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) - load_info = pipeline.run(data) - print(load_info) - - # see when load was started - print(f"Pipeline was started: {load_info.started_at}") - # print the information on the first load package and all jobs inside - print(f"First load package info: {load_info.load_packages[0]}") - # print the information on the first completed job in first load package - print( - f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" - ) - - # we reuse the pipeline instance below and load to the same dataset as data - pipeline.run([load_info], table_name="_load_info") - # save trace to destination, sensitive data will be removed - pipeline.run([pipeline.last_trace], table_name="_trace") - - # print all the new tables/columns in - for package in load_info.load_packages: - for table_name, table in package.schema_update.items(): - print(f"Table 
{table_name}: {table.get('description')}") - for column_name, column in table["columns"].items(): - print(f"\tcolumn {column_name}: {column['data_type']}") - - # save the new tables and column schemas to the destination: - table_updates = [p.asdict()["tables"] for p in load_info.load_packages] - pipeline.run(table_updates, table_name="_new_tables") - - # check for schema updates: - schema_updates = [p.schema_update for p in load_info.load_packages] - # send notifications if there are schema updates - if schema_updates: - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" - ) - - from tenacity import ( - Retrying, - retry_if_exception, - stop_after_attempt, - wait_exponential, - ) +from dlt.common import logger +from dlt.common.runtime.slack import send_slack_message +from dlt.pipeline.helpers import retry_load - from dlt.common.runtime.slack import send_slack_message - from dlt.pipeline.helpers import retry_load +MAX_PLAYERS = 5 +def load_data_with_retry(pipeline, data): try: for attempt in Retrying( stop=stop_after_attempt(5), @@ -130,24 +71,93 @@ def load_data_with_retry(data): reraise=True, ): with attempt: - pipeline.run(data) + logger.info( + f"Running the pipeline, attempt={attempt.retry_state.attempt_number}" + ) + load_info = pipeline.run(data) + logger.info(str(load_info)) + # raise on failed jobs + load_info.raise_on_failed_jobs() + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + "Data was successfully loaded!" + ) except Exception: - # we get here after all the retries + # we get here after all the failed retries + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + "Something went wrong!" + ) raise + finally: + # we get here after a successful attempt + # see when load was started + logger.info(f"Pipeline was started: {load_info.started_at}") + # print the information on the first load package and all jobs inside + logger.info(f"First load package info: {load_info.load_packages[0]}") + # print the information on the first completed job in first load package + logger.info( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" + ) + + # check for schema updates: + schema_updates = [p.schema_update for p in load_info.load_packages] + # send notifications if there are schema updates + if schema_updates: + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" + ) + + # To run simple tests with `sql_client`, such as checking table counts and + # warning if there is no data, you can use the `execute_query` method + with pipeline.sql_client() as client: + with client.execute_query("SELECT COUNT(*) FROM players") as cursor: + count = cursor.fetchone()[0] + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + + # To run simple tests with `normalize_info`, such as checking table counts and + # warning if there is no data, you can use the `row_counts` attribute. 
+ normalize_info = pipeline.last_trace.last_normalize_info + count = normalize_info.row_counts.get("players", 0) + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + + # we reuse the pipeline instance below and load to the same dataset as data + logger.info("Saving the load info in the destination") + pipeline.run([load_info], table_name="_load_info") + # save trace to destination, sensitive data will be removed + logger.info("Saving the trace in the destination") + pipeline.run([pipeline.last_trace], table_name="_trace") + + # print all the new tables/columns in + for package in load_info.load_packages: + for table_name, table in package.schema_update.items(): + logger.info(f"Table {table_name}: {table.get('description')}") + for column_name, column in table["columns"].items(): + logger.info(f"\tcolumn {column_name}: {column['data_type']}") + + # save the new tables and column schemas to the destination: + table_updates = [p.asdict()["tables"] for p in load_info.load_packages] + pipeline.run(table_updates, table_name="_new_tables") + + return load_info - load_info_retry = load_data_with_retry(data) - - with pipeline.sql_client() as client: - with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count_client = cursor.fetchone()[0] - if count_client == 0: - print("Warning: No data in players table") - else: - print(f"Players table contains {count_client} rows") - - normalize_info = pipeline.last_trace.last_normalize_info - count = normalize_info.row_counts.get("players", 0) - if count == 0: - print("Warning: No data in players table") - else: - print(f"Players table contains {count} rows") \ No newline at end of file + +if __name__ == "__main__": + # create dlt pipeline + pipeline = dlt.pipeline( + pipeline_name="chess_pipeline", + destination="duckdb", + dataset_name="chess_data", + ) + # get data for a few famous players + data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS) + load_info = load_data_with_retry(pipeline, data) \ No newline at end of file diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index b4463bacf6..7c16d7a09b 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -59,91 +59,21 @@ def players_games(username: Any) -> Iterator[TDataItems]: # @@@DLT_SNIPPET_END markdown_source - # @@@DLT_SNIPPET_START markdown_retry - from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential - - from dlt.pipeline.helpers import retry_load - - @retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1.5, min=4, max=10), - retry=retry_if_exception(retry_load(("extract", "load"))), - reraise=True, + # @@@DLT_SNIPPET_START markdown_retry_cm + from tenacity import ( + Retrying, + retry_if_exception, + stop_after_attempt, + wait_exponential, ) - def load_data_with_retry(data): - return pipeline.run(data) - - # @@@DLT_SNIPPET_END markdown_retry - - # @@@DLT_SNIPPET_START markdown_pipeline - __name__ = "__main__" # @@@DLT_REMOVE - if __name__ == "__main__": - # create dlt pipeline - pipeline = dlt.pipeline( - pipeline_name="chess_pipeline", - destination="duckdb", - dataset_name="chess_data", - full_refresh=True, - ) - max_players = 5 - # get data for a few famous players - data = chess(chess_url="https://api.chess.com/pub/", 
max_players=max_players) - load_info = pipeline.run(data) - print(load_info) - # @@@DLT_SNIPPET_END markdown_pipeline - - # @@@DLT_SNIPPET_START markdown_inspect - # see when load was started - print(f"Pipeline was started: {load_info.started_at}") - # print the information on the first load package and all jobs inside - print(f"First load package info: {load_info.load_packages[0]}") - # print the information on the first completed job in first load package - print( - f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" - ) - - # @@@DLT_SNIPPET_END markdown_inspect - - # @@@DLT_SNIPPET_START markdown_load_back - # we reuse the pipeline instance below and load to the same dataset as data - pipeline.run([load_info], table_name="_load_info") - # save trace to destination, sensitive data will be removed - pipeline.run([pipeline.last_trace], table_name="_trace") - - # print all the new tables/columns in - for package in load_info.load_packages: - for table_name, table in package.schema_update.items(): - print(f"Table {table_name}: {table.get('description')}") - for column_name, column in table["columns"].items(): - print(f"\tcolumn {column_name}: {column['data_type']}") - - # save the new tables and column schemas to the destination: - table_updates = [p.asdict()["tables"] for p in load_info.load_packages] - pipeline.run(table_updates, table_name="_new_tables") - # @@@DLT_SNIPPET_END markdown_load_back - - # @@@DLT_SNIPPET_START markdown_notify - # check for schema updates: - schema_updates = [p.schema_update for p in load_info.load_packages] - # send notifications if there are schema updates - if schema_updates: - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" - ) - # @@@DLT_SNIPPET_END markdown_notify - # @@@DLT_SNIPPET_START markdown_retry_cm - from tenacity import ( - Retrying, - retry_if_exception, - stop_after_attempt, - wait_exponential, - ) + from dlt.common import logger + from dlt.common.runtime.slack import send_slack_message + from dlt.pipeline.helpers import retry_load - from dlt.common.runtime.slack import send_slack_message - from dlt.pipeline.helpers import retry_load + MAX_PLAYERS = 5 + def load_data_with_retry(pipeline, data): try: for attempt in Retrying( stop=stop_after_attempt(5), @@ -152,50 +82,99 @@ def load_data_with_retry(data): reraise=True, ): with attempt: - pipeline.run(data) + logger.info( + f"Running the pipeline, attempt={attempt.retry_state.attempt_number}" + ) + load_info = pipeline.run(data) + logger.info(str(load_info)) + # raise on failed jobs + load_info.raise_on_failed_jobs() + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + "Data was successfully loaded!" + ) except Exception: - # we get here after all the retries + # we get here after all the failed retries + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + "Something went wrong!" 
+ ) raise + finally: + # we get here after a successful attempt + # see when load was started + logger.info(f"Pipeline was started: {load_info.started_at}") + # print the information on the first load package and all jobs inside + logger.info(f"First load package info: {load_info.load_packages[0]}") + # print the information on the first completed job in first load package + logger.info( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" + ) + + # check for schema updates: + schema_updates = [p.schema_update for p in load_info.load_packages] + # send notifications if there are schema updates + if schema_updates: + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" + ) + + # To run simple tests with `sql_client`, such as checking table counts and + # warning if there is no data, you can use the `execute_query` method + with pipeline.sql_client() as client: + with client.execute_query("SELECT COUNT(*) FROM players") as cursor: + count = cursor.fetchone()[0] + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + assert count == MAX_PLAYERS # @@@DLT_REMOVE + + # To run simple tests with `normalize_info`, such as checking table counts and + # warning if there is no data, you can use the `row_counts` attribute. + normalize_info = pipeline.last_trace.last_normalize_info + count = normalize_info.row_counts.get("players", 0) + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + assert count == MAX_PLAYERS # @@@DLT_REMOVE + + # we reuse the pipeline instance below and load to the same dataset as data + logger.info("Saving the load info in the destination") + pipeline.run([load_info], table_name="_load_info") + # save trace to destination, sensitive data will be removed + logger.info("Saving the trace in the destination") + pipeline.run([pipeline.last_trace], table_name="_trace") + + # print all the new tables/columns in + for package in load_info.load_packages: + for table_name, table in package.schema_update.items(): + logger.info(f"Table {table_name}: {table.get('description')}") + for column_name, column in table["columns"].items(): + logger.info(f"\tcolumn {column_name}: {column['data_type']}") + + # save the new tables and column schemas to the destination: + table_updates = [p.asdict()["tables"] for p in load_info.load_packages] + pipeline.run(table_updates, table_name="_new_tables") + + return load_info + # @@@DLT_SNIPPET_END markdown_retry_cm - # @@@DLT_SNIPPET_START markdown_retry_run - load_info_retry = load_data_with_retry(data) - # @@@DLT_SNIPPET_END markdown_retry_run - - # @@@DLT_SNIPPET_START markdown_sql_client - with pipeline.sql_client() as client: - with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count_client = cursor.fetchone()[0] - if count_client == 0: - print("Warning: No data in players table") - else: - print(f"Players table contains {count_client} rows") - # @@@DLT_SNIPPET_END markdown_sql_client - - # @@@DLT_SNIPPET_START markdown_norm_info - normalize_info = pipeline.last_trace.last_normalize_info - count = normalize_info.row_counts.get("players", 0) - if count == 0: - print("Warning: No data in players table") - else: - print(f"Players table contains {count} rows") - # @@@DLT_SNIPPET_END markdown_norm_info - # @@@DLT_SNIPPET_END example - - # check that stuff was loaded - row_counts = 
pipeline.last_trace.last_normalize_info.row_counts - assert row_counts["players"] == max_players - - packages_num = len(load_info.load_packages) - assert packages_num == 1 - - jobs_num = len(load_info.load_packages[0].jobs["completed_jobs"]) - assert jobs_num == 4 - - packages_num = len(load_info_retry.load_packages) - assert packages_num == 1 - - jobs_num = len(load_info_retry.load_packages[0].jobs["completed_jobs"]) - assert jobs_num == 3 - - assert count_client == max_players + # @@@DLT_SNIPPET_START markdown_pipeline + __name__ = "__main__" # @@@DLT_REMOVE + if __name__ == "__main__": + # create dlt pipeline + pipeline = dlt.pipeline( + pipeline_name="chess_pipeline", + destination="duckdb", + dataset_name="chess_data", + ) + # get data for a few famous players + data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS) + load_info = load_data_with_retry(pipeline, data) + # @@@DLT_SNIPPET_END markdown_pipeline diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index 6dc308f0e8..01d976225d 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md @@ -23,7 +23,7 @@ We'll learn how to: - Using context managers to independently retry pipeline stages. - Run basic tests utilizing sql_client and normalize_info. -## Init pipeline +### Init pipeline ```py @@ -79,102 +79,8 @@ def chess( ``` -Run the pipeline: - -```py -if __name__ == "__main__": - # create dlt pipeline - pipeline = dlt.pipeline( - pipeline_name="chess_pipeline", - destination="duckdb", - dataset_name="chess_data", - full_refresh=True, - ) - max_players = 5 - # get data for a few famous players - data = chess(chess_url="https://api.chess.com/pub/", max_players=max_players) - load_info = pipeline.run(data) - print(load_info) -``` - - -## Inspecting packages - -To inspect a load process after running a pipeline: - - -```py -# see when load was started -print(f"Pipeline was started: {load_info.started_at}") -# print the information on the first load package and all jobs inside -print(f"First load package info: {load_info.load_packages[0]}") -# print the information on the first completed job in first load package -print( - f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" -) -``` - - - -## Loading back load info, schema updates, and traces - -To load back the `load_info`, schema updates, and traces for the pipeline: - - -```py -# we reuse the pipeline instance below and load to the same dataset as data -pipeline.run([load_info], table_name="_load_info") -# save trace to destination, sensitive data will be removed -pipeline.run([pipeline.last_trace], table_name="_trace") - -# print all the new tables/columns in -for package in load_info.load_packages: - for table_name, table in package.schema_update.items(): - print(f"Table {table_name}: {table.get('description')}") - for column_name, column in table["columns"].items(): - print(f"\tcolumn {column_name}: {column['data_type']}") - -# save the new tables and column schemas to the destination: -table_updates = [p.asdict()["tables"] for p in load_info.load_packages] -pipeline.run(table_updates, table_name="_new_tables") -``` - - -## Sending notifications if schema evolved - -To send notifications if the schema has evolved for the pipeline: - - -```py -# check for schema updates: -schema_updates = [p.schema_update for p in load_info.load_packages] -# send notifications if there are schema updates -if 
schema_updates: - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" - ) -``` - - -You can configure Slack incoming hook via -`secrets.toml` or environment variables. - -In `secrets.toml`: -```toml -[runtime] -slack_incoming_hook="https://hooks.slack.com/services/T04DHMAF13Q/B04E7B1MQ1H/TDHEI123WUEE" -``` -ENV: - -```python -RUNTIME__SLACK_INCOMING_HOOK="https://hooks.slack.com/services/T04DHMAF13Q/B04E7B1MQ1H/TDHEI123WUEE" -``` - -## Using context managers to retry pipeline stages separately - -To use context managers to retry pipeline stages separately for the pipeline: +### Using context managers to retry pipeline stages separately ```py @@ -185,75 +91,115 @@ from tenacity import ( wait_exponential, ) +from dlt.common import logger from dlt.common.runtime.slack import send_slack_message from dlt.pipeline.helpers import retry_load -try: - for attempt in Retrying( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1.5, min=4, max=10), - retry=retry_if_exception(retry_load(())), - reraise=True, - ): - with attempt: - pipeline.run(data) -except Exception: - # we get here after all the retries - raise -``` - - -You can also use tenacity to decorate functions: - - -```py -from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential - -from dlt.pipeline.helpers import retry_load - -@retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1.5, min=4, max=10), - retry=retry_if_exception(retry_load(("extract", "load"))), - reraise=True, -) -def load_data_with_retry(data): - return pipeline.run(data) -``` - - - -```py -load_info_retry = load_data_with_retry(data) -``` - - -## Running simple tests with sql_client (table counts, warn if no data) -To run simple tests with `sql_client`, such as checking table counts and -warning if there is no data, you can use the `execute_query` method: +MAX_PLAYERS = 5 + +def load_data_with_retry(pipeline, data): + try: + for attempt in Retrying( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1.5, min=4, max=10), + retry=retry_if_exception(retry_load(())), + reraise=True, + ): + with attempt: + logger.info( + f"Running the pipeline, attempt={attempt.retry_state.attempt_number}" + ) + load_info = pipeline.run(data) + logger.info(str(load_info)) + # raise on failed jobs + load_info.raise_on_failed_jobs() + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + "Data was successfully loaded!" + ) + except Exception: + # we get here after all the failed retries + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + "Something went wrong!" 
+ ) + raise + finally: + # we get here after a successful attempt + # see when load was started + logger.info(f"Pipeline was started: {load_info.started_at}") + # print the information on the first load package and all jobs inside + logger.info(f"First load package info: {load_info.load_packages[0]}") + # print the information on the first completed job in first load package + logger.info( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" + ) - -```py -with pipeline.sql_client() as client: - with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count_client = cursor.fetchone()[0] - if count_client == 0: - print("Warning: No data in players table") + # check for schema updates: + schema_updates = [p.schema_update for p in load_info.load_packages] + # send notifications if there are schema updates + if schema_updates: + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" + ) + + # To run simple tests with `sql_client`, such as checking table counts and + # warning if there is no data, you can use the `execute_query` method + with pipeline.sql_client() as client: + with client.execute_query("SELECT COUNT(*) FROM players") as cursor: + count = cursor.fetchone()[0] + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + + # To run simple tests with `normalize_info`, such as checking table counts and + # warning if there is no data, you can use the `row_counts` attribute. + normalize_info = pipeline.last_trace.last_normalize_info + count = normalize_info.row_counts.get("players", 0) + if count == 0: + logger.info("Warning: No data in players table") else: - print(f"Players table contains {count_client} rows") + logger.info(f"Players table contains {count} rows") + + # we reuse the pipeline instance below and load to the same dataset as data + logger.info("Saving the load info in the destination") + pipeline.run([load_info], table_name="_load_info") + # save trace to destination, sensitive data will be removed + logger.info("Saving the trace in the destination") + pipeline.run([pipeline.last_trace], table_name="_trace") + + # print all the new tables/columns in + for package in load_info.load_packages: + for table_name, table in package.schema_update.items(): + logger.info(f"Table {table_name}: {table.get('description')}") + for column_name, column in table["columns"].items(): + logger.info(f"\tcolumn {column_name}: {column['data_type']}") + + # save the new tables and column schemas to the destination: + table_updates = [p.asdict()["tables"] for p in load_info.load_packages] + pipeline.run(table_updates, table_name="_new_tables") + + return load_info ``` - + -To run simple tests with `normalize_info`, such as checking table counts and -warning if there is no data, you can use the `row_counts` attribute: +### Run the pipeline - + ```py -normalize_info = pipeline.last_trace.last_normalize_info -count = normalize_info.row_counts.get("players", 0) -if count == 0: - print("Warning: No data in players table") -else: - print(f"Players table contains {count} rows") +if __name__ == "__main__": + # create dlt pipeline + pipeline = dlt.pipeline( + pipeline_name="chess_pipeline", + destination="duckdb", + dataset_name="chess_data", + ) + # get data for a few famous players + data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS) + load_info = load_data_with_retry(pipeline, data) ``` - \ No newline at 
end of file + \ No newline at end of file From 5f7d89b1e190494752cadb5783014f08b2bbee6e Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Mon, 30 Oct 2023 19:05:40 +0100 Subject: [PATCH 10/15] add more tests --- .../docs/examples/chess_production/code/chess-snippets.py | 3 +++ docs/website/docs/examples/chess_production/index.md | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index 7c16d7a09b..96e5f61998 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -146,9 +146,11 @@ def load_data_with_retry(pipeline, data): # we reuse the pipeline instance below and load to the same dataset as data logger.info("Saving the load info in the destination") pipeline.run([load_info], table_name="_load_info") + assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE # save trace to destination, sensitive data will be removed logger.info("Saving the trace in the destination") pipeline.run([pipeline.last_trace], table_name="_trace") + assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE # print all the new tables/columns in for package in load_info.load_packages: @@ -160,6 +162,7 @@ def load_data_with_retry(pipeline, data): # save the new tables and column schemas to the destination: table_updates = [p.asdict()["tables"] for p in load_info.load_packages] pipeline.run(table_updates, table_name="_new_tables") + assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE return load_info diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index 01d976225d..cb10ed189e 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md @@ -21,9 +21,9 @@ We'll learn how to: - Loading back load information, schema updates, and traces. - Triggering notifications in case of schema evolution. - Using context managers to independently retry pipeline stages. -- Run basic tests utilizing sql_client and normalize_info. +- Run basic tests utilizing `sql_client` and `normalize_info`. -### Init pipeline +### Init chess source ```py From d74c58ef751346df1b7f5b18b66084eba3e30d28 Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Tue, 31 Oct 2023 10:57:12 +0100 Subject: [PATCH 11/15] delete finally --- docs/examples/chess_production/chess.py | 127 ++++++++-------- .../chess_production/code/chess-snippets.py | 137 +++++++++--------- .../docs/examples/chess_production/index.md | 127 ++++++++-------- 3 files changed, 197 insertions(+), 194 deletions(-) diff --git a/docs/examples/chess_production/chess.py b/docs/examples/chess_production/chess.py index d49c39fe38..f11fd0ff06 100644 --- a/docs/examples/chess_production/chess.py +++ b/docs/examples/chess_production/chess.py @@ -76,13 +76,14 @@ def load_data_with_retry(pipeline, data): ) load_info = pipeline.run(data) logger.info(str(load_info)) - # raise on failed jobs - load_info.raise_on_failed_jobs() - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, - "Data was successfully loaded!" 
- ) + + # raise on failed jobs + load_info.raise_on_failed_jobs() + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + "Data was successfully loaded!" + ) except Exception: # we get here after all the failed retries # send notification @@ -91,64 +92,64 @@ def load_data_with_retry(pipeline, data): "Something went wrong!" ) raise - finally: - # we get here after a successful attempt - # see when load was started - logger.info(f"Pipeline was started: {load_info.started_at}") - # print the information on the first load package and all jobs inside - logger.info(f"First load package info: {load_info.load_packages[0]}") - # print the information on the first completed job in first load package - logger.info( - f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" - ) - # check for schema updates: - schema_updates = [p.schema_update for p in load_info.load_packages] - # send notifications if there are schema updates - if schema_updates: - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" - ) + # we get here after a successful attempt + # see when load was started + logger.info(f"Pipeline was started: {load_info.started_at}") + # print the information on the first load package and all jobs inside + logger.info(f"First load package info: {load_info.load_packages[0]}") + # print the information on the first completed job in first load package + logger.info( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" + ) + + # check for schema updates: + schema_updates = [p.schema_update for p in load_info.load_packages] + # send notifications if there are schema updates + if schema_updates: + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" + ) - # To run simple tests with `sql_client`, such as checking table counts and - # warning if there is no data, you can use the `execute_query` method - with pipeline.sql_client() as client: - with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count = cursor.fetchone()[0] - if count == 0: - logger.info("Warning: No data in players table") - else: - logger.info(f"Players table contains {count} rows") - - # To run simple tests with `normalize_info`, such as checking table counts and - # warning if there is no data, you can use the `row_counts` attribute. 
- normalize_info = pipeline.last_trace.last_normalize_info - count = normalize_info.row_counts.get("players", 0) - if count == 0: - logger.info("Warning: No data in players table") - else: - logger.info(f"Players table contains {count} rows") - - # we reuse the pipeline instance below and load to the same dataset as data - logger.info("Saving the load info in the destination") - pipeline.run([load_info], table_name="_load_info") - # save trace to destination, sensitive data will be removed - logger.info("Saving the trace in the destination") - pipeline.run([pipeline.last_trace], table_name="_trace") - - # print all the new tables/columns in - for package in load_info.load_packages: - for table_name, table in package.schema_update.items(): - logger.info(f"Table {table_name}: {table.get('description')}") - for column_name, column in table["columns"].items(): - logger.info(f"\tcolumn {column_name}: {column['data_type']}") - - # save the new tables and column schemas to the destination: - table_updates = [p.asdict()["tables"] for p in load_info.load_packages] - pipeline.run(table_updates, table_name="_new_tables") - - return load_info + # To run simple tests with `sql_client`, such as checking table counts and + # warning if there is no data, you can use the `execute_query` method + with pipeline.sql_client() as client: + with client.execute_query("SELECT COUNT(*) FROM players") as cursor: + count = cursor.fetchone()[0] + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + + # To run simple tests with `normalize_info`, such as checking table counts and + # warning if there is no data, you can use the `row_counts` attribute. + normalize_info = pipeline.last_trace.last_normalize_info + count = normalize_info.row_counts.get("players", 0) + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + + # we reuse the pipeline instance below and load to the same dataset as data + logger.info("Saving the load info in the destination") + pipeline.run([load_info], table_name="_load_info") + # save trace to destination, sensitive data will be removed + logger.info("Saving the trace in the destination") + pipeline.run([pipeline.last_trace], table_name="_trace") + + # print all the new tables/columns in + for package in load_info.load_packages: + for table_name, table in package.schema_update.items(): + logger.info(f"Table {table_name}: {table.get('description')}") + for column_name, column in table["columns"].items(): + logger.info(f"\tcolumn {column_name}: {column['data_type']}") + + # save the new tables and column schemas to the destination: + table_updates = [p.asdict()["tables"] for p in load_info.load_packages] + pipeline.run(table_updates, table_name="_new_tables") + + return load_info if __name__ == "__main__": diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index 96e5f61998..2ecf04bac7 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -87,13 +87,14 @@ def load_data_with_retry(pipeline, data): ) load_info = pipeline.run(data) logger.info(str(load_info)) - # raise on failed jobs - load_info.raise_on_failed_jobs() - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, - "Data was successfully loaded!" 
- ) + + # raise on failed jobs + load_info.raise_on_failed_jobs() + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + "Data was successfully loaded!" + ) except Exception: # we get here after all the failed retries # send notification @@ -102,69 +103,69 @@ def load_data_with_retry(pipeline, data): "Something went wrong!" ) raise - finally: - # we get here after a successful attempt - # see when load was started - logger.info(f"Pipeline was started: {load_info.started_at}") - # print the information on the first load package and all jobs inside - logger.info(f"First load package info: {load_info.load_packages[0]}") - # print the information on the first completed job in first load package - logger.info( - f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" - ) - # check for schema updates: - schema_updates = [p.schema_update for p in load_info.load_packages] - # send notifications if there are schema updates - if schema_updates: - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" - ) + # we get here after a successful attempt + # see when load was started + logger.info(f"Pipeline was started: {load_info.started_at}") + # print the information on the first load package and all jobs inside + logger.info(f"First load package info: {load_info.load_packages[0]}") + # print the information on the first completed job in first load package + logger.info( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" + ) + + # check for schema updates: + schema_updates = [p.schema_update for p in load_info.load_packages] + # send notifications if there are schema updates + if schema_updates: + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" + ) - # To run simple tests with `sql_client`, such as checking table counts and - # warning if there is no data, you can use the `execute_query` method - with pipeline.sql_client() as client: - with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count = cursor.fetchone()[0] - if count == 0: - logger.info("Warning: No data in players table") - else: - logger.info(f"Players table contains {count} rows") - assert count == MAX_PLAYERS # @@@DLT_REMOVE - - # To run simple tests with `normalize_info`, such as checking table counts and - # warning if there is no data, you can use the `row_counts` attribute. 
- normalize_info = pipeline.last_trace.last_normalize_info - count = normalize_info.row_counts.get("players", 0) - if count == 0: - logger.info("Warning: No data in players table") - else: - logger.info(f"Players table contains {count} rows") - assert count == MAX_PLAYERS # @@@DLT_REMOVE - - # we reuse the pipeline instance below and load to the same dataset as data - logger.info("Saving the load info in the destination") - pipeline.run([load_info], table_name="_load_info") - assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE - # save trace to destination, sensitive data will be removed - logger.info("Saving the trace in the destination") - pipeline.run([pipeline.last_trace], table_name="_trace") - assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE - - # print all the new tables/columns in - for package in load_info.load_packages: - for table_name, table in package.schema_update.items(): - logger.info(f"Table {table_name}: {table.get('description')}") - for column_name, column in table["columns"].items(): - logger.info(f"\tcolumn {column_name}: {column['data_type']}") - - # save the new tables and column schemas to the destination: - table_updates = [p.asdict()["tables"] for p in load_info.load_packages] - pipeline.run(table_updates, table_name="_new_tables") - assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE - - return load_info + # To run simple tests with `sql_client`, such as checking table counts and + # warning if there is no data, you can use the `execute_query` method + with pipeline.sql_client() as client: + with client.execute_query("SELECT COUNT(*) FROM players") as cursor: + count = cursor.fetchone()[0] + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + assert count == MAX_PLAYERS # @@@DLT_REMOVE + + # To run simple tests with `normalize_info`, such as checking table counts and + # warning if there is no data, you can use the `row_counts` attribute. 
+ normalize_info = pipeline.last_trace.last_normalize_info + count = normalize_info.row_counts.get("players", 0) + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + assert count == MAX_PLAYERS # @@@DLT_REMOVE + + # we reuse the pipeline instance below and load to the same dataset as data + logger.info("Saving the load info in the destination") + pipeline.run([load_info], table_name="_load_info") + assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE + # save trace to destination, sensitive data will be removed + logger.info("Saving the trace in the destination") + pipeline.run([pipeline.last_trace], table_name="_trace") + assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE + + # print all the new tables/columns in + for package in load_info.load_packages: + for table_name, table in package.schema_update.items(): + logger.info(f"Table {table_name}: {table.get('description')}") + for column_name, column in table["columns"].items(): + logger.info(f"\tcolumn {column_name}: {column['data_type']}") + + # save the new tables and column schemas to the destination: + table_updates = [p.asdict()["tables"] for p in load_info.load_packages] + pipeline.run(table_updates, table_name="_new_tables") + assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE + + return load_info # @@@DLT_SNIPPET_END markdown_retry_cm diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index cb10ed189e..81524f7ee2 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md @@ -111,13 +111,14 @@ def load_data_with_retry(pipeline, data): ) load_info = pipeline.run(data) logger.info(str(load_info)) - # raise on failed jobs - load_info.raise_on_failed_jobs() - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, - "Data was successfully loaded!" - ) + + # raise on failed jobs + load_info.raise_on_failed_jobs() + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, + "Data was successfully loaded!" + ) except Exception: # we get here after all the failed retries # send notification @@ -126,64 +127,64 @@ def load_data_with_retry(pipeline, data): "Something went wrong!" ) raise - finally: - # we get here after a successful attempt - # see when load was started - logger.info(f"Pipeline was started: {load_info.started_at}") - # print the information on the first load package and all jobs inside - logger.info(f"First load package info: {load_info.load_packages[0]}") - # print the information on the first completed job in first load package - logger.info( - f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" - ) - # check for schema updates: - schema_updates = [p.schema_update for p in load_info.load_packages] - # send notifications if there are schema updates - if schema_updates: - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" 
- ) + # we get here after a successful attempt + # see when load was started + logger.info(f"Pipeline was started: {load_info.started_at}") + # print the information on the first load package and all jobs inside + logger.info(f"First load package info: {load_info.load_packages[0]}") + # print the information on the first completed job in first load package + logger.info( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" + ) + + # check for schema updates: + schema_updates = [p.schema_update for p in load_info.load_packages] + # send notifications if there are schema updates + if schema_updates: + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Schema was updated!" + ) - # To run simple tests with `sql_client`, such as checking table counts and - # warning if there is no data, you can use the `execute_query` method - with pipeline.sql_client() as client: - with client.execute_query("SELECT COUNT(*) FROM players") as cursor: - count = cursor.fetchone()[0] - if count == 0: - logger.info("Warning: No data in players table") - else: - logger.info(f"Players table contains {count} rows") - - # To run simple tests with `normalize_info`, such as checking table counts and - # warning if there is no data, you can use the `row_counts` attribute. - normalize_info = pipeline.last_trace.last_normalize_info - count = normalize_info.row_counts.get("players", 0) - if count == 0: - logger.info("Warning: No data in players table") - else: - logger.info(f"Players table contains {count} rows") - - # we reuse the pipeline instance below and load to the same dataset as data - logger.info("Saving the load info in the destination") - pipeline.run([load_info], table_name="_load_info") - # save trace to destination, sensitive data will be removed - logger.info("Saving the trace in the destination") - pipeline.run([pipeline.last_trace], table_name="_trace") - - # print all the new tables/columns in - for package in load_info.load_packages: - for table_name, table in package.schema_update.items(): - logger.info(f"Table {table_name}: {table.get('description')}") - for column_name, column in table["columns"].items(): - logger.info(f"\tcolumn {column_name}: {column['data_type']}") - - # save the new tables and column schemas to the destination: - table_updates = [p.asdict()["tables"] for p in load_info.load_packages] - pipeline.run(table_updates, table_name="_new_tables") - - return load_info + # To run simple tests with `sql_client`, such as checking table counts and + # warning if there is no data, you can use the `execute_query` method + with pipeline.sql_client() as client: + with client.execute_query("SELECT COUNT(*) FROM players") as cursor: + count = cursor.fetchone()[0] + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + + # To run simple tests with `normalize_info`, such as checking table counts and + # warning if there is no data, you can use the `row_counts` attribute. 
+ normalize_info = pipeline.last_trace.last_normalize_info + count = normalize_info.row_counts.get("players", 0) + if count == 0: + logger.info("Warning: No data in players table") + else: + logger.info(f"Players table contains {count} rows") + + # we reuse the pipeline instance below and load to the same dataset as data + logger.info("Saving the load info in the destination") + pipeline.run([load_info], table_name="_load_info") + # save trace to destination, sensitive data will be removed + logger.info("Saving the trace in the destination") + pipeline.run([pipeline.last_trace], table_name="_trace") + + # print all the new tables/columns in + for package in load_info.load_packages: + for table_name, table in package.schema_update.items(): + logger.info(f"Table {table_name}: {table.get('description')}") + for column_name, column in table["columns"].items(): + logger.info(f"\tcolumn {column_name}: {column['data_type']}") + + # save the new tables and column schemas to the destination: + table_updates = [p.asdict()["tables"] for p in load_info.load_packages] + pipeline.run(table_updates, table_name="_new_tables") + + return load_info ``` From 6345ad42e730fe0c0e29585672a8880c0efcb0f8 Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Tue, 31 Oct 2023 11:10:22 +0100 Subject: [PATCH 12/15] delete unused load_info --- .../docs/examples/chess_production/code/chess-snippets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index 2ecf04bac7..bff1eadd1d 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -180,5 +180,5 @@ def load_data_with_retry(pipeline, data): ) # get data for a few famous players data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS) - load_info = load_data_with_retry(pipeline, data) + load_data_with_retry(pipeline, data) # @@@DLT_SNIPPET_END markdown_pipeline From 3a0f21588a568e0f6e979a8a3a023401815db137 Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Tue, 31 Oct 2023 11:23:14 +0100 Subject: [PATCH 13/15] delete secrets --- docs/examples/chess_production/.dlt/secrets.toml | 2 -- docs/examples/chess_production/chess.py | 2 +- .../docs/examples/chess_production/code/.dlt/secrets.toml | 4 ---- docs/website/docs/examples/chess_production/index.md | 2 +- 4 files changed, 2 insertions(+), 8 deletions(-) delete mode 100644 docs/examples/chess_production/.dlt/secrets.toml delete mode 100644 docs/website/docs/examples/chess_production/code/.dlt/secrets.toml diff --git a/docs/examples/chess_production/.dlt/secrets.toml b/docs/examples/chess_production/.dlt/secrets.toml deleted file mode 100644 index 3e49175ed7..0000000000 --- a/docs/examples/chess_production/.dlt/secrets.toml +++ /dev/null @@ -1,2 +0,0 @@ -[runtime] -slack_incoming_hook="" \ No newline at end of file diff --git a/docs/examples/chess_production/chess.py b/docs/examples/chess_production/chess.py index f11fd0ff06..0ff5ce7c7f 100644 --- a/docs/examples/chess_production/chess.py +++ b/docs/examples/chess_production/chess.py @@ -161,4 +161,4 @@ def load_data_with_retry(pipeline, data): ) # get data for a few famous players data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS) - load_info = load_data_with_retry(pipeline, data) \ No newline at end of file + load_data_with_retry(pipeline, data) diff --git 
a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml b/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml deleted file mode 100644 index 978f2c1bec..0000000000 --- a/docs/website/docs/examples/chess_production/code/.dlt/secrets.toml +++ /dev/null @@ -1,4 +0,0 @@ -# @@@DLT_SNIPPET_START example -[runtime] -slack_incoming_hook="" -# @@@DLT_SNIPPET_END example diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index 81524f7ee2..f821600c67 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md @@ -201,6 +201,6 @@ if __name__ == "__main__": ) # get data for a few famous players data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS) - load_info = load_data_with_retry(pipeline, data) + load_data_with_retry(pipeline, data) ``` \ No newline at end of file From 3f81eb0d482b28b900222c073f0d740c52a4622a Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Tue, 31 Oct 2023 11:35:11 +0100 Subject: [PATCH 14/15] add env to github actions --- .github/workflows/test_doc_snippets.yml | 3 ++- .../docs/examples/chess_production/code/chess-snippets.py | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test_doc_snippets.yml b/.github/workflows/test_doc_snippets.yml index e158c2d669..554c2efba2 100644 --- a/.github/workflows/test_doc_snippets.yml +++ b/.github/workflows/test_doc_snippets.yml @@ -20,7 +20,8 @@ env: # zendesk vars for example SOURCES__ZENDESK__CREDENTIALS: ${{ secrets.ZENDESK__CREDENTIALS }} - + # Slack hook for chess in production example + RUNTIME__SLACK_INCOMING_HOOK: ${{ secrets.RUNTIME__SLACK_INCOMING_HOOK }} jobs: run_lint: diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index bff1eadd1d..6eb1bb757f 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -12,7 +12,6 @@ def incremental_snippet() -> None: import dlt from dlt.common import sleep - from dlt.common.runtime.slack import send_slack_message from dlt.common.typing import StrAny, TDataItems from dlt.sources.helpers.requests import client From ba8e5ca0e9c033ed34af82f43f56d20ceb2a442c Mon Sep 17 00:00:00 2001 From: AstrakhantsevaAA Date: Tue, 31 Oct 2023 11:43:41 +0100 Subject: [PATCH 15/15] del source name --- .../docs/examples/chess_production/code/chess-snippets.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py index 6eb1bb757f..1cd1c86aed 100644 --- a/docs/website/docs/examples/chess_production/code/chess-snippets.py +++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py @@ -1,7 +1,5 @@ from tests.utils import skipifgithubfork -__source_name__ = "chess_production" - @skipifgithubfork def incremental_snippet() -> None: