From 59258e110805efc8b1c2cf5071e9a35be360e8ca Mon Sep 17 00:00:00 2001
From: Alena Astrakhantseva
Date: Tue, 31 Oct 2023 17:25:42 +0100
Subject: [PATCH] Example: chess production (#711)

* added chess example

* add env to github actions
---
 .github/workflows/test_doc_snippets.yml      |   3 +-
 .../chess_production/.dlt/config.toml        |   0
 docs/examples/chess_production/__init__.py   |   0
 docs/examples/chess_production/chess.py      | 164 ++++++++++++++
 .../examples/chess_production/__init__.py    |   0
 .../chess_production/code/.dlt/config.toml   |   2 +
 .../chess_production/code/__init__.py        |   0
 .../chess_production/code/chess-snippets.py  | 181 +++++++++++++++
 .../docs/examples/chess_production/index.md  | 206 ++++++++++++++++++
 docs/website/sidebars.js                     |   1 +
 10 files changed, 556 insertions(+), 1 deletion(-)
 create mode 100644 docs/examples/chess_production/.dlt/config.toml
 create mode 100644 docs/examples/chess_production/__init__.py
 create mode 100644 docs/examples/chess_production/chess.py
 create mode 100644 docs/website/docs/examples/chess_production/__init__.py
 create mode 100644 docs/website/docs/examples/chess_production/code/.dlt/config.toml
 create mode 100644 docs/website/docs/examples/chess_production/code/__init__.py
 create mode 100644 docs/website/docs/examples/chess_production/code/chess-snippets.py
 create mode 100644 docs/website/docs/examples/chess_production/index.md

diff --git a/.github/workflows/test_doc_snippets.yml b/.github/workflows/test_doc_snippets.yml
index e158c2d669..554c2efba2 100644
--- a/.github/workflows/test_doc_snippets.yml
+++ b/.github/workflows/test_doc_snippets.yml
@@ -20,7 +20,8 @@ env:
   # zendesk vars for example
   SOURCES__ZENDESK__CREDENTIALS: ${{ secrets.ZENDESK__CREDENTIALS }}
-
+  # Slack hook for chess in production example
+  RUNTIME__SLACK_INCOMING_HOOK: ${{ secrets.RUNTIME__SLACK_INCOMING_HOOK }}
 
 jobs:
   run_lint:
diff --git a/docs/examples/chess_production/.dlt/config.toml b/docs/examples/chess_production/.dlt/config.toml
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/docs/examples/chess_production/__init__.py b/docs/examples/chess_production/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/docs/examples/chess_production/chess.py b/docs/examples/chess_production/chess.py
new file mode 100644
index 0000000000..0ff5ce7c7f
--- /dev/null
+++ b/docs/examples/chess_production/chess.py
@@ -0,0 +1,164 @@
+import threading
+from typing import Any, Iterator
+
+import dlt
+from dlt.common import sleep
+from dlt.common.typing import StrAny, TDataItems
+from dlt.sources.helpers.requests import client
+
+@dlt.source
+def chess(
+    chess_url: str = dlt.config.value,
+    title: str = "GM",
+    max_players: int = 2,
+    year: int = 2022,
+    month: int = 10,
+) -> Any:
+    def _get_data_with_retry(path: str) -> StrAny:
+        r = client.get(f"{chess_url}{path}")
+        return r.json()  # type: ignore
+
+    @dlt.resource(write_disposition="replace")
+    def players() -> Iterator[TDataItems]:
+        # yield players one by one; you could also return a list,
+        # which would be faster, but we want to pass players item by item to the transformer
+        yield from _get_data_with_retry(f"titled/{title}")["players"][:max_players]
+
+    # this resource takes data from players and returns profiles
+    # it uses the `defer` decorator to enable parallel runs in a thread pool.
+    # `defer` requires a return at the end, so we convert the yield into a return (we return one item anyway)
+    # you can still have yielding transformers; look for the test named `test_evolve_schema`
+    @dlt.transformer(data_from=players, write_disposition="replace")
+    @dlt.defer
+    def players_profiles(username: Any) -> TDataItems:
+        print(
+            f"getting {username} profile via thread {threading.current_thread().name}"
+        )
+        sleep(1)  # add some latency to show parallel runs
+        return _get_data_with_retry(f"player/{username}")
+
+    # this resource takes data from players and returns games for the last month
+    # if not specified otherwise
+    @dlt.transformer(data_from=players, write_disposition="append")
+    def players_games(username: Any) -> Iterator[TDataItems]:
+        # https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM}
+        path = f"player/{username}/games/{year:04d}/{month:02d}"
+        yield _get_data_with_retry(path)["games"]
+
+    return players(), players_profiles, players_games
+
+
+from tenacity import (
+    Retrying,
+    retry_if_exception,
+    stop_after_attempt,
+    wait_exponential,
+)
+
+from dlt.common import logger
+from dlt.common.runtime.slack import send_slack_message
+from dlt.pipeline.helpers import retry_load
+
+MAX_PLAYERS = 5
+
+def load_data_with_retry(pipeline, data):
+    try:
+        for attempt in Retrying(
+            stop=stop_after_attempt(5),
+            wait=wait_exponential(multiplier=1.5, min=4, max=10),
+            retry=retry_if_exception(retry_load(())),
+            reraise=True,
+        ):
+            with attempt:
+                logger.info(
+                    f"Running the pipeline, attempt={attempt.retry_state.attempt_number}"
+                )
+                load_info = pipeline.run(data)
+                logger.info(str(load_info))
+
+                # raise on failed jobs
+                load_info.raise_on_failed_jobs()
+                # send notification
+                send_slack_message(
+                    pipeline.runtime_config.slack_incoming_hook,
+                    "Data was successfully loaded!"
+                )
+    except Exception:
+        # we get here after all the failed retries
+        # send notification
+        send_slack_message(
+            pipeline.runtime_config.slack_incoming_hook,
+            "Something went wrong!"
+        )
+        raise
+
+    # we get here after a successful attempt
+    # see when load was started
+    logger.info(f"Pipeline was started: {load_info.started_at}")
+    # print the information on the first load package and all jobs inside
+    logger.info(f"First load package info: {load_info.load_packages[0]}")
+    # print the information on the first completed job in first load package
+    logger.info(
+        f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
+    )
+
+    # check for schema updates:
+    schema_updates = [p.schema_update for p in load_info.load_packages]
+    # send notifications if there are schema updates
+    if schema_updates:
+        # send notification
+        send_slack_message(
+            pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
+        )
+
+    # To run simple tests with `sql_client`, such as checking table counts and
+    # warning if there is no data, you can use the `execute_query` method
+    with pipeline.sql_client() as client:
+        with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
+            count = cursor.fetchone()[0]
+            if count == 0:
+                logger.info("Warning: No data in players table")
+            else:
+                logger.info(f"Players table contains {count} rows")
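+    # note: the `normalize_info` check below reads the local normalize trace,
+    # so unlike the `sql_client` query above it never touches the destination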
+    # To run simple tests with `normalize_info`, such as checking table counts and
+    # warning if there is no data, you can use the `row_counts` attribute.
+    normalize_info = pipeline.last_trace.last_normalize_info
+    count = normalize_info.row_counts.get("players", 0)
+    if count == 0:
+        logger.info("Warning: No data in players table")
+    else:
+        logger.info(f"Players table contains {count} rows")
+
+    # we reuse the pipeline instance below and load to the same dataset as the data
+    logger.info("Saving the load info in the destination")
+    pipeline.run([load_info], table_name="_load_info")
+    # save trace to destination, sensitive data will be removed
+    logger.info("Saving the trace in the destination")
+    pipeline.run([pipeline.last_trace], table_name="_trace")
+
+    # print all the new tables/columns in the schema update
+    for package in load_info.load_packages:
+        for table_name, table in package.schema_update.items():
+            logger.info(f"Table {table_name}: {table.get('description')}")
+            for column_name, column in table["columns"].items():
+                logger.info(f"\tcolumn {column_name}: {column['data_type']}")
+
+    # save the new tables and column schemas to the destination:
+    table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
+    pipeline.run(table_updates, table_name="_new_tables")
+
+    return load_info
+
+
+if __name__ == "__main__":
+    # create dlt pipeline
+    pipeline = dlt.pipeline(
+        pipeline_name="chess_pipeline",
+        destination="duckdb",
+        dataset_name="chess_data",
+    )
+    # get data for a few famous players
+    data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
+    load_data_with_retry(pipeline, data)
diff --git a/docs/website/docs/examples/chess_production/__init__.py b/docs/website/docs/examples/chess_production/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/docs/website/docs/examples/chess_production/code/.dlt/config.toml b/docs/website/docs/examples/chess_production/code/.dlt/config.toml
new file mode 100644
index 0000000000..be627e6c11
--- /dev/null
+++ b/docs/website/docs/examples/chess_production/code/.dlt/config.toml
@@ -0,0 +1,2 @@
+# @@@DLT_SNIPPET_START example
+# @@@DLT_SNIPPET_END example
diff --git a/docs/website/docs/examples/chess_production/code/__init__.py b/docs/website/docs/examples/chess_production/code/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py
new file mode 100644
index 0000000000..1cd1c86aed
--- /dev/null
+++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py
@@ -0,0 +1,181 @@
+from tests.utils import skipifgithubfork
+
+
+@skipifgithubfork
+def incremental_snippet() -> None:
+    # @@@DLT_SNIPPET_START example
+    # @@@DLT_SNIPPET_START markdown_source
+    import threading
+    from typing import Any, Iterator
+
+    import dlt
+    from dlt.common import sleep
+    from dlt.common.typing import StrAny, TDataItems
+    from dlt.sources.helpers.requests import client
+
+    @dlt.source
+    def chess(
+        chess_url: str = dlt.config.value,
+        title: str = "GM",
+        max_players: int = 2,
+        year: int = 2022,
+        month: int = 10,
+    ) -> Any:
+        def _get_data_with_retry(path: str) -> StrAny:
+            r = client.get(f"{chess_url}{path}")
+            return r.json()  # type: ignore
+
+        @dlt.resource(write_disposition="replace")
+        def players() -> Iterator[TDataItems]:
+            # yield players one by one; you could also return a list,
+            # which would be faster, but we want to pass players item by item to the transformer
+            yield from _get_data_with_retry(f"titled/{title}")["players"][:max_players]
+
+        # this resource takes data from players and returns profiles
+        # it uses the `defer` decorator to enable parallel runs in a thread pool.
+        # `defer` requires a return at the end, so we convert the yield into a return (we return one item anyway)
+        # you can still have yielding transformers; look for the test named `test_evolve_schema`
+        @dlt.transformer(data_from=players, write_disposition="replace")
+        @dlt.defer
+        def players_profiles(username: Any) -> TDataItems:
+            print(
+                f"getting {username} profile via thread {threading.current_thread().name}"
+            )
+            sleep(1)  # add some latency to show parallel runs
+            return _get_data_with_retry(f"player/{username}")
+
+        # this resource takes data from players and returns games for the last month
+        # if not specified otherwise
+        @dlt.transformer(data_from=players, write_disposition="append")
+        def players_games(username: Any) -> Iterator[TDataItems]:
+            # https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM}
+            path = f"player/{username}/games/{year:04d}/{month:02d}"
+            yield _get_data_with_retry(path)["games"]
+
+        return players(), players_profiles, players_games
+
+    # @@@DLT_SNIPPET_END markdown_source
+
+    # @@@DLT_SNIPPET_START markdown_retry_cm
+    from tenacity import (
+        Retrying,
+        retry_if_exception,
+        stop_after_attempt,
+        wait_exponential,
+    )
+
+    from dlt.common import logger
+    from dlt.common.runtime.slack import send_slack_message
+    from dlt.pipeline.helpers import retry_load
+
+    MAX_PLAYERS = 5
+
+    def load_data_with_retry(pipeline, data):
+        try:
+            for attempt in Retrying(
+                stop=stop_after_attempt(5),
+                wait=wait_exponential(multiplier=1.5, min=4, max=10),
+                retry=retry_if_exception(retry_load(())),
+                reraise=True,
+            ):
+                with attempt:
+                    logger.info(
+                        f"Running the pipeline, attempt={attempt.retry_state.attempt_number}"
+                    )
+                    load_info = pipeline.run(data)
+                    logger.info(str(load_info))
+
+                    # raise on failed jobs
+                    load_info.raise_on_failed_jobs()
+                    # send notification
+                    send_slack_message(
+                        pipeline.runtime_config.slack_incoming_hook,
+                        "Data was successfully loaded!"
+                    )
+        except Exception:
+            # we get here after all the failed retries
+            # send notification
+            send_slack_message(
+                pipeline.runtime_config.slack_incoming_hook,
+                "Something went wrong!"
+            )
+            raise
+
+        # we get here after a successful attempt
+        # see when load was started
+        logger.info(f"Pipeline was started: {load_info.started_at}")
+        # print the information on the first load package and all jobs inside
+        logger.info(f"First load package info: {load_info.load_packages[0]}")
+        # print the information on the first completed job in first load package
+        logger.info(
+            f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
+        )
+
+        # check for schema updates:
+        schema_updates = [p.schema_update for p in load_info.load_packages]
+        # send notifications if there are schema updates
+        if schema_updates:
+            # send notification
+            send_slack_message(
+                pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
+            )
+
+        # To run simple tests with `sql_client`, such as checking table counts and
+        # warning if there is no data, you can use the `execute_query` method
+        with pipeline.sql_client() as client:
+            with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
+                count = cursor.fetchone()[0]
+                if count == 0:
+                    logger.info("Warning: No data in players table")
+                else:
+                    logger.info(f"Players table contains {count} rows")
+                assert count == MAX_PLAYERS  # @@@DLT_REMOVE
+
+        # To run simple tests with `normalize_info`, such as checking table counts and
+        # warning if there is no data, you can use the `row_counts` attribute.
+        normalize_info = pipeline.last_trace.last_normalize_info
+        count = normalize_info.row_counts.get("players", 0)
+        if count == 0:
+            logger.info("Warning: No data in players table")
+        else:
+            logger.info(f"Players table contains {count} rows")
+        assert count == MAX_PLAYERS  # @@@DLT_REMOVE
+
+        # we reuse the pipeline instance below and load to the same dataset as the data
+        logger.info("Saving the load info in the destination")
+        pipeline.run([load_info], table_name="_load_info")
+        assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
+        # save trace to destination, sensitive data will be removed
+        logger.info("Saving the trace in the destination")
+        pipeline.run([pipeline.last_trace], table_name="_trace")
+        assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
+
+        # print all the new tables/columns in the schema update
+        for package in load_info.load_packages:
+            for table_name, table in package.schema_update.items():
+                logger.info(f"Table {table_name}: {table.get('description')}")
+                for column_name, column in table["columns"].items():
+                    logger.info(f"\tcolumn {column_name}: {column['data_type']}")
+
+        # save the new tables and column schemas to the destination:
+        table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
+        pipeline.run(table_updates, table_name="_new_tables")
+        assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
+
+        return load_info
+
+    # @@@DLT_SNIPPET_END markdown_retry_cm
+
+    # @@@DLT_SNIPPET_START markdown_pipeline
+    __name__ = "__main__"  # @@@DLT_REMOVE
+    if __name__ == "__main__":
+        # create dlt pipeline
+        pipeline = dlt.pipeline(
+            pipeline_name="chess_pipeline",
+            destination="duckdb",
+            dataset_name="chess_data",
+        )
+        # get data for a few famous players
+        data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
+        load_data_with_retry(pipeline, data)
+    # @@@DLT_SNIPPET_END markdown_pipeline
diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md
new file mode 100644
index 0000000000..f821600c67
--- /dev/null
+++ b/docs/website/docs/examples/chess_production/index.md
@@ -0,0 +1,206 @@
+---
+title: Run chess pipeline in production
+description: Learn how to run the chess pipeline in production
+keywords: [running in production, example]
+---
+
+import Header from '../_examples-header.md';
+
+## Run chess pipeline in production
+
+In this example, you'll find a Python script that interacts with the Chess API to extract players and game data.
+
+We'll learn how to:
+
+- Inspect packages after they have been loaded.
+- Load back load information, schema updates, and traces.
+- Trigger notifications in case of schema evolution (the Slack setup is sketched right after this list).
+- Use context managers to independently retry pipeline stages.
+- Run basic tests using `sql_client` and `normalize_info`.
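+Notifications below are sent with `send_slack_message`, which reads the incoming webhook URL from
+`pipeline.runtime_config.slack_incoming_hook`. A minimal sketch of providing that value — the
+webhook URL is a placeholder, and `secrets.toml` is just one of several dlt config providers:
+
+```toml
+# .dlt/secrets.toml (any dlt config provider works)
+[runtime]
+slack_incoming_hook="https://hooks.slack.com/services/..."
+```
+
+The same setting can come from the environment as `RUNTIME__SLACK_INCOMING_HOOK`, which is how the
+GitHub Actions workflow at the top of this patch supplies it.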
+
+### Init chess source
+
+```py
+import threading
+from typing import Any, Iterator
+
+import dlt
+from dlt.common import sleep
+from dlt.common.typing import StrAny, TDataItems
+from dlt.sources.helpers.requests import client
+
+@dlt.source
+def chess(
+    chess_url: str = dlt.config.value,
+    title: str = "GM",
+    max_players: int = 2,
+    year: int = 2022,
+    month: int = 10,
+) -> Any:
+    def _get_data_with_retry(path: str) -> StrAny:
+        r = client.get(f"{chess_url}{path}")
+        return r.json()  # type: ignore
+
+    @dlt.resource(write_disposition="replace")
+    def players() -> Iterator[TDataItems]:
+        # yield players one by one; you could also return a list,
+        # which would be faster, but we want to pass players item by item to the transformer
+        yield from _get_data_with_retry(f"titled/{title}")["players"][:max_players]
+
+    # this resource takes data from players and returns profiles
+    # it uses the `defer` decorator to enable parallel runs in a thread pool.
+    # `defer` requires a return at the end, so we convert the yield into a return (we return one item anyway)
+    # you can still have yielding transformers; look for the test named `test_evolve_schema`
+    @dlt.transformer(data_from=players, write_disposition="replace")
+    @dlt.defer
+    def players_profiles(username: Any) -> TDataItems:
+        print(
+            f"getting {username} profile via thread {threading.current_thread().name}"
+        )
+        sleep(1)  # add some latency to show parallel runs
+        return _get_data_with_retry(f"player/{username}")
+
+    # this resource takes data from players and returns games for the last month
+    # if not specified otherwise
+    @dlt.transformer(data_from=players, write_disposition="append")
+    def players_games(username: Any) -> Iterator[TDataItems]:
+        # https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM}
+        path = f"player/{username}/games/{year:04d}/{month:02d}"
+        yield _get_data_with_retry(path)["games"]
+
+    return players(), players_profiles, players_games
+```
+
+### Using context managers to retry pipeline stages separately
+
+```py
+from tenacity import (
+    Retrying,
+    retry_if_exception,
+    stop_after_attempt,
+    wait_exponential,
+)
+
+from dlt.common import logger
+from dlt.common.runtime.slack import send_slack_message
+from dlt.pipeline.helpers import retry_load
+
+MAX_PLAYERS = 5
+
+def load_data_with_retry(pipeline, data):
+    try:
+        for attempt in Retrying(
+            stop=stop_after_attempt(5),
+            wait=wait_exponential(multiplier=1.5, min=4, max=10),
+            retry=retry_if_exception(retry_load(())),
+            reraise=True,
+        ):
+            with attempt:
+                logger.info(
+                    f"Running the pipeline, attempt={attempt.retry_state.attempt_number}"
+                )
+                load_info = pipeline.run(data)
+                logger.info(str(load_info))
+
+                # raise on failed jobs
+                load_info.raise_on_failed_jobs()
+                # send notification
+                send_slack_message(
+                    pipeline.runtime_config.slack_incoming_hook,
+                    "Data was successfully loaded!"
+                )
+    except Exception:
+        # we get here after all the failed retries
+        # send notification
+        send_slack_message(
+            pipeline.runtime_config.slack_incoming_hook,
+            "Something went wrong!"
+        )
+        raise
+
+    # we get here after a successful attempt
+    # see when load was started
+    logger.info(f"Pipeline was started: {load_info.started_at}")
+    # print the information on the first load package and all jobs inside
+    logger.info(f"First load package info: {load_info.load_packages[0]}")
+    # print the information on the first completed job in first load package
+    logger.info(
+        f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
+    )
+
+    # check for schema updates:
+    schema_updates = [p.schema_update for p in load_info.load_packages]
+    # send notifications if there are schema updates
+    if schema_updates:
+        # send notification
+        send_slack_message(
+            pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
+        )
+
+    # To run simple tests with `sql_client`, such as checking table counts and
+    # warning if there is no data, you can use the `execute_query` method
+    with pipeline.sql_client() as client:
+        with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
+            count = cursor.fetchone()[0]
+            if count == 0:
+                logger.info("Warning: No data in players table")
+            else:
+                logger.info(f"Players table contains {count} rows")
+
+    # To run simple tests with `normalize_info`, such as checking table counts and
+    # warning if there is no data, you can use the `row_counts` attribute.
+    normalize_info = pipeline.last_trace.last_normalize_info
+    count = normalize_info.row_counts.get("players", 0)
+    if count == 0:
+        logger.info("Warning: No data in players table")
+    else:
+        logger.info(f"Players table contains {count} rows")
+
+    # we reuse the pipeline instance below and load to the same dataset as the data
+    logger.info("Saving the load info in the destination")
+    pipeline.run([load_info], table_name="_load_info")
+    # save trace to destination, sensitive data will be removed
+    logger.info("Saving the trace in the destination")
+    pipeline.run([pipeline.last_trace], table_name="_trace")
+
+    # print all the new tables/columns in the schema update
+    for package in load_info.load_packages:
+        for table_name, table in package.schema_update.items():
+            logger.info(f"Table {table_name}: {table.get('description')}")
+            for column_name, column in table["columns"].items():
+                logger.info(f"\tcolumn {column_name}: {column['data_type']}")
+
+    # save the new tables and column schemas to the destination:
+    table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
+    pipeline.run(table_updates, table_name="_new_tables")
+
+    return load_info
+```
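+
+If you prefer a decorator to the explicit `Retrying` loop, the same policy can be attached with
+tenacity's `retry` decorator — a minimal sketch under the same assumptions, where the helper name
+`load` is ours and notifications are left out for brevity:
+
+```py
+from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential
+
+from dlt.pipeline.helpers import retry_load
+
+@retry(
+    stop=stop_after_attempt(5),
+    wait=wait_exponential(multiplier=1.5, min=4, max=10),
+    retry=retry_if_exception(retry_load(())),
+    reraise=True,
+)
+def load(pipeline, data):
+    # pipeline.run is retried with exponential backoff until it succeeds
+    # or the fifth attempt fails, mirroring the context-manager loop above
+    return pipeline.run(data)
+```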
+
+### Run the pipeline
+
+```py
+if __name__ == "__main__":
+    # create dlt pipeline
+    pipeline = dlt.pipeline(
+        pipeline_name="chess_pipeline",
+        destination="duckdb",
+        dataset_name="chess_data",
+    )
+    # get data for a few famous players
+    data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
+    load_data_with_retry(pipeline, data)
+```
+
\ No newline at end of file
diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js
index efdd04faa0..a82b7acee6 100644
--- a/docs/website/sidebars.js
+++ b/docs/website/sidebars.js
@@ -226,6 +226,7 @@ const sidebars = {
       'examples/transformers/index',
       'examples/incremental_loading/index',
       'examples/connector_x_arrow/index',
+      'examples/chess_production/index',
     ],
   },
   {