diff --git a/docs/examples/chess_production/chess.py b/docs/examples/chess_production/chess.py
index d49c39fe38..f11fd0ff06 100644
--- a/docs/examples/chess_production/chess.py
+++ b/docs/examples/chess_production/chess.py
@@ -76,13 +76,14 @@ def load_data_with_retry(pipeline, data):
                 )
                 load_info = pipeline.run(data)
                 logger.info(str(load_info))
-                # raise on failed jobs
-                load_info.raise_on_failed_jobs()
-                # send notification
-                send_slack_message(
-                    pipeline.runtime_config.slack_incoming_hook,
-                    "Data was successfully loaded!"
-                )
+
+                # raise on failed jobs
+                load_info.raise_on_failed_jobs()
+                # send notification
+                send_slack_message(
+                    pipeline.runtime_config.slack_incoming_hook,
+                    "Data was successfully loaded!"
+                )
     except Exception:
         # we get here after all the failed retries
         # send notification
@@ -91,64 +92,64 @@ def load_data_with_retry(pipeline, data):
             "Something went wrong!"
         )
         raise
-    finally:
-        # we get here after a successful attempt
-        # see when load was started
-        logger.info(f"Pipeline was started: {load_info.started_at}")
-        # print the information on the first load package and all jobs inside
-        logger.info(f"First load package info: {load_info.load_packages[0]}")
-        # print the information on the first completed job in first load package
-        logger.info(
-            f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
-        )
-        # check for schema updates:
-        schema_updates = [p.schema_update for p in load_info.load_packages]
-        # send notifications if there are schema updates
-        if schema_updates:
-            # send notification
-            send_slack_message(
-                pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
-            )
+    # we get here after a successful attempt
+    # see when load was started
+    logger.info(f"Pipeline was started: {load_info.started_at}")
+    # print the information on the first load package and all jobs inside
+    logger.info(f"First load package info: {load_info.load_packages[0]}")
+    # print the information on the first completed job in first load package
+    logger.info(
+        f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
+    )
+
+    # check for schema updates:
+    schema_updates = [p.schema_update for p in load_info.load_packages]
+    # send notifications if there are schema updates
+    if schema_updates:
+        # send notification
+        send_slack_message(
+            pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
+        )

-        # To run simple tests with `sql_client`, such as checking table counts and
-        # warning if there is no data, you can use the `execute_query` method
-        with pipeline.sql_client() as client:
-            with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
-                count = cursor.fetchone()[0]
-                if count == 0:
-                    logger.info("Warning: No data in players table")
-                else:
-                    logger.info(f"Players table contains {count} rows")
-
-        # To run simple tests with `normalize_info`, such as checking table counts and
-        # warning if there is no data, you can use the `row_counts` attribute.
-        normalize_info = pipeline.last_trace.last_normalize_info
-        count = normalize_info.row_counts.get("players", 0)
-        if count == 0:
-            logger.info("Warning: No data in players table")
-        else:
-            logger.info(f"Players table contains {count} rows")
-
-        # we reuse the pipeline instance below and load to the same dataset as data
-        logger.info("Saving the load info in the destination")
-        pipeline.run([load_info], table_name="_load_info")
-        # save trace to destination, sensitive data will be removed
-        logger.info("Saving the trace in the destination")
-        pipeline.run([pipeline.last_trace], table_name="_trace")
-
-        # print all the new tables/columns in
-        for package in load_info.load_packages:
-            for table_name, table in package.schema_update.items():
-                logger.info(f"Table {table_name}: {table.get('description')}")
-                for column_name, column in table["columns"].items():
-                    logger.info(f"\tcolumn {column_name}: {column['data_type']}")
-
-        # save the new tables and column schemas to the destination:
-        table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
-        pipeline.run(table_updates, table_name="_new_tables")
-
-        return load_info
+    # To run simple tests with `sql_client`, such as checking table counts and
+    # warning if there is no data, you can use the `execute_query` method
+    with pipeline.sql_client() as client:
+        with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
+            count = cursor.fetchone()[0]
+            if count == 0:
+                logger.info("Warning: No data in players table")
+            else:
+                logger.info(f"Players table contains {count} rows")
+
+    # To run simple tests with `normalize_info`, such as checking table counts and
+    # warning if there is no data, you can use the `row_counts` attribute.
+    normalize_info = pipeline.last_trace.last_normalize_info
+    count = normalize_info.row_counts.get("players", 0)
+    if count == 0:
+        logger.info("Warning: No data in players table")
+    else:
+        logger.info(f"Players table contains {count} rows")
+
+    # we reuse the pipeline instance below and load to the same dataset as data
+    logger.info("Saving the load info in the destination")
+    pipeline.run([load_info], table_name="_load_info")
+    # save trace to destination, sensitive data will be removed
+    logger.info("Saving the trace in the destination")
+    pipeline.run([pipeline.last_trace], table_name="_trace")
+
+    # print all the new tables/columns in
+    for package in load_info.load_packages:
+        for table_name, table in package.schema_update.items():
+            logger.info(f"Table {table_name}: {table.get('description')}")
+            for column_name, column in table["columns"].items():
+                logger.info(f"\tcolumn {column_name}: {column['data_type']}")
+
+    # save the new tables and column schemas to the destination:
+    table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
+    pipeline.run(table_updates, table_name="_new_tables")
+
+    return load_info


 if __name__ == "__main__":
diff --git a/docs/website/docs/examples/chess_production/code/chess-snippets.py b/docs/website/docs/examples/chess_production/code/chess-snippets.py
index 96e5f61998..2ecf04bac7 100644
--- a/docs/website/docs/examples/chess_production/code/chess-snippets.py
+++ b/docs/website/docs/examples/chess_production/code/chess-snippets.py
@@ -87,13 +87,14 @@ def load_data_with_retry(pipeline, data):
                 )
                 load_info = pipeline.run(data)
                 logger.info(str(load_info))
-                # raise on failed jobs
-                load_info.raise_on_failed_jobs()
-                # send notification
-                send_slack_message(
-                    pipeline.runtime_config.slack_incoming_hook,
-                    "Data was successfully loaded!"
-                )
+
+                # raise on failed jobs
+                load_info.raise_on_failed_jobs()
+                # send notification
+                send_slack_message(
+                    pipeline.runtime_config.slack_incoming_hook,
+                    "Data was successfully loaded!"
+                )
     except Exception:
         # we get here after all the failed retries
         # send notification
@@ -102,69 +103,69 @@ def load_data_with_retry(pipeline, data):
             "Something went wrong!"
        )
         raise
-    finally:
-        # we get here after a successful attempt
-        # see when load was started
-        logger.info(f"Pipeline was started: {load_info.started_at}")
-        # print the information on the first load package and all jobs inside
-        logger.info(f"First load package info: {load_info.load_packages[0]}")
-        # print the information on the first completed job in first load package
-        logger.info(
-            f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
-        )
-        # check for schema updates:
-        schema_updates = [p.schema_update for p in load_info.load_packages]
-        # send notifications if there are schema updates
-        if schema_updates:
-            # send notification
-            send_slack_message(
-                pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
-            )
+    # we get here after a successful attempt
+    # see when load was started
+    logger.info(f"Pipeline was started: {load_info.started_at}")
+    # print the information on the first load package and all jobs inside
+    logger.info(f"First load package info: {load_info.load_packages[0]}")
+    # print the information on the first completed job in first load package
+    logger.info(
+        f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
+    )
+
+    # check for schema updates:
+    schema_updates = [p.schema_update for p in load_info.load_packages]
+    # send notifications if there are schema updates
+    if schema_updates:
+        # send notification
+        send_slack_message(
+            pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
+        )

-        # To run simple tests with `sql_client`, such as checking table counts and
-        # warning if there is no data, you can use the `execute_query` method
-        with pipeline.sql_client() as client:
-            with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
-                count = cursor.fetchone()[0]
-                if count == 0:
-                    logger.info("Warning: No data in players table")
-                else:
-                    logger.info(f"Players table contains {count} rows")
-                assert count == MAX_PLAYERS  # @@@DLT_REMOVE
-
-        # To run simple tests with `normalize_info`, such as checking table counts and
-        # warning if there is no data, you can use the `row_counts` attribute.
-        normalize_info = pipeline.last_trace.last_normalize_info
-        count = normalize_info.row_counts.get("players", 0)
-        if count == 0:
-            logger.info("Warning: No data in players table")
-        else:
-            logger.info(f"Players table contains {count} rows")
-        assert count == MAX_PLAYERS  # @@@DLT_REMOVE
-
-        # we reuse the pipeline instance below and load to the same dataset as data
-        logger.info("Saving the load info in the destination")
-        pipeline.run([load_info], table_name="_load_info")
-        assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
-        # save trace to destination, sensitive data will be removed
-        logger.info("Saving the trace in the destination")
-        pipeline.run([pipeline.last_trace], table_name="_trace")
-        assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
-
-        # print all the new tables/columns in
-        for package in load_info.load_packages:
-            for table_name, table in package.schema_update.items():
-                logger.info(f"Table {table_name}: {table.get('description')}")
-                for column_name, column in table["columns"].items():
-                    logger.info(f"\tcolumn {column_name}: {column['data_type']}")
-
-        # save the new tables and column schemas to the destination:
-        table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
-        pipeline.run(table_updates, table_name="_new_tables")
-        assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
-
-        return load_info
+    # To run simple tests with `sql_client`, such as checking table counts and
+    # warning if there is no data, you can use the `execute_query` method
+    with pipeline.sql_client() as client:
+        with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
+            count = cursor.fetchone()[0]
+            if count == 0:
+                logger.info("Warning: No data in players table")
+            else:
+                logger.info(f"Players table contains {count} rows")
+            assert count == MAX_PLAYERS  # @@@DLT_REMOVE
+
+    # To run simple tests with `normalize_info`, such as checking table counts and
+    # warning if there is no data, you can use the `row_counts` attribute.
+    normalize_info = pipeline.last_trace.last_normalize_info
+    count = normalize_info.row_counts.get("players", 0)
+    if count == 0:
+        logger.info("Warning: No data in players table")
+    else:
+        logger.info(f"Players table contains {count} rows")
+    assert count == MAX_PLAYERS  # @@@DLT_REMOVE
+
+    # we reuse the pipeline instance below and load to the same dataset as data
+    logger.info("Saving the load info in the destination")
+    pipeline.run([load_info], table_name="_load_info")
+    assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
+    # save trace to destination, sensitive data will be removed
+    logger.info("Saving the trace in the destination")
+    pipeline.run([pipeline.last_trace], table_name="_trace")
+    assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
+
+    # print all the new tables/columns in
+    for package in load_info.load_packages:
+        for table_name, table in package.schema_update.items():
+            logger.info(f"Table {table_name}: {table.get('description')}")
+            for column_name, column in table["columns"].items():
+                logger.info(f"\tcolumn {column_name}: {column['data_type']}")
+
+    # save the new tables and column schemas to the destination:
+    table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
+    pipeline.run(table_updates, table_name="_new_tables")
+    assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts  # @@@DLT_REMOVE
+
+    return load_info

 # @@@DLT_SNIPPET_END markdown_retry_cm
diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md
index cb10ed189e..81524f7ee2 100644
--- a/docs/website/docs/examples/chess_production/index.md
+++ b/docs/website/docs/examples/chess_production/index.md
@@ -111,13 +111,14 @@ def load_data_with_retry(pipeline, data):
                 )
                 load_info = pipeline.run(data)
                 logger.info(str(load_info))
-                # raise on failed jobs
-                load_info.raise_on_failed_jobs()
-                # send notification
-                send_slack_message(
-                    pipeline.runtime_config.slack_incoming_hook,
-                    "Data was successfully loaded!"
-                )
+
+                # raise on failed jobs
+                load_info.raise_on_failed_jobs()
+                # send notification
+                send_slack_message(
+                    pipeline.runtime_config.slack_incoming_hook,
+                    "Data was successfully loaded!"
+                )
     except Exception:
         # we get here after all the failed retries
         # send notification
@@ -126,64 +127,64 @@ def load_data_with_retry(pipeline, data):
             "Something went wrong!"
         )
         raise
-    finally:
-        # we get here after a successful attempt
-        # see when load was started
-        logger.info(f"Pipeline was started: {load_info.started_at}")
-        # print the information on the first load package and all jobs inside
-        logger.info(f"First load package info: {load_info.load_packages[0]}")
-        # print the information on the first completed job in first load package
-        logger.info(
-            f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
-        )
-        # check for schema updates:
-        schema_updates = [p.schema_update for p in load_info.load_packages]
-        # send notifications if there are schema updates
-        if schema_updates:
-            # send notification
-            send_slack_message(
-                pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
-            )
+    # we get here after a successful attempt
+    # see when load was started
+    logger.info(f"Pipeline was started: {load_info.started_at}")
+    # print the information on the first load package and all jobs inside
+    logger.info(f"First load package info: {load_info.load_packages[0]}")
+    # print the information on the first completed job in first load package
+    logger.info(
+        f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
+    )
+
+    # check for schema updates:
+    schema_updates = [p.schema_update for p in load_info.load_packages]
+    # send notifications if there are schema updates
+    if schema_updates:
+        # send notification
+        send_slack_message(
+            pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
+        )

-        # To run simple tests with `sql_client`, such as checking table counts and
-        # warning if there is no data, you can use the `execute_query` method
-        with pipeline.sql_client() as client:
-            with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
-                count = cursor.fetchone()[0]
-                if count == 0:
-                    logger.info("Warning: No data in players table")
-                else:
-                    logger.info(f"Players table contains {count} rows")
-
-        # To run simple tests with `normalize_info`, such as checking table counts and
-        # warning if there is no data, you can use the `row_counts` attribute.
-        normalize_info = pipeline.last_trace.last_normalize_info
-        count = normalize_info.row_counts.get("players", 0)
-        if count == 0:
-            logger.info("Warning: No data in players table")
-        else:
-            logger.info(f"Players table contains {count} rows")
-
-        # we reuse the pipeline instance below and load to the same dataset as data
-        logger.info("Saving the load info in the destination")
-        pipeline.run([load_info], table_name="_load_info")
-        # save trace to destination, sensitive data will be removed
-        logger.info("Saving the trace in the destination")
-        pipeline.run([pipeline.last_trace], table_name="_trace")
-
-        # print all the new tables/columns in
-        for package in load_info.load_packages:
-            for table_name, table in package.schema_update.items():
-                logger.info(f"Table {table_name}: {table.get('description')}")
-                for column_name, column in table["columns"].items():
-                    logger.info(f"\tcolumn {column_name}: {column['data_type']}")
-
-        # save the new tables and column schemas to the destination:
-        table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
-        pipeline.run(table_updates, table_name="_new_tables")
-
-        return load_info
+    # To run simple tests with `sql_client`, such as checking table counts and
+    # warning if there is no data, you can use the `execute_query` method
+    with pipeline.sql_client() as client:
+        with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
+            count = cursor.fetchone()[0]
+            if count == 0:
+                logger.info("Warning: No data in players table")
+            else:
+                logger.info(f"Players table contains {count} rows")
+
+    # To run simple tests with `normalize_info`, such as checking table counts and
+    # warning if there is no data, you can use the `row_counts` attribute.
+    normalize_info = pipeline.last_trace.last_normalize_info
+    count = normalize_info.row_counts.get("players", 0)
+    if count == 0:
+        logger.info("Warning: No data in players table")
+    else:
+        logger.info(f"Players table contains {count} rows")
+
+    # we reuse the pipeline instance below and load to the same dataset as data
+    logger.info("Saving the load info in the destination")
+    pipeline.run([load_info], table_name="_load_info")
+    # save trace to destination, sensitive data will be removed
+    logger.info("Saving the trace in the destination")
+    pipeline.run([pipeline.last_trace], table_name="_trace")
+
+    # print all the new tables/columns in
+    for package in load_info.load_packages:
+        for table_name, table in package.schema_update.items():
+            logger.info(f"Table {table_name}: {table.get('description')}")
+            for column_name, column in table["columns"].items():
+                logger.info(f"\tcolumn {column_name}: {column['data_type']}")
+
+    # save the new tables and column schemas to the destination:
+    table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
+    pipeline.run(table_updates, table_name="_new_tables")
+
+    return load_info
 ```
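The change is the same in all three files: the post-load reporting and smoke tests move out of the `finally:` clause and into the body of `load_data_with_retry`, after the `try`/`except`. The old placement was a genuine bug, because `finally:` also runs when every retry has failed; at that point `load_info` was never bound, so the block raised an `UnboundLocalError` that masked the original failure, and the `return load_info` at the end of the old `finally:` could cancel the re-raised exception outright. A minimal, self-contained sketch of the two shapes (the `flaky_run` helper is hypothetical and stands in for `pipeline.run(data)` failing on every retry):

```python
def flaky_run():
    # stands in for pipeline.run(data) exhausting its retries
    raise RuntimeError("all retries exhausted")


def old_shape():
    try:
        load_info = flaky_run()
    except Exception:
        # the real function sends a Slack alert here, then re-raises
        raise
    finally:
        # runs on the failure path too: load_info was never bound, so this
        # raises UnboundLocalError and masks the RuntimeError; worse, a
        # `return` in finally silently cancels a propagating exception
        return load_info


def new_shape():
    try:
        load_info = flaky_run()
    except Exception:
        raise
    # we get here only after a successful attempt
    return load_info


for shape in (old_shape, new_shape):
    try:
        shape()
    except Exception as exc:
        print(f"{shape.__name__}: {type(exc).__name__}")
# old_shape: UnboundLocalError  (the real error is masked)
# new_shape: RuntimeError
```

After the move, the reporting block is reachable only on the success path, which is what the comment "we get here after a successful attempt" now correctly describes.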
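The two smoke tests the comments describe can also be tried in isolation. A sketch under assumptions: `dlt` is installed with the `duckdb` extra, and the `players` rows below are toy data rather than the chess API used by the full example:

```python
import dlt

# throwaway pipeline; duckdb writes to a local database file
pipeline = dlt.pipeline(
    pipeline_name="smoke_test",
    destination="duckdb",
    dataset_name="demo",
)
pipeline.run(
    [{"username": "magnuscarlsen"}, {"username": "rpragchess"}],
    table_name="players",
)

# row counts as seen by the normalize stage, as in the diff
count = pipeline.last_trace.last_normalize_info.row_counts.get("players", 0)
print(f"players rows (normalize_info): {count}")

# row counts as stored in the destination, as in the diff
with pipeline.sql_client() as client:
    with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
        print(f"players rows (sql_client): {cursor.fetchone()[0]}")
```

Both checks mirror the diff: `last_normalize_info.row_counts` inspects what the normalize stage produced, while `sql_client` queries the loaded tables themselves.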