diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index d9ca160b4ab..da092a324ce 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -7,16 +7,12 @@
 import csv
 import logging as log
 import multiprocessing
-import os
 import time
 import uuid
-from dataclasses import dataclass
 from urllib.parse import urlparse
 
-import boto3
 import requests as re
 import tldextract
-from decouple import config
 from psycopg2.extras import DictCursor, Json
 
 from ingestion_server.db_helpers import database_connect
@@ -257,7 +253,6 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
                 # Save cleaned values for later
                 # (except for tags, which take up too much space)
                 if field == "tags":
-                    log.debug("Skipping tags.")
                     continue
                 cleaned_values[field].append((identifier, clean_value))
 
@@ -277,97 +272,24 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
     return cleaned_values
 
 
-@dataclass
-class FieldBuffered:
-    part: int
-    rows: list[tuple[str, str]]
-
-
-class CleanDataUploader:
-    # Number of lines to keep in memory before writing to S3
-    buffer_size: int
-
-    s3_path = "shared/data-refresh-cleaned-data"
-
-    buffer = {
-        field: FieldBuffered(part=1, rows=[])
-        for field in _cleanup_config["tables"]["image"]["sources"]["*"]["fields"]
-    }
-
-    def __init__(self):
-        log.info("Initializing clean data uploader.")
-        self.date = time.strftime("%Y-%m-%d")
-        self.buffer_size = config("CLEANUP_BUFFER_SIZE", default=10_000_000, cast=int)
-        bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
-        try:
-            self.s3 = self._get_s3_resource()
-            self.s3_bucket = self.s3.Bucket(bucket_name)
-            # Try loading the bucket's attributes to check the connection works.
-            self.s3_bucket.load()
-        except Exception as e:
-            log.error(f"Error connecting to S3 or creating bucket: {e}")
-            self.s3 = None
-            self.s3_bucket = None
-        log.info(f"Connected to S3 and `{bucket_name}` bucket loaded.")
-
-    @staticmethod
-    def _get_s3_resource():
-        if config("ENVIRONMENT", default="local") == "local":
-            return boto3.resource(
-                "s3",
-                endpoint_url=config("AWS_S3_ENDPOINT", default="http://s3:5000"),
-                aws_access_key_id=config("AWS_ACCESS_KEY_ID", default="test_key"),
-                aws_secret_access_key=config(
-                    "AWS_SECRET_ACCESS_KEY", default="test_secret"
-                ),
-            )
-
-        return boto3.resource(
-            "s3", region_name=config("AWS_REGION", default="us-east-1")
-        )
-
-    def _upload_to_s3(self, field: str):
-        if not self.s3_bucket:
-            log.warning("No S3 bucket available, skipping upload.")
-            return
-
-        part_number = self.buffer[field].part
-        s3_file_name = f"{self.s3_path}/{self.date}_{field}_{part_number}.tsv"
-        tsv_file = f"{field}.tsv"
-        with open(tsv_file, "w") as f:
-            csv_writer = csv.writer(f, delimiter="\t")
-            csv_writer.writerows(self.buffer[field].rows)
-        file_size = os.path.getsize(tsv_file) / (1024 * 1024)
-        try:
-            log.info(
-                f"Uploading file part {part_number} ({file_size:.4f} MB) of `{field}` "
-                f"with {len(self.buffer[field].rows)} rows to S3..."
-            )
-            self.s3_bucket.upload_file(tsv_file, s3_file_name)
-        except Exception as e:
-            log.error(f"Error uploading {field} to S3: {e}")
-        os.remove(tsv_file)
-        log.info(f"`{tsv_file}` removed locally. Clearing buffer.")
-        self.buffer[field].part += 1
-        self.buffer[field].rows = []
-
-    def save(self, result: dict) -> dict[str, int]:
-        for field, cleaned_items in result.items():
-            if not cleaned_items:
-                continue
-
-            self.buffer[field].rows += cleaned_items
-            if len(self.buffer[field].rows) >= self.buffer_size:
-                self._upload_to_s3(field)
+def save_cleaned_data(result: dict) -> dict[str, int]:
+    log.info("Saving cleaned data...")
+    start_time = time.perf_counter()
 
-        return {field: len(items) for field, items in result.items()}
+    cleanup_counts = {field: len(items) for field, items in result.items()}
+    for field, cleaned_items in result.items():
+        # Skip the tag field because the file is too large and fills up the disk
+        if field == "tag":
+            continue
+        if cleaned_items:
+            with open(f"{field}.tsv", "a") as f:
+                csv_writer = csv.writer(f, delimiter="\t")
+                csv_writer.writerows(cleaned_items)
 
-    def flush(self):
-        log.info("Flushing remaining rows...")
-        for field in self.buffer:
-            if self.buffer[field].rows:
-                self._upload_to_s3(field)
-        log.info("Saved all the cleaned data.")
+    end_time = time.perf_counter()
+    total_time = end_time - start_time
+    log.info(f"Finished saving cleaned data in {total_time:.3f},\n{cleanup_counts}")
+    return cleanup_counts
 
 
 def clean_image_data(table):
@@ -377,7 +299,6 @@ def clean_image_data(table):
     """
     Data cleanup on staging table.
 
     :param table: The staging table for the new data
     :return: None
     """
-    data_uploader = CleanDataUploader()
 
     # Map each table to the fields that need to be cleaned up. Then, map each
     # field to its cleanup function.
@@ -443,7 +364,7 @@ def clean_image_data(table):
         log.info(f"Starting {len(jobs)} cleaning jobs")
 
         for result in pool.starmap(_clean_data_worker, jobs):
-            batch_cleaned_counts = data_uploader.save(result)
+            batch_cleaned_counts = save_cleaned_data(result)
             for field in batch_cleaned_counts:
                 cleaned_counts_by_field[field] += batch_cleaned_counts[field]
         pool.close()
@@ -462,7 +383,6 @@ def clean_image_data(table):
     conn.commit()
     iter_cur.close()
     conn.close()
-    data_uploader.flush()
     end_time = time.perf_counter()
     cleanup_time = end_time - start_time
     log.info(