From a71fa2135c62787d11c37768c948c600a5d5947d Mon Sep 17 00:00:00 2001 From: Luis Arias Date: Mon, 13 May 2024 14:26:17 +0200 Subject: [PATCH] Add latest and missing mode to dataset update cli --- observatoire/tmdb/movies/__main__.py | 46 +++++++++++++++++++++------- observatoire/tmdb/series/__main__.py | 46 +++++++++++++++++++++------- observatoire/tmdb/series/data.py | 7 +++-- observatoire/tmdb/types.py | 3 ++ 4 files changed, 77 insertions(+), 25 deletions(-) create mode 100644 observatoire/tmdb/types.py diff --git a/observatoire/tmdb/movies/__main__.py b/observatoire/tmdb/movies/__main__.py index 7aaf525..f098033 100644 --- a/observatoire/tmdb/movies/__main__.py +++ b/observatoire/tmdb/movies/__main__.py @@ -2,6 +2,7 @@ import contextlib import os +import fire from tqdm import tqdm from observatoire.tmdb.config import HF_MOVIES_DATASET, TMDB_BATCH_SIZE @@ -10,24 +11,47 @@ from observatoire.tmdb.logger import setup_logger from observatoire.tmdb.movies.data import make_movie_df from observatoire.tmdb.movies.tmdb import get_latest_movie_id, get_movie_data +from observatoire.tmdb.types import Mode -def executor() -> None: +def executor(mode: Mode = "latest") -> None: logger = setup_logger() - logger.info("Starting Executor") - - # first, lets get the id of the lastest movies - latest_id = get_latest_movie_id() + logger.info("Starting Movie Dataset Update") # second, let's get the last movie from our last run df_current = load_dataset(HF_MOVIES_DATASET) current_id = df_current["id"].max() if df_current is not None else None - # Generate a list of movie IDs - movie_ids_list = list(range(current_id or 1, latest_id)) - total_movies_to_process = len(movie_ids_list) + if mode == "missing": + logger.info("Mode: Update Missing Movies") + + # Create a set of all possible ids + all_ids = set(range(1, current_id + 1)) + + # Create a set of ids present in df_current + current_ids = set(df_current["id"].dropna()) + + # Find the difference between all_ids and current_ids + missing_ids = all_ids - current_ids + + # Convert the set to a list + movie_ids_list = list(missing_ids) + + logger.info(f"Total Missing Movies: {len(movie_ids_list)}") + elif mode == "latest": + logger.info("Mode: Update Latest MOvies") + + # first, lets get the id of the lastest movie + latest_id = get_latest_movie_id() + + # Generate a list of movie IDs + movie_ids_list = list(range(current_id or 1, latest_id)) + + logger.info(f"Total Latest Movies: {len(movie_ids_list)}") - logger.info(f"Total Movies to Process in this run: {total_movies_to_process}") + else: + logger.critical("Invalid mode selected") + return # Split movie_ids_list into chunks of TMDB_BATCH_SIZE batches = [ @@ -35,7 +59,7 @@ def executor() -> None: for i in range(0, len(movie_ids_list), TMDB_BATCH_SIZE) ] - with tqdm(total=total_movies_to_process, unit=" movies") as pbar: + with tqdm(total=len(movie_ids_list), unit=" movies") as pbar: for batch in batches: logger.info(f"Processing batch of {len(batch)} movies") @@ -75,4 +99,4 @@ def executor() -> None: if __name__ == "__main__": - executor() + fire.Fire(executor) diff --git a/observatoire/tmdb/series/__main__.py b/observatoire/tmdb/series/__main__.py index f7a5207..0ab2f7b 100644 --- a/observatoire/tmdb/series/__main__.py +++ b/observatoire/tmdb/series/__main__.py @@ -2,6 +2,7 @@ import contextlib import os +import fire from tqdm import tqdm from observatoire.tmdb.config import HF_SERIES_DATASET, TMDB_BATCH_SIZE @@ -10,24 +11,47 @@ from observatoire.tmdb.logger import setup_logger from observatoire.tmdb.series.data import make_series_df from observatoire.tmdb.series.tmdb import get_latest_series_id, get_series_data +from observatoire.tmdb.types import Mode -def executor() -> None: +def executor(mode: Mode = "latest") -> None: logger = setup_logger() - logger.info("Starting Executor") - - # first, lets get the id of the lastest series - latest_id = get_latest_series_id() + logger.info("Starting Series Dataset Update") # second, let's get the last series from our last run df_current = load_dataset(HF_SERIES_DATASET) current_id = df_current["id"].max() if df_current is not None else None - # Generate a list of series IDs - series_ids_list = list(range(current_id or 1, latest_id)) - total_series_to_process = len(series_ids_list) + if mode == "missing": + logger.info("Mode: Update Missing Series") + + # Create a set of all possible ids + all_ids = set(range(1, current_id + 1)) + + # Create a set of ids present in df_current + current_ids = set(df_current["id"].dropna()) + + # Find the difference between all_ids and current_ids + missing_ids = all_ids - current_ids + + # Convert the set to a list + series_ids_list = list(missing_ids) + + logger.info(f"Total Missing Series: {len(series_ids_list)}") + elif mode == "latest": + logger.info("Mode: Update Latest Series") + + # first, lets get the id of the lastest series + latest_id = get_latest_series_id() + + # Generate a list of series IDs + series_ids_list = list(range(current_id or 1, latest_id)) + + logger.info(f"Total Latest Series: {len(series_ids_list)}") - logger.info(f"Total Series to Process in this run: {total_series_to_process}") + else: + logger.critical("Invalid mode selected") + return # Split series_ids_list into chunks of TMDB_BATCH_SIZE batches = [ @@ -35,7 +59,7 @@ def executor() -> None: for i in range(0, len(series_ids_list), TMDB_BATCH_SIZE) ] - with tqdm(total=total_series_to_process, unit=" series") as pbar: + with tqdm(total=len(series_ids_list), unit=" series") as pbar: for batch in batches: logger.info(f"Processing batch of {len(batch)} series") @@ -75,4 +99,4 @@ def executor() -> None: if __name__ == "__main__": - executor() + fire.Fire(executor) diff --git a/observatoire/tmdb/series/data.py b/observatoire/tmdb/series/data.py index a1af015..c074234 100644 --- a/observatoire/tmdb/series/data.py +++ b/observatoire/tmdb/series/data.py @@ -1,5 +1,4 @@ import json -import sys import pandas as pd @@ -12,12 +11,14 @@ safe_list, safe_str, ) +from observatoire.tmdb.logger import setup_logger def make_series_df(series_json: list[str]) -> pd.DataFrame: """ Transforms the JSON data into a DataFrame """ + logger = setup_logger() data = [] unique_ids = set() @@ -82,8 +83,8 @@ def make_series_df(series_json: list[str]) -> pd.DataFrame: safe_data["vote_average"] = safe_float(line_in_json, "vote_average") safe_data["vote_count"] = safe_int(line_in_json, "vote_count") - except Exception as e: - print(f"Error: {e}", file=sys.stderr) + except Exception: + logger.exception(f"Could not parse line: {line_in_json}") continue # remove and newline chracters diff --git a/observatoire/tmdb/types.py b/observatoire/tmdb/types.py new file mode 100644 index 0000000..d53a4f6 --- /dev/null +++ b/observatoire/tmdb/types.py @@ -0,0 +1,3 @@ +from typing import Literal + +Mode = Literal["latest", "missing"]