Commit

delete finally
AstrakhantsevaAA committed Oct 31, 2023
1 parent 5f7d89b commit d74c58e
Showing 3 changed files with 197 additions and 194 deletions.
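The point of the change: code in a `finally:` block runs whether or not the `try` body succeeded, so post-load reporting that reads `load_info` can fail (and replace the exception being propagated) when every retry has already failed, because `load_info` was never assigned. Moving that code after the `try`/`except` means it only runs once the load has succeeded. A minimal standalone sketch of the two shapes, with a placeholder loader instead of the example's real pipeline calls:

def run_load():
    # stand-in for pipeline.run(...); pretend every retry failed
    raise RuntimeError("all retries failed")

def old_shape():
    try:
        load_info = run_load()
    except Exception:
        print("Something went wrong!")
        raise
    finally:
        # runs even on failure: UnboundLocalError here, which hides the original error
        print(f"Pipeline was started: {load_info}")

def new_shape():
    try:
        load_info = run_load()
    except Exception:
        print("Something went wrong!")
        raise
    # we only get here after a successful attempt
    print(f"Pipeline was started: {load_info}")
    return load_info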
127 changes: 64 additions & 63 deletions docs/examples/chess_production/chess.py
@@ -76,13 +76,14 @@ def load_data_with_retry(pipeline, data):
                 )
                 load_info = pipeline.run(data)
                 logger.info(str(load_info))
-                # raise on failed jobs
-                load_info.raise_on_failed_jobs()
-                # send notification
-                send_slack_message(
-                    pipeline.runtime_config.slack_incoming_hook,
-                    "Data was successfully loaded!"
-                )
+
+                # raise on failed jobs
+                load_info.raise_on_failed_jobs()
+                # send notification
+                send_slack_message(
+                    pipeline.runtime_config.slack_incoming_hook,
+                    "Data was successfully loaded!"
+                )
     except Exception:
         # we get here after all the failed retries
         # send notification
@@ -91,64 +92,64 @@ def load_data_with_retry(pipeline, data):
"Something went wrong!"
)
raise
finally:
# we get here after a successful attempt
# see when load was started
logger.info(f"Pipeline was started: {load_info.started_at}")
# print the information on the first load package and all jobs inside
logger.info(f"First load package info: {load_info.load_packages[0]}")
# print the information on the first completed job in first load package
logger.info(
f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
)

# check for schema updates:
schema_updates = [p.schema_update for p in load_info.load_packages]
# send notifications if there are schema updates
if schema_updates:
# send notification
send_slack_message(
pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
)
+    # we get here after a successful attempt
+    # see when load was started
+    logger.info(f"Pipeline was started: {load_info.started_at}")
+    # print the information on the first load package and all jobs inside
+    logger.info(f"First load package info: {load_info.load_packages[0]}")
+    # print the information on the first completed job in first load package
+    logger.info(
+        f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
+    )
+
+    # check for schema updates:
+    schema_updates = [p.schema_update for p in load_info.load_packages]
+    # send notifications if there are schema updates
+    if schema_updates:
+        # send notification
+        send_slack_message(
+            pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
+        )

-        # To run simple tests with `sql_client`, such as checking table counts and
-        # warning if there is no data, you can use the `execute_query` method
-        with pipeline.sql_client() as client:
-            with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
-                count = cursor.fetchone()[0]
-                if count == 0:
-                    logger.info("Warning: No data in players table")
-                else:
-                    logger.info(f"Players table contains {count} rows")
-
-        # To run simple tests with `normalize_info`, such as checking table counts and
-        # warning if there is no data, you can use the `row_counts` attribute.
-        normalize_info = pipeline.last_trace.last_normalize_info
-        count = normalize_info.row_counts.get("players", 0)
-        if count == 0:
-            logger.info("Warning: No data in players table")
-        else:
-            logger.info(f"Players table contains {count} rows")
-
-        # we reuse the pipeline instance below and load to the same dataset as data
-        logger.info("Saving the load info in the destination")
-        pipeline.run([load_info], table_name="_load_info")
-        # save trace to destination, sensitive data will be removed
-        logger.info("Saving the trace in the destination")
-        pipeline.run([pipeline.last_trace], table_name="_trace")
-
-        # print all the new tables/columns in
-        for package in load_info.load_packages:
-            for table_name, table in package.schema_update.items():
-                logger.info(f"Table {table_name}: {table.get('description')}")
-                for column_name, column in table["columns"].items():
-                    logger.info(f"\tcolumn {column_name}: {column['data_type']}")
-
-        # save the new tables and column schemas to the destination:
-        table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
-        pipeline.run(table_updates, table_name="_new_tables")
-
-        return load_info
+    # To run simple tests with `sql_client`, such as checking table counts and
+    # warning if there is no data, you can use the `execute_query` method
+    with pipeline.sql_client() as client:
+        with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
+            count = cursor.fetchone()[0]
+            if count == 0:
+                logger.info("Warning: No data in players table")
+            else:
+                logger.info(f"Players table contains {count} rows")
+
+    # To run simple tests with `normalize_info`, such as checking table counts and
+    # warning if there is no data, you can use the `row_counts` attribute.
+    normalize_info = pipeline.last_trace.last_normalize_info
+    count = normalize_info.row_counts.get("players", 0)
+    if count == 0:
+        logger.info("Warning: No data in players table")
+    else:
+        logger.info(f"Players table contains {count} rows")
+
+    # we reuse the pipeline instance below and load to the same dataset as data
+    logger.info("Saving the load info in the destination")
+    pipeline.run([load_info], table_name="_load_info")
+    # save trace to destination, sensitive data will be removed
+    logger.info("Saving the trace in the destination")
+    pipeline.run([pipeline.last_trace], table_name="_trace")
+
+    # print all the new tables/columns in
+    for package in load_info.load_packages:
+        for table_name, table in package.schema_update.items():
+            logger.info(f"Table {table_name}: {table.get('description')}")
+            for column_name, column in table["columns"].items():
+                logger.info(f"\tcolumn {column_name}: {column['data_type']}")
+
+    # save the new tables and column schemas to the destination:
+    table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
+    pipeline.run(table_updates, table_name="_new_tables")
+
+    return load_info


if __name__ == "__main__":
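For context, the `if __name__ == "__main__":` line above is the start of the example's entry point, which builds a pipeline and hands it to `load_data_with_retry`. A rough sketch of that call site; it is not runnable on its own, since `chess()` and `MAX_PLAYERS` are defined earlier in the example file and their exact signatures are assumed here:

import dlt

if __name__ == "__main__":
    # destination and dataset names are illustrative
    pipeline = dlt.pipeline(
        pipeline_name="chess_pipeline",
        destination="duckdb",
        dataset_name="chess_data",
    )
    # chess() is the example's dlt source; the keyword argument is an assumption
    data = chess(max_players=MAX_PLAYERS)
    load_data_with_retry(pipeline, data)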
137 changes: 69 additions & 68 deletions docs/website/docs/examples/chess_production/code/chess-snippets.py
@@ -87,13 +87,14 @@ def load_data_with_retry(pipeline, data):
                 )
                 load_info = pipeline.run(data)
                 logger.info(str(load_info))
-                # raise on failed jobs
-                load_info.raise_on_failed_jobs()
-                # send notification
-                send_slack_message(
-                    pipeline.runtime_config.slack_incoming_hook,
-                    "Data was successfully loaded!"
-                )
+
+                # raise on failed jobs
+                load_info.raise_on_failed_jobs()
+                # send notification
+                send_slack_message(
+                    pipeline.runtime_config.slack_incoming_hook,
+                    "Data was successfully loaded!"
+                )
     except Exception:
         # we get here after all the failed retries
         # send notification
@@ -102,69 +103,69 @@ def load_data_with_retry(pipeline, data):
"Something went wrong!"
)
raise
finally:
# we get here after a successful attempt
# see when load was started
logger.info(f"Pipeline was started: {load_info.started_at}")
# print the information on the first load package and all jobs inside
logger.info(f"First load package info: {load_info.load_packages[0]}")
# print the information on the first completed job in first load package
logger.info(
f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
)

# check for schema updates:
schema_updates = [p.schema_update for p in load_info.load_packages]
# send notifications if there are schema updates
if schema_updates:
# send notification
send_slack_message(
pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
)
+    # we get here after a successful attempt
+    # see when load was started
+    logger.info(f"Pipeline was started: {load_info.started_at}")
+    # print the information on the first load package and all jobs inside
+    logger.info(f"First load package info: {load_info.load_packages[0]}")
+    # print the information on the first completed job in first load package
+    logger.info(
+        f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
+    )
+
+    # check for schema updates:
+    schema_updates = [p.schema_update for p in load_info.load_packages]
+    # send notifications if there are schema updates
+    if schema_updates:
+        # send notification
+        send_slack_message(
+            pipeline.runtime_config.slack_incoming_hook, "Schema was updated!"
+        )

-        # To run simple tests with `sql_client`, such as checking table counts and
-        # warning if there is no data, you can use the `execute_query` method
-        with pipeline.sql_client() as client:
-            with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
-                count = cursor.fetchone()[0]
-                if count == 0:
-                    logger.info("Warning: No data in players table")
-                else:
-                    logger.info(f"Players table contains {count} rows")
-                assert count == MAX_PLAYERS # @@@DLT_REMOVE
-
-        # To run simple tests with `normalize_info`, such as checking table counts and
-        # warning if there is no data, you can use the `row_counts` attribute.
-        normalize_info = pipeline.last_trace.last_normalize_info
-        count = normalize_info.row_counts.get("players", 0)
-        if count == 0:
-            logger.info("Warning: No data in players table")
-        else:
-            logger.info(f"Players table contains {count} rows")
-        assert count == MAX_PLAYERS # @@@DLT_REMOVE
-
-        # we reuse the pipeline instance below and load to the same dataset as data
-        logger.info("Saving the load info in the destination")
-        pipeline.run([load_info], table_name="_load_info")
-        assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
-        # save trace to destination, sensitive data will be removed
-        logger.info("Saving the trace in the destination")
-        pipeline.run([pipeline.last_trace], table_name="_trace")
-        assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
-
-        # print all the new tables/columns in
-        for package in load_info.load_packages:
-            for table_name, table in package.schema_update.items():
-                logger.info(f"Table {table_name}: {table.get('description')}")
-                for column_name, column in table["columns"].items():
-                    logger.info(f"\tcolumn {column_name}: {column['data_type']}")
-
-        # save the new tables and column schemas to the destination:
-        table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
-        pipeline.run(table_updates, table_name="_new_tables")
-        assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
-
-        return load_info
+    # To run simple tests with `sql_client`, such as checking table counts and
+    # warning if there is no data, you can use the `execute_query` method
+    with pipeline.sql_client() as client:
+        with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
+            count = cursor.fetchone()[0]
+            if count == 0:
+                logger.info("Warning: No data in players table")
+            else:
+                logger.info(f"Players table contains {count} rows")
+            assert count == MAX_PLAYERS # @@@DLT_REMOVE
+
+    # To run simple tests with `normalize_info`, such as checking table counts and
+    # warning if there is no data, you can use the `row_counts` attribute.
+    normalize_info = pipeline.last_trace.last_normalize_info
+    count = normalize_info.row_counts.get("players", 0)
+    if count == 0:
+        logger.info("Warning: No data in players table")
+    else:
+        logger.info(f"Players table contains {count} rows")
+    assert count == MAX_PLAYERS # @@@DLT_REMOVE
+
+    # we reuse the pipeline instance below and load to the same dataset as data
+    logger.info("Saving the load info in the destination")
+    pipeline.run([load_info], table_name="_load_info")
+    assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
+    # save trace to destination, sensitive data will be removed
+    logger.info("Saving the trace in the destination")
+    pipeline.run([pipeline.last_trace], table_name="_trace")
+    assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
+
+    # print all the new tables/columns in
+    for package in load_info.load_packages:
+        for table_name, table in package.schema_update.items():
+            logger.info(f"Table {table_name}: {table.get('description')}")
+            for column_name, column in table["columns"].items():
+                logger.info(f"\tcolumn {column_name}: {column['data_type']}")
+
+    # save the new tables and column schemas to the destination:
+    table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
+    pipeline.run(table_updates, table_name="_new_tables")
+    assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
+
+    return load_info

# @@@DLT_SNIPPET_END markdown_retry_cm

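These hunks only show the tail of `load_data_with_retry`; the comments about getting there "after all the failed retries" refer to a retry loop wrapped around `pipeline.run()` earlier in the function, which this diff does not display. A self-contained sketch of such a loop using the tenacity library (the stop/wait settings and the decision to retry on any Exception are assumptions, not values taken from this commit):

from tenacity import Retrying, retry_if_exception_type, stop_after_attempt, wait_exponential

def run_with_retry(do_load):
    # call do_load() up to 5 times with exponential backoff; re-raise the last error if all fail
    for attempt in Retrying(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1.5, min=4, max=10),
        retry=retry_if_exception_type(Exception),
        reraise=True,
    ):
        with attempt:
            return do_load()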
