diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index 6e794f290d7..da092a324ce 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -217,7 +217,7 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
 
     write_cur = worker_conn.cursor(cursor_factory=DictCursor)
     log.info(f"Cleaning {len(rows)} rows")
-    start_time = time.time()
+    start_time = time.perf_counter()
     cleaned_values = {field: [] for field in all_fields}
     for row in rows:
         source, _id, identifier = row["source"], row["id"], row["identifier"]
@@ -266,7 +266,7 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
     worker_conn.commit()
     write_cur.close()
     worker_conn.close()
-    end_time = time.time()
+    end_time = time.perf_counter()
     total_time = end_time - start_time
     log.info(f"Worker finished batch in {total_time}")
     return cleaned_values
@@ -274,7 +274,7 @@
 
 def save_cleaned_data(result: dict) -> dict[str, int]:
     log.info("Saving cleaned data...")
-    start_time = time.time()
+    start_time = time.perf_counter()
 
     cleanup_counts = {field: len(items) for field, items in result.items()}
     for field, cleaned_items in result.items():
@@ -286,9 +286,9 @@ def save_cleaned_data(result: dict) -> dict[str, int]:
             csv_writer = csv.writer(f, delimiter="\t")
             csv_writer.writerows(cleaned_items)
 
-    end_time = time.time()
+    end_time = time.perf_counter()
     total_time = end_time - start_time
-    log.info(f"Finished saving cleaned data in {total_time},\n{cleanup_counts}")
+    log.info(f"Finished saving cleaned data in {total_time:.3f},\n{cleanup_counts}")
     return cleanup_counts
 
 
@@ -303,7 +303,7 @@ def clean_image_data(table):
     # Map each table to the fields that need to be cleaned up. Then, map each
     # field to its cleanup function.
     log.info("Cleaning up data...")
-    start_time = time.time()
+    start_time = time.perf_counter()
     table_config = _cleanup_config["tables"][table]
 
     # Pull data from selected sources only.
@@ -341,7 +341,7 @@
 
     while batch:
         # Divide updates into jobs for parallel execution.
-        batch_start_time = time.time()
+        batch_start_time = time.perf_counter()
         temp_table = f"temp_import_{table}"
         job_size = int(len(batch) / num_workers)
         last_end = -1
@@ -371,7 +371,7 @@
         pool.join()
 
         num_cleaned += len(batch)
-        batch_end_time = time.time()
+        batch_end_time = time.perf_counter()
         rate = len(batch) / (batch_end_time - batch_start_time)
         log.info(
             f"Batch finished, records/s: cleanup_rate={rate}, "
@@ -383,9 +383,9 @@
     conn.commit()
     iter_cur.close()
     conn.close()
-    end_time = time.time()
+    end_time = time.perf_counter()
     cleanup_time = end_time - start_time
     log.info(
-        f"Cleaned all records in {cleanup_time} seconds,"
+        f"Cleaned all records in {cleanup_time:.3f} seconds,"
         f"counts: {cleaned_counts_by_field}"
     )
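
Note on the change: `time.perf_counter()` reads a monotonic, high-resolution clock, whereas `time.time()` reads the wall clock, which NTP syncs or manual clock adjustments can move (even backwards) in the middle of a measurement. Below is a minimal standalone sketch of the timing pattern this patch applies; it is illustrative only (the `time.sleep` stands in for a batch of cleanup work, and none of these names come from the repository):

```python
import time

start = time.perf_counter()  # monotonic: unaffected by wall-clock adjustments
time.sleep(0.25)             # placeholder for one batch of cleanup work
elapsed = time.perf_counter() - start

# The {:.3f} spec mirrors the patched log lines: elapsed seconds are
# rounded to milliseconds instead of printing full float precision.
print(f"finished batch in {elapsed:.3f} seconds")
```

One caveat of this approach: `perf_counter()` values are only meaningful as differences within a single process, which holds here since every start/end pair is taken inside the same function call.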