Revert "Save cleaned data of Ingestion Server to AWS S3 (#4163)" #4443

Merged
merged 3 commits into from
Jun 7, 2024
Changes from all commits
114 changes: 17 additions & 97 deletions ingestion_server/ingestion_server/cleanup.py
@@ -7,16 +7,12 @@
import csv
import logging as log
import multiprocessing
import os
import time
import uuid
from dataclasses import dataclass
from urllib.parse import urlparse

import boto3
import requests as re
import tldextract
from decouple import config
from psycopg2.extras import DictCursor, Json

from ingestion_server.db_helpers import database_connect
@@ -257,7 +253,6 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
# Save cleaned values for later
# (except for tags, which take up too much space)
if field == "tags":
log.debug("Skipping tags.")
continue
cleaned_values[field].append((identifier, clean_value))

@@ -277,97 +272,24 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
return cleaned_values


@dataclass
class FieldBuffered:
part: int
rows: list[tuple[str, str]]


class CleanDataUploader:
# Number of lines to keep in memory before writing to S3
buffer_size: int

s3_path = "shared/data-refresh-cleaned-data"

buffer = {
field: FieldBuffered(part=1, rows=[])
for field in _cleanup_config["tables"]["image"]["sources"]["*"]["fields"]
}

def __init__(self):
log.info("Initializing clean data uploader.")
self.date = time.strftime("%Y-%m-%d")
self.buffer_size = config("CLEANUP_BUFFER_SIZE", default=10_000_000, cast=int)
bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
try:
self.s3 = self._get_s3_resource()
self.s3_bucket = self.s3.Bucket(bucket_name)
# Try loading the bucket's attributes to check the connection works.
self.s3_bucket.load()
except Exception as e:
log.error(f"Error connecting to S3 or creating bucket: {e}")
self.s3 = None
self.s3_bucket = None
log.info(f"Connected to S3 and `{bucket_name}` bucket loaded.")

@staticmethod
def _get_s3_resource():
if config("ENVIRONMENT", default="local") == "local":
return boto3.resource(
"s3",
endpoint_url=config("AWS_S3_ENDPOINT", default="http://s3:5000"),
aws_access_key_id=config("AWS_ACCESS_KEY_ID", default="test_key"),
aws_secret_access_key=config(
"AWS_SECRET_ACCESS_KEY", default="test_secret"
),
)

return boto3.resource(
"s3", region_name=config("AWS_REGION", default="us-east-1")
)

def _upload_to_s3(self, field: str):
if not self.s3_bucket:
log.warning("No S3 bucket available, skipping upload.")
return

part_number = self.buffer[field].part
s3_file_name = f"{self.s3_path}/{self.date}_{field}_{part_number}.tsv"
tsv_file = f"{field}.tsv"
with open(tsv_file, "w") as f:
csv_writer = csv.writer(f, delimiter="\t")
csv_writer.writerows(self.buffer[field].rows)
file_size = os.path.getsize(tsv_file) / (1024 * 1024)
try:
log.info(
f"Uploading file part {part_number} ({file_size:.4f} MB) of `{field}` "
f"with {len(self.buffer[field].rows)} rows to S3..."
)
self.s3_bucket.upload_file(tsv_file, s3_file_name)
except Exception as e:
log.error(f"Error uploading {field} to S3: {e}")
os.remove(tsv_file)
log.info(f"`{tsv_file}` removed locally. Clearing buffer.")
self.buffer[field].part += 1
self.buffer[field].rows = []

def save(self, result: dict) -> dict[str, int]:
for field, cleaned_items in result.items():
if not cleaned_items:
continue

self.buffer[field].rows += cleaned_items
if len(self.buffer[field].rows) >= self.buffer_size:
self._upload_to_s3(field)
def save_cleaned_data(result: dict) -> dict[str, int]:
log.info("Saving cleaned data...")
start_time = time.perf_counter()

return {field: len(items) for field, items in result.items()}
cleanup_counts = {field: len(items) for field, items in result.items()}
for field, cleaned_items in result.items():
# Skip the tag field because the file is too large and fills up the disk
if field == "tag":
continue
if cleaned_items:
with open(f"{field}.tsv", "a") as f:
csv_writer = csv.writer(f, delimiter="\t")
csv_writer.writerows(cleaned_items)

def flush(self):
log.info("Flushing remaining rows...")
for field in self.buffer:
if self.buffer[field].rows:
self._upload_to_s3(field)
log.info("Saved all the cleaned data.")
end_time = time.perf_counter()
total_time = end_time - start_time
log.info(f"Finished saving cleaned data in {total_time:.3f},\n{cleanup_counts}")
return cleanup_counts


def clean_image_data(table):
@@ -377,7 +299,6 @@ def clean_image_data(table):
:param table: The staging table for the new data
:return: None
"""
data_uploader = CleanDataUploader()

# Map each table to the fields that need to be cleaned up. Then, map each
# field to its cleanup function.
@@ -443,7 +364,7 @@ def clean_image_data(table):
log.info(f"Starting {len(jobs)} cleaning jobs")

for result in pool.starmap(_clean_data_worker, jobs):
batch_cleaned_counts = data_uploader.save(result)
batch_cleaned_counts = save_cleaned_data(result)
for field in batch_cleaned_counts:
cleaned_counts_by_field[field] += batch_cleaned_counts[field]
pool.close()
@@ -462,7 +383,6 @@ def clean_image_data(table):
conn.commit()
iter_cur.close()
conn.close()
data_uploader.flush()
end_time = time.perf_counter()
cleanup_time = end_time - start_time
log.info(
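For reference, the per-field files written by the restored `save_cleaned_data` are plain tab-separated `(identifier, cleaned_value)` pairs appended to `{field}.tsv`. A minimal sketch of reading one of those files back with the standard `csv` module, assuming that file name and row shape (the helper itself is hypothetical and not part of this PR):

import csv


def iter_cleaned_rows(field: str, path: str | None = None):
    """Yield (identifier, cleaned_value) pairs from a cleaned-data TSV.

    The default `{field}.tsv` path mirrors what `save_cleaned_data` appends to;
    both the default and this helper are assumptions for illustration only.
    """
    path = path or f"{field}.tsv"
    with open(path, newline="") as f:
        for identifier, cleaned_value in csv.reader(f, delimiter="\t"):
            yield identifier, cleaned_value


# Example: count how many `url` values were cleaned in a run.
# cleaned_url_count = sum(1 for _ in iter_cleaned_rows("url"))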