diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index bd36d120f51..b2faeb0a701 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -13,8 +13,10 @@ import uuid
 from urllib.parse import urlparse
 
+import boto3
 import requests as re
 import tldextract
+from decouple import config
 from psycopg2.extras import DictCursor, Json
 
 from ingestion_server.db_helpers import database_connect
 
@@ -207,7 +209,7 @@ def test_tls_supported(cls, url):
         https = url.replace("http://", "https://")
         try:
             res = re.get(https, timeout=2)
-            log.debug(f"tls_test - {https}:{res.status_code}")
+            log.info(f"tls_test - {https}:{res.status_code}")
             return 200 <= res.status_code < 400
         except re.RequestException:
             return False
@@ -300,6 +302,45 @@ def save_cleaned_data(result: dict) -> dict[str, int]:
     return cleanup_counts
 
 
+def _get_s3_resource():
+    if config("ENVIRONMENT", default="local") == "local":
+        return boto3.resource(
+            "s3",
+            endpoint_url=config("AWS_S3_ENDPOINT", default="http://s3:5000"),
+            aws_access_key_id=config("AWS_ACCESS_KEY_ID", default="test_key"),
+            aws_secret_access_key=config(
+                "AWS_SECRET_ACCESS_KEY", default="test_secret"
+            ),
+        )
+
+    return boto3.resource("s3", region_name=config("AWS_REGION", default="us-east-1"))
+
+
+def _upload_to_s3(fields):
+    bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
+    s3_path = "shared/data-refresh-cleaned-data"
+    try:
+        s3 = _get_s3_resource()
+        bucket = s3.Bucket(bucket_name)
+        bucket.load()
+        log.info(f"Connected to S3 and '{bucket_name}' bucket loaded.")
+    except Exception as e:
+        log.error(f"Upload failed. Error connecting to S3 or loading bucket: {e}")
+        return
+
+    for field in fields:
+        file_path = TMP_DIR.joinpath(f"{field}.tsv")
+        if not file_path.exists():
+            continue
+
+        try:
+            bucket.upload_file(file_path, f"{s3_path}/{field}.tsv")
+            log.info(f"Uploaded '{field}.tsv' to S3.")
+            file_path.unlink()
+        except Exception as e:
+            log.error(f"Error uploading '{field}.tsv' to S3: {e}")
+
+
 def clean_image_data(table):
     """
     Clean up data loaded from upstream that is unsuitable for prod before going live.
@@ -395,6 +436,7 @@ def clean_image_data(table):
     conn.commit()
     iter_cur.close()
     conn.close()
+    _upload_to_s3(cleanable_fields_for_table)
     end_time = time.perf_counter()
     cleanup_time = end_time - start_time
     log.info(
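
Not part of the patch above: a minimal sketch of how the new upload path could be exercised locally, assuming `ENVIRONMENT=local` (so `_get_s3_resource` points at the `AWS_S3_ENDPOINT` default, e.g. a MinIO container), that `TMP_DIR` is the module-level `pathlib.Path` where cleanup writes its TSVs, and that the `"url"` field name is purely illustrative.

```python
# Hypothetical local sanity check; names below (TMP_DIR, "url") are assumptions
# taken from the diff, not a documented entry point.
from ingestion_server.cleanup import TMP_DIR, _upload_to_s3

# Create a tiny TSV where the cleanup step would normally have written one.
TMP_DIR.mkdir(parents=True, exist_ok=True)
(TMP_DIR / "url.tsv").write_text("identifier\turl\n")

# With the defaults in the diff this uploads to
# s3://openverse-catalog/shared/data-refresh-cleaned-data/url.tsv
# and deletes the local file on success; failures are only logged.
_upload_to_s3(["url"])
```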