Skip to content

Commit

Permalink
gzip content
Browse files Browse the repository at this point in the history
  • Loading branch information
polomarcus committed Dec 12, 2024
1 parent 709cdae commit 8f9a7bb
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 14 deletions.
21 changes: 9 additions & 12 deletions quotaclimat/data_processing/mediatree/s3/api_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from time import sleep
import sys
import os
import gc
import gzip
from quotaclimat.utils.healthcheck_config import run_health_check_server
from quotaclimat.utils.logger import getLogger
from quotaclimat.data_processing.mediatree.utils import *
Expand All @@ -30,7 +30,7 @@
import ray
import s3fs
import boto3
from io import StringIO
from io import BytesIO

from quotaclimat.utils.sentry import sentry_init
logging.getLogger('modin.logger.default').setLevel(logging.ERROR)
Expand Down Expand Up @@ -62,7 +62,7 @@

def get_bucket_key(date, channel):
(year, month, day) = (date.year, date.month, date.day)
return f'year={year}/month={month:02}/day={day:02}/channel={channel}/data.json'
return f'year={year}/month={month:02}/day={day:02}/channel={channel}/data.json.gz'

def save_to_s3(df: pd.DataFrame, channel: str, date: pd.Timestamp):
logging.info(f"Saving DF with {len(df)} elements to S3 for {date} and channel {channel}")
Expand All @@ -74,14 +74,11 @@ def save_to_s3(df: pd.DataFrame, channel: str, date: pd.Timestamp):
logging.info(f"s3://{BUCKET_NAME}/{object_key}")

try:
json_buffer = StringIO()
json_buffer = df.to_json(
None,
index=False,
orient='records',
lines=False
)
s3_client.put_object(Bucket=BUCKET_NAME, Key=object_key, Body=json_buffer)
json_buffer = BytesIO()
with gzip.GzipFile(fileobj=json_buffer, mode='w') as gz:
df.to_json(gz, orient='records', lines=False)

s3_client.put_object(Bucket=BUCKET_NAME, Key=object_key, Body=json_buffer.getvalue())

logging.info(f"Uploaded partition: {object_key}")
except Exception as e:
Expand Down Expand Up @@ -117,7 +114,7 @@ async def get_and_save_api_data(exit_event):
channel_program_type = str(program.program_type)
logging.info(f"Querying API for {channel} - {channel_program} - {channel_program_type} - {start_epoch} - {end_epoch}")
df = get_df_api(token, type_sub, start_epoch, channel, end_epoch, channel_program, channel_program_type)
df.drop(columns=['srt'], axis=1, inplace=True)

if(df is not None):
df_res = pd.concat([df_res, df ], ignore_index=True)
else:
Expand Down
4 changes: 2 additions & 2 deletions test/s3/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ def test_get_bucket_key():
friday_6h26 = 1726719981
date = pd.to_datetime(friday_6h26, unit='s', utc=True)
channel = "tf1"
assert get_bucket_key(date, channel) == "year=2024/month=09/day=19/channel=tf1/data.json"
assert get_bucket_key(date, channel) == "year=2024/month=09/day=19/channel=tf1/data.json.gz"


def test_get_bucket_key_first_of_the_month():
first_december = 1733040125
date = pd.to_datetime(first_december, unit='s', utc=True)
channel = "tf1"
assert get_bucket_key(date, channel) == "year=2024/month=12/day=01/channel=tf1/data.json"
assert get_bucket_key(date, channel) == "year=2024/month=12/day=01/channel=tf1/data.json.gz"

1 comment on commit 8f9a7bb

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
postgres
   insert_data.py43784%36–38, 56–58, 63
   insert_existing_data_example.py19384%25–27
postgres/schemas
   models.py1681193%137–144, 157, 159–160, 225–226, 240–241
quotaclimat/data_ingestion
   scrap_sitemap.py1341787%27–28, 33–34, 66–71, 95–97, 138–140, 202, 223–228
quotaclimat/data_ingestion/ingest_db
   ingest_sitemap_in_db.py553733%21–42, 45–58, 62–73
quotaclimat/data_ingestion/scrap_html
   scrap_description_article.py36392%19–20, 32
quotaclimat/data_processing/mediatree
   api_import.py21313338%44–48, 53–74, 78–81, 87, 90–132, 138–153, 158, 171–183, 187–193, 206–218, 221–225, 231, 269–270, 273–304, 307–309
   channel_program.py1625765%21–23, 34–36, 53–54, 57–59, 98–99, 108, 124, 175–216
   config.py15287%7, 16
   detect_keywords.py2521694%111–118, 126–127, 271, 341–348, 390
   update_pg_keywords.py674927%15–130, 154, 157, 164–179, 213–250, 257
   utils.py792568%29–53, 56, 65, 86–87, 117–120
quotaclimat/data_processing/mediatree/s3
   api_to_s3.py1216844%68–86, 89–133, 136–162, 165–167
quotaclimat/utils
   healthcheck_config.py291452%22–24, 27–38
   logger.py241154%22–24, 28–37
   sentry.py11282%22–23
TOTAL145545569% 

Tests Skipped Failures Errors Time
104 0 💤 0 ❌ 0 🔥 8m 28s ⏱️

Please sign in to comment.