Use perf_counter and limit decimals to three digits in time
krysal committed Jun 5, 2024
1 parent c809f35 commit 490c9c7
Showing 1 changed file with 10 additions and 10 deletions.
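For background (not part of the commit itself): time.perf_counter() is intended for measuring elapsed time, since it is monotonic and high-resolution, whereas time.time() follows the system wall clock and can jump when the clock is adjusted. A minimal sketch of the pattern this commit adopts, with a stand-in workload:

    import time

    def ingest():
        # Stand-in for the actual cleanup work being timed.
        return sum(range(1_000_000))

    # time.time() reads the wall clock: fine for timestamps, but NTP or
    # manual clock adjustments can distort a duration computed from two
    # reads. time.perf_counter() is monotonic and high-resolution: its
    # absolute value is meaningless, but differences between two reads
    # are reliable.
    start = time.perf_counter()
    ingest()
    elapsed = time.perf_counter() - start

    print(f"Elapsed: {elapsed:.3f} seconds")  # three decimals, as in this commit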
20 changes: 10 additions & 10 deletions ingestion_server/ingestion_server/cleanup.py
@@ -217,7 +217,7 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
     write_cur = worker_conn.cursor(cursor_factory=DictCursor)
     log.info(f"Cleaning {len(rows)} rows")
 
-    start_time = time.time()
+    start_time = time.perf_counter()
     cleaned_values = {field: [] for field in all_fields}
     for row in rows:
         source, _id, identifier = row["source"], row["id"], row["identifier"]
@@ -266,15 +266,15 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
     worker_conn.commit()
     write_cur.close()
     worker_conn.close()
-    end_time = time.time()
+    end_time = time.perf_counter()
     total_time = end_time - start_time
     log.info(f"Worker finished batch in {total_time}")
     return cleaned_values
 
 
 def save_cleaned_data(result: dict) -> dict[str, int]:
     log.info("Saving cleaned data...")
-    start_time = time.time()
+    start_time = time.perf_counter()
 
     cleanup_counts = {field: len(items) for field, items in result.items()}
     for field, cleaned_items in result.items():
@@ -286,9 +286,9 @@ def save_cleaned_data(result: dict) -> dict[str, int]:
             csv_writer = csv.writer(f, delimiter="\t")
             csv_writer.writerows(cleaned_items)
 
-    end_time = time.time()
+    end_time = time.perf_counter()
     total_time = end_time - start_time
-    log.info(f"Finished saving cleaned data in {total_time},\n{cleanup_counts}")
+    log.info(f"Finished saving cleaned data in {total_time:.3f},\n{cleanup_counts}")
     return cleanup_counts
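The :.3f added in the line above is a plain Python format specifier: it rounds the float to three decimal places instead of printing its full repr, keeping log lines readable. A quick illustration:

    total_time = 12.3456789
    print(f"{total_time}")      # 12.3456789 (full repr, noisy in logs)
    print(f"{total_time:.3f}")  # 12.346 (rounded to three decimal places)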


@@ -303,7 +303,7 @@ def clean_image_data(table):
     # Map each table to the fields that need to be cleaned up. Then, map each
     # field to its cleanup function.
     log.info("Cleaning up data...")
-    start_time = time.time()
+    start_time = time.perf_counter()
     table_config = _cleanup_config["tables"][table]
 
     # Pull data from selected sources only.
@@ -341,7 +341,7 @@ def clean_image_data(table):
 
     while batch:
         # Divide updates into jobs for parallel execution.
-        batch_start_time = time.time()
+        batch_start_time = time.perf_counter()
         temp_table = f"temp_import_{table}"
         job_size = int(len(batch) / num_workers)
         last_end = -1
@@ -371,7 +371,7 @@ def clean_image_data(table):
         pool.join()
 
         num_cleaned += len(batch)
-        batch_end_time = time.time()
+        batch_end_time = time.perf_counter()
         rate = len(batch) / (batch_end_time - batch_start_time)
         log.info(
             f"Batch finished, records/s: cleanup_rate={rate}, "
@@ -383,9 +383,9 @@ def clean_image_data(table):
     conn.commit()
     iter_cur.close()
     conn.close()
-    end_time = time.time()
+    end_time = time.perf_counter()
     cleanup_time = end_time - start_time
     log.info(
-        f"Cleaned all records in {cleanup_time} seconds,"
+        f"Cleaned all records in {cleanup_time:.3f} seconds,"
         f"counts: {cleaned_counts_by_field}"
     )
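The same start/end pairing now appears in several functions; a context manager could centralize both the perf_counter calls and the :.3f formatting. Purely a hypothetical sketch, not part of this commit or the surrounding codebase:

    import logging
    import time
    from contextlib import contextmanager

    log = logging.getLogger(__name__)

    @contextmanager
    def timed(message: str):
        # Hypothetical helper: times the wrapped block with perf_counter
        # and logs the duration to three decimal places on exit.
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed = time.perf_counter() - start
            log.info(f"{message} in {elapsed:.3f} seconds")

    # Usage sketch:
    # with timed("Cleaned all records"):
    #     clean_image_data(table)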
