From 1c1be18c30d4bf1aeb0cb76e6b4141d2d71e87dc Mon Sep 17 00:00:00 2001 From: Krystle Salazar Date: Wed, 5 Jun 2024 14:41:10 -0400 Subject: [PATCH 1/3] Revert "Save cleaned data of Ingestion Server to AWS S3 (#4163)" This reverts commit 43d4d09cb3e9edb66f087e438f6714a0de514bd8. --- ingestion_server/ingestion_server/cleanup.py | 128 ++++-------------- .../test/unit_tests/test_slack.py | 2 +- 2 files changed, 25 insertions(+), 105 deletions(-) diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py index d9ca160b4ab..6e794f290d7 100644 --- a/ingestion_server/ingestion_server/cleanup.py +++ b/ingestion_server/ingestion_server/cleanup.py @@ -7,16 +7,12 @@ import csv import logging as log import multiprocessing -import os import time import uuid -from dataclasses import dataclass from urllib.parse import urlparse -import boto3 import requests as re import tldextract -from decouple import config from psycopg2.extras import DictCursor, Json from ingestion_server.db_helpers import database_connect @@ -221,7 +217,7 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]): write_cur = worker_conn.cursor(cursor_factory=DictCursor) log.info(f"Cleaning {len(rows)} rows") - start_time = time.perf_counter() + start_time = time.time() cleaned_values = {field: [] for field in all_fields} for row in rows: source, _id, identifier = row["source"], row["id"], row["identifier"] @@ -257,7 +253,6 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]): # Save cleaned values for later # (except for tags, which take up too much space) if field == "tags": - log.debug("Skipping tags.") continue cleaned_values[field].append((identifier, clean_value)) @@ -271,103 +266,30 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]): worker_conn.commit() write_cur.close() worker_conn.close() - end_time = time.perf_counter() + end_time = time.time() total_time = end_time - start_time log.info(f"Worker finished batch in {total_time}") return cleaned_values -@dataclass -class FieldBuffered: - part: int - rows: list[tuple[str, str]] +def save_cleaned_data(result: dict) -> dict[str, int]: + log.info("Saving cleaned data...") + start_time = time.time() + cleanup_counts = {field: len(items) for field, items in result.items()} + for field, cleaned_items in result.items(): + # Skip the tag field because the file is too large and fills up the disk + if field == "tag": + continue + if cleaned_items: + with open(f"{field}.tsv", "a") as f: + csv_writer = csv.writer(f, delimiter="\t") + csv_writer.writerows(cleaned_items) -class CleanDataUploader: - # Number of lines to keep in memory before writing to S3 - buffer_size: int - - s3_path = "shared/data-refresh-cleaned-data" - - buffer = { - field: FieldBuffered(part=1, rows=[]) - for field in _cleanup_config["tables"]["image"]["sources"]["*"]["fields"] - } - - def __init__(self): - log.info("Initializing clean data uploader.") - self.date = time.strftime("%Y-%m-%d") - self.buffer_size = config("CLEANUP_BUFFER_SIZE", default=10_000_000, cast=int) - bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog") - try: - self.s3 = self._get_s3_resource() - self.s3_bucket = self.s3.Bucket(bucket_name) - # Try loading the bucket's attributes to check the connection works. 
- self.s3_bucket.load() - except Exception as e: - log.error(f"Error connecting to S3 or creating bucket: {e}") - self.s3 = None - self.s3_bucket = None - log.info(f"Connected to S3 and `{bucket_name}` bucket loaded.") - - @staticmethod - def _get_s3_resource(): - if config("ENVIRONMENT", default="local") == "local": - return boto3.resource( - "s3", - endpoint_url=config("AWS_S3_ENDPOINT", default="http://s3:5000"), - aws_access_key_id=config("AWS_ACCESS_KEY_ID", default="test_key"), - aws_secret_access_key=config( - "AWS_SECRET_ACCESS_KEY", default="test_secret" - ), - ) - - return boto3.resource( - "s3", region_name=config("AWS_REGION", default="us-east-1") - ) - - def _upload_to_s3(self, field: str): - if not self.s3_bucket: - log.warning("No S3 bucket available, skipping upload.") - return - - part_number = self.buffer[field].part - s3_file_name = f"{self.s3_path}/{self.date}_{field}_{part_number}.tsv" - tsv_file = f"{field}.tsv" - with open(tsv_file, "w") as f: - csv_writer = csv.writer(f, delimiter="\t") - csv_writer.writerows(self.buffer[field].rows) - file_size = os.path.getsize(tsv_file) / (1024 * 1024) - try: - log.info( - f"Uploading file part {part_number} ({file_size:.4f} MB) of `{field}` " - f"with {len(self.buffer[field].rows)} rows to S3..." - ) - self.s3_bucket.upload_file(tsv_file, s3_file_name) - except Exception as e: - log.error(f"Error uploading {field} to S3: {e}") - os.remove(tsv_file) - log.info(f"`{tsv_file}` removed locally. Clearing buffer.") - self.buffer[field].part += 1 - self.buffer[field].rows = [] - - def save(self, result: dict) -> dict[str, int]: - for field, cleaned_items in result.items(): - if not cleaned_items: - continue - - self.buffer[field].rows += cleaned_items - if len(self.buffer[field].rows) >= self.buffer_size: - self._upload_to_s3(field) - - return {field: len(items) for field, items in result.items()} - - def flush(self): - log.info("Flushing remaining rows...") - for field in self.buffer: - if self.buffer[field].rows: - self._upload_to_s3(field) - log.info("Saved all the cleaned data.") + end_time = time.time() + total_time = end_time - start_time + log.info(f"Finished saving cleaned data in {total_time},\n{cleanup_counts}") + return cleanup_counts def clean_image_data(table): @@ -377,12 +299,11 @@ def clean_image_data(table): :param table: The staging table for the new data :return: None """ - data_uploader = CleanDataUploader() # Map each table to the fields that need to be cleaned up. Then, map each # field to its cleanup function. log.info("Cleaning up data...") - start_time = time.perf_counter() + start_time = time.time() table_config = _cleanup_config["tables"][table] # Pull data from selected sources only. @@ -420,7 +341,7 @@ def clean_image_data(table): while batch: # Divide updates into jobs for parallel execution. 
- batch_start_time = time.perf_counter() + batch_start_time = time.time() temp_table = f"temp_import_{table}" job_size = int(len(batch) / num_workers) last_end = -1 @@ -443,14 +364,14 @@ def clean_image_data(table): log.info(f"Starting {len(jobs)} cleaning jobs") for result in pool.starmap(_clean_data_worker, jobs): - batch_cleaned_counts = data_uploader.save(result) + batch_cleaned_counts = save_cleaned_data(result) for field in batch_cleaned_counts: cleaned_counts_by_field[field] += batch_cleaned_counts[field] pool.close() pool.join() num_cleaned += len(batch) - batch_end_time = time.perf_counter() + batch_end_time = time.time() rate = len(batch) / (batch_end_time - batch_start_time) log.info( f"Batch finished, records/s: cleanup_rate={rate}, " @@ -462,10 +383,9 @@ def clean_image_data(table): conn.commit() iter_cur.close() conn.close() - data_uploader.flush() - end_time = time.perf_counter() + end_time = time.time() cleanup_time = end_time - start_time log.info( - f"Cleaned all records in {cleanup_time:.3f} seconds," + f"Cleaned all records in {cleanup_time} seconds," f"counts: {cleaned_counts_by_field}" ) diff --git a/ingestion_server/test/unit_tests/test_slack.py b/ingestion_server/test/unit_tests/test_slack.py index fbca6641660..a52970714c2 100644 --- a/ingestion_server/test/unit_tests/test_slack.py +++ b/ingestion_server/test/unit_tests/test_slack.py @@ -32,7 +32,7 @@ "environment", [ # Default environment - "local", + None, # Different, explicit environment "staging", ], From c809f353cfc51ad1a7ce6e21780118155e891d13 Mon Sep 17 00:00:00 2001 From: Krystle Salazar Date: Wed, 5 Jun 2024 14:56:56 -0400 Subject: [PATCH 2/3] Update default environment in test to omit warning --- ingestion_server/test/unit_tests/test_slack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion_server/test/unit_tests/test_slack.py b/ingestion_server/test/unit_tests/test_slack.py index a52970714c2..fbca6641660 100644 --- a/ingestion_server/test/unit_tests/test_slack.py +++ b/ingestion_server/test/unit_tests/test_slack.py @@ -32,7 +32,7 @@ "environment", [ # Default environment - None, + "local", # Different, explicit environment "staging", ], From 490c9c76dedddac6d4b50551cdb1b209a99738f3 Mon Sep 17 00:00:00 2001 From: Krystle Salazar Date: Wed, 5 Jun 2024 15:08:02 -0400 Subject: [PATCH 3/3] Use perf_counter and limit decimals to three digits in time --- ingestion_server/ingestion_server/cleanup.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py index 6e794f290d7..da092a324ce 100644 --- a/ingestion_server/ingestion_server/cleanup.py +++ b/ingestion_server/ingestion_server/cleanup.py @@ -217,7 +217,7 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]): write_cur = worker_conn.cursor(cursor_factory=DictCursor) log.info(f"Cleaning {len(rows)} rows") - start_time = time.time() + start_time = time.perf_counter() cleaned_values = {field: [] for field in all_fields} for row in rows: source, _id, identifier = row["source"], row["id"], row["identifier"] @@ -266,7 +266,7 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]): worker_conn.commit() write_cur.close() worker_conn.close() - end_time = time.time() + end_time = time.perf_counter() total_time = end_time - start_time log.info(f"Worker finished batch in {total_time}") return cleaned_values @@ -274,7 +274,7 @@ def _clean_data_worker(rows, temp_table, 
sources_config, all_fields: list[str]): def save_cleaned_data(result: dict) -> dict[str, int]: log.info("Saving cleaned data...") - start_time = time.time() + start_time = time.perf_counter() cleanup_counts = {field: len(items) for field, items in result.items()} for field, cleaned_items in result.items(): @@ -286,9 +286,9 @@ def save_cleaned_data(result: dict) -> dict[str, int]: csv_writer = csv.writer(f, delimiter="\t") csv_writer.writerows(cleaned_items) - end_time = time.time() + end_time = time.perf_counter() total_time = end_time - start_time - log.info(f"Finished saving cleaned data in {total_time},\n{cleanup_counts}") + log.info(f"Finished saving cleaned data in {total_time:.3f},\n{cleanup_counts}") return cleanup_counts @@ -303,7 +303,7 @@ def clean_image_data(table): # Map each table to the fields that need to be cleaned up. Then, map each # field to its cleanup function. log.info("Cleaning up data...") - start_time = time.time() + start_time = time.perf_counter() table_config = _cleanup_config["tables"][table] # Pull data from selected sources only. @@ -341,7 +341,7 @@ def clean_image_data(table): while batch: # Divide updates into jobs for parallel execution. - batch_start_time = time.time() + batch_start_time = time.perf_counter() temp_table = f"temp_import_{table}" job_size = int(len(batch) / num_workers) last_end = -1 @@ -371,7 +371,7 @@ def clean_image_data(table): pool.join() num_cleaned += len(batch) - batch_end_time = time.time() + batch_end_time = time.perf_counter() rate = len(batch) / (batch_end_time - batch_start_time) log.info( f"Batch finished, records/s: cleanup_rate={rate}, " @@ -383,9 +383,9 @@ def clean_image_data(table): conn.commit() iter_cur.close() conn.close() - end_time = time.time() + end_time = time.perf_counter() cleanup_time = end_time - start_time log.info( - f"Cleaned all records in {cleanup_time} seconds," + f"Cleaned all records in {cleanup_time:.3f} seconds," f"counts: {cleaned_counts_by_field}" )
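
Note: the sketch below is not part of the patches above; it is a minimal, standalone illustration of the timing pattern introduced in PATCH 3/3, namely measuring elapsed time with time.perf_counter() (a monotonic, high-resolution clock unaffected by system clock adjustments, unlike time.time()) and logging the result with three decimal places. The function name and row shape are hypothetical placeholders, not identifiers from the Openverse codebase.

    import logging as log
    import time

    log.basicConfig(level=log.INFO)


    def timed_cleanup(rows: list[dict]) -> float:
        # Hypothetical stand-in for a cleanup batch; only the timing
        # pattern mirrors the change in PATCH 3/3.
        start_time = time.perf_counter()

        for _row in rows:
            pass  # per-row cleanup work would go here

        elapsed = time.perf_counter() - start_time
        # Three decimal places, as in the updated log messages.
        log.info(f"Cleaned {len(rows)} rows in {elapsed:.3f} seconds")
        return elapsed


    if __name__ == "__main__":
        timed_cleanup([{"identifier": "example"}] * 1_000)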