diff --git a/quotaclimat/data_processing/mediatree/s3/api_to_s3.py b/quotaclimat/data_processing/mediatree/s3/api_to_s3.py index 1f0390aa..c06f6359 100644 --- a/quotaclimat/data_processing/mediatree/s3/api_to_s3.py +++ b/quotaclimat/data_processing/mediatree/s3/api_to_s3.py @@ -7,7 +7,7 @@ from time import sleep import sys import os -import gc +import gzip from quotaclimat.utils.healthcheck_config import run_health_check_server from quotaclimat.utils.logger import getLogger from quotaclimat.data_processing.mediatree.utils import * @@ -30,7 +30,7 @@ import ray import s3fs import boto3 -from io import StringIO +from io import BytesIO from quotaclimat.utils.sentry import sentry_init logging.getLogger('modin.logger.default').setLevel(logging.ERROR) @@ -62,7 +62,7 @@ def get_bucket_key(date, channel): (year, month, day) = (date.year, date.month, date.day) - return f'year={year}/month={month:02}/day={day:02}/channel={channel}/data.json' + return f'year={year}/month={month:02}/day={day:02}/channel={channel}/data.json.gz' def save_to_s3(df: pd.DataFrame, channel: str, date: pd.Timestamp): logging.info(f"Saving DF with {len(df)} elements to S3 for {date} and channel {channel}") @@ -74,14 +74,11 @@ def save_to_s3(df: pd.DataFrame, channel: str, date: pd.Timestamp): logging.info(f"s3://{BUCKET_NAME}/{object_key}") try: - json_buffer = StringIO() - json_buffer = df.to_json( - None, - index=False, - orient='records', - lines=False - ) - s3_client.put_object(Bucket=BUCKET_NAME, Key=object_key, Body=json_buffer) + json_buffer = BytesIO() + with gzip.GzipFile(fileobj=json_buffer, mode='w') as gz: + df.to_json(gz, orient='records', lines=False) + + s3_client.put_object(Bucket=BUCKET_NAME, Key=object_key, Body=json_buffer.getvalue()) logging.info(f"Uploaded partition: {object_key}") except Exception as e: @@ -117,7 +114,7 @@ async def get_and_save_api_data(exit_event): channel_program_type = str(program.program_type) logging.info(f"Querying API for {channel} - {channel_program} - {channel_program_type} - {start_epoch} - {end_epoch}") df = get_df_api(token, type_sub, start_epoch, channel, end_epoch, channel_program, channel_program_type) - df.drop(columns=['srt'], axis=1, inplace=True) + if(df is not None): df_res = pd.concat([df_res, df ], ignore_index=True) else: diff --git a/test/s3/test_s3.py b/test/s3/test_s3.py index 228f6ec6..8cf0eca5 100644 --- a/test/s3/test_s3.py +++ b/test/s3/test_s3.py @@ -7,11 +7,11 @@ def test_get_bucket_key(): friday_6h26 = 1726719981 date = pd.to_datetime(friday_6h26, unit='s', utc=True) channel = "tf1" - assert get_bucket_key(date, channel) == "year=2024/month=09/day=19/channel=tf1/data.json" + assert get_bucket_key(date, channel) == "year=2024/month=09/day=19/channel=tf1/data.json.gz" def test_get_bucket_key_first_of_the_month(): first_december = 1733040125 date = pd.to_datetime(first_december, unit='s', utc=True) channel = "tf1" - assert get_bucket_key(date, channel) == "year=2024/month=12/day=01/channel=tf1/data.json" \ No newline at end of file + assert get_bucket_key(date, channel) == "year=2024/month=12/day=01/channel=tf1/data.json.gz" \ No newline at end of file