From 5cd210d241dd7d225320206b81641f36261c0aca Mon Sep 17 00:00:00 2001 From: Blondel MONDESIR Date: Tue, 24 Sep 2024 12:51:43 -0400 Subject: [PATCH 1/6] Create xb_utils.py --- cps/services/xb_utils.py | 150 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 cps/services/xb_utils.py diff --git a/cps/services/xb_utils.py b/cps/services/xb_utils.py new file mode 100644 index 0000000000..1caf346b62 --- /dev/null +++ b/cps/services/xb_utils.py @@ -0,0 +1,150 @@ +import os +import re +import logging +from datetime import datetime +from sqlalchemy.orm import Session +from sqlalchemy import literal +from cps.xb import Media, Caption, Playlists +from cps.subproc_wrapper import process_open + +log = logging.getLogger(__name__) + +class Settings: + LB_WRAPPER = os.getenv('LB_WRAPPER', 'lb-wrapper') + TIMEOUT = 120 # seconds + MAX_VIDEOS_PER_DOWNLOAD = 10 # Will use constants.py for this later + +def format_media_url(media_url): + """Formats the media URL by removing query parameters.""" + return media_url.split("&")[0] if "&" in media_url else media_url + +def format_original_url(original_url): + """Formats the original URL to point to the metadata endpoint.""" + return re.sub(r"/media(?=\?|$)", r"/meta", original_url) + +def execute_subprocess(subprocess_args): + """Executes a subprocess and returns the process handle.""" + try: + p = process_open(subprocess_args, newlines=True) + return p + except Exception as e: + log.error("An error occurred during subprocess execution: %s", e) + raise + +class DatabaseService: + """Service class for database operations.""" + + def __init__(self, session: Session): + self.session = session + + def remove_shorts_from_db(self): + """Deletes media entries where the path contains 'shorts'.""" + log.debug("Removing shorts from the database.") + try: + self.session.query(Media).filter(Media.path.like('%shorts%')).delete(synchronize_session=False) + self.session.commit() + log.info("Shorts removed from 
the database.") + except Exception as e: + self.session.rollback() + log.error("An error occurred while removing shorts from the database: %s", e) + raise + + def fetch_requested_urls(self, unavailable: list): + """Fetches requested URLs from the database.""" + log.debug("Fetching requested URLs from the database.") + try: + query = self.session.query(Media.path, Media.duration, Media.live_status)\ + .filter(Media.path.like('http%'))\ + .filter((Media.error == None) | (Media.error == '')) + rows = query.all() + requested_urls = {} + for path, duration, live_status in rows: + if duration is not None and duration > 0: + requested_urls[path] = {"duration": duration, "live_status": live_status} + else: + unavailable.append(path) + log.info("Fetched %d requested URLs.", len(requested_urls)) + return requested_urls + except Exception as e: + log.error("An error occurred while fetching requested URLs: %s", e) + raise + + def calculate_views_per_day(self, requested_urls: dict): + """Calculates views per day for each requested URL.""" + log.debug("Calculating views per day for requested URLs.") + now = datetime.now() + for requested_url in list(requested_urls.keys()): + try: + media_entry = self.session.query(Media).filter(Media.path == requested_url).first() + if media_entry and media_entry.view_count and media_entry.time_uploaded: + view_count = media_entry.view_count + time_uploaded = datetime.utcfromtimestamp(media_entry.time_uploaded) + days_since_publish = (now - time_uploaded).days or 1 + requested_urls[requested_url]["views_per_day"] = view_count / days_since_publish + else: + # If data is missing, remove the URL from requested_urls + requested_urls.pop(requested_url) + log.warning("Removed URL %s due to missing data.", requested_url) + except Exception as e: + log.error("An error occurred during calculation for %s: %s", requested_url, e) + requested_urls.pop(requested_url) + log.info("Views per day calculated for requested URLs.") + + def update_playlist_path(self, 
media_url): + """Updates the playlist path with a timestamp.""" + log.debug("Updating playlist path for URL: %s", media_url) + try: + playlist = self.session.query(Playlists).filter(Playlists.path == media_url).first() + if playlist: + playlist.path = f"{media_url}&timestamp={int(datetime.now().timestamp())}" + self.session.commit() + log.info("Playlist path updated for %s.", media_url) + else: + log.error("No playlist found with path %s", media_url) + except Exception as e: + self.session.rollback() + log.error("An error occurred while updating the playlist path: %s", e) + raise + + def get_extractor_id(self, media_url): + """Gets the extractor ID for the given media URL.""" + log.debug("Getting extractor ID for URL: %s", media_url) + try: + media_entry = self.session.query(Media).filter( + literal(media_url).like('%' + Media.extractor_id + '%') + ).first() + if media_entry: + log.info("Extractor ID found: %s", media_entry.extractor_id) + return media_entry.extractor_id + else: + log.error("Extractor ID not found for URL: %s", media_url) + return None + except Exception as e: + log.error("An error occurred while getting extractor ID: %s", e) + raise + + def read_error_from_database(self, media_url): + """Reads the error message from the database.""" + log.debug("Reading error message from the database for URL: %s", media_url) + try: + error_entry = self.session.query(Media.error).filter(Media.webpath == media_url).first() + if error_entry and error_entry.error: + return error_entry.error + else: + return "No error message found in database" + except Exception as e: + log.error("An error occurred while reading error from the database: %s", e) + return f"An error occurred while reading error from the database: {e}" + + def delete_media_and_captions(self, media_id, media_url): + """Deletes media and captions entries for a given media ID.""" + log.debug("Deleting media and captions entries for media ID: %s", media_id) + try: + 
self.session.query(Caption).filter(Caption.media_id == media_id).delete(synchronize_session=False) + self.session.query(Media).filter(Media.webpath == media_url).delete(synchronize_session=False) + self.session.commit() + log.info("Deleted media and caption entries for media ID: %s", media_id) + except Exception as e: + self.session.rollback() + log.error("An error occurred while deleting media and captions: %s", e) + raise From 80554ab5437bcbce14cce69f7bc69d84ca5680a9 Mon Sep 17 00:00:00 2001 From: Blondel MONDESIR Date: Tue, 24 Sep 2024 22:14:06 -0400 Subject: [PATCH 2/6] Use the right session --- cps/services/xb_utils.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cps/services/xb_utils.py b/cps/services/xb_utils.py index 1caf346b62..68b8ff2d61 100644 --- a/cps/services/xb_utils.py +++ b/cps/services/xb_utils.py @@ -1,18 +1,18 @@ import os import re -import logging from datetime import datetime from sqlalchemy.orm import Session from sqlalchemy import literal -from cps.xb import Media, Caption, Playlists +from cps.xb import XKLBDB, Media, Caption, Playlists from cps.subproc_wrapper import process_open +from cps import logger -log = logging.getLogger(__name__) +log = logger.create() class Settings: LB_WRAPPER = os.getenv('LB_WRAPPER', 'lb-wrapper') TIMEOUT = 120 # seconds - MAX_VIDEOS_PER_DOWNLOAD = 10 # Will use constants.py for this later + MAX_VIDEOS_PER_DOWNLOAD = 10 def format_media_url(media_url): """Formats the media URL by removing query parameters.""" @@ -35,7 +35,8 @@ class DatabaseService: """Service class for database operations.""" def __init__(self, session: Session): - self.session = session + db = XKLBDB() + self.session = db.get_session() def remove_shorts_from_db(self): """Deletes media entries where the path contains 'shorts'.""" From ded94b266f8ac3a8e53aac79b618fbbf611fd985 Mon Sep 17 00:00:00 2001 From: Blondel MONDESIR Date: Mon, 14 Oct 2024 11:41:30 -0400 Subject: [PATCH 3/6] Add MappingService class --- 
cps/services/xb_utils.py | 49 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/cps/services/xb_utils.py b/cps/services/xb_utils.py index 68b8ff2d61..a1c7baa56d 100644 --- a/cps/services/xb_utils.py +++ b/cps/services/xb_utils.py @@ -4,6 +4,7 @@ from sqlalchemy.orm import Session from sqlalchemy import literal from cps.xb import XKLBDB, Media, Caption, Playlists +from cps.glue_db import GlueDB, MediaBooksMapping from cps.subproc_wrapper import process_open from cps import logger @@ -149,3 +150,51 @@ def delete_media_and_captions(self, media_id, media_url): self.session.rollback() log.error("An error occurred while deleting media and captions: %s", e) raise + +class MappingService: + """Service class for mapping operations.""" + def __init__(self, session: Session): + db = GlueDB() + self.session = db.get_session() + + def add_book_media_mapping(self, media_id, book_id): + """Adds a mapping between the media_id and the book_id.""" + try: + mapping = MediaBooksMapping(media_id=media_id, book_id=book_id) + # to avoid duplicate entries, use the merge method + self.session.merge(mapping) + self.session.commit() + log.info("Mapping added: %s", mapping) + except Exception as e: + self.session.rollback() + log.error("An error occurred while adding mapping: %s", e) + raise + + def get_mapping(self, media_id): + """Gets the mapping for the given media_id.""" + try: + mapping = self.session.query(MediaBooksMapping).filter(MediaBooksMapping.media_id == media_id).first() + if mapping: + log.info("Mapping found: %s", mapping) + return mapping + else: + log.error("No mapping found for media ID: %s", media_id) + return None + except Exception as e: + log.error("An error occurred while getting mapping: %s", e) + raise + + def update_mapping(self, media_id, book_id): + """Updates the mapping for the given media_id.""" + try: + mapping = self.session.query(MediaBooksMapping).filter(MediaBooksMapping.media_id == media_id).first() + if mapping: + 
mapping.book_id = book_id + self.session.commit() + log.info("Mapping updated: %s", mapping) + else: + log.error("No mapping found for media ID: %s", media_id) + except Exception as e: + self.session.rollback() + log.error("An error occurred while updating mapping: %s", e) + raise From fc2d71387bf402a2c2c61d018d121bd5572984b7 Mon Sep 17 00:00:00 2001 From: Blondel MONDESIR Date: Mon, 14 Oct 2024 18:26:15 -0400 Subject: [PATCH 4/6] Add CaptionSearcher to xb_utils.py --- cps/services/xb_utils.py | 70 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/cps/services/xb_utils.py b/cps/services/xb_utils.py index a1c7baa56d..c21a7588f5 100644 --- a/cps/services/xb_utils.py +++ b/cps/services/xb_utils.py @@ -1,6 +1,8 @@ import os import re +from copy import deepcopy from datetime import datetime +from itertools import groupby from sqlalchemy.orm import Session from sqlalchemy import literal from cps.xb import XKLBDB, Media, Caption, Playlists @@ -198,3 +200,71 @@ def update_mapping(self, media_id, book_id): self.session.rollback() log.error("An error occurred while updating mapping: %s", e) raise + +class CaptionSearcher: + def __init__(self): + self.xklb_session = XKLBDB().get_session() + self.glue_session = GlueDB().get_session() + + def _query_database(self, term): + """Executes a query on the xklb database and retrieves book_ids from iiab-glue.db.""" + captions = self.xklb_session.query( + Caption.media_id, + Caption.text, + Caption.time + ).filter(Caption.text.like(f'%{term}%')).all() + + media_ids = [caption[0] for caption in captions] + + # Get corresponding book_ids from the glue database + mappings = self.glue_session.query(MediaBooksMapping).filter( + MediaBooksMapping.media_id.in_(media_ids) + ).all() + media_id_to_book_id = {mapping.media_id: mapping.book_id for mapping in mappings} + + # Combine captions with book_ids + captions_list = [] + for caption in captions: + media_id = caption[0] + book_id = 
media_id_to_book_id.get(media_id) + if book_id: + captions_list.append({ + 'book_id': book_id, + 'text': caption[1], + 'time': caption[2] + }) + + return captions_list + + def _merge_captions(self, captions): + """Merges overlapping captions for the same book_id.""" + def get_end(caption): + return caption["time"] + (len(caption["text"]) / 4.2 / 220 * 60) + + merged_captions = [] + for book_id, group in groupby(captions, key=lambda x: x["book_id"]): + group = list(group) + merged_group = deepcopy(group[0]) + merged_group["end"] = get_end(group[0]) + for i in range(1, len(group)): + if group[i]["time"] <= merged_group["end"]: + merged_group["text"] += " " + group[i]["text"] + merged_group["end"] = get_end(group[i]) + else: + merged_captions.append(merged_group) + merged_group = deepcopy(group[i]) + merged_group["end"] = get_end(group[i]) + merged_captions.append(merged_group) + + return merged_captions + + def get_captions_search_results(self, term): + """Searches for captions matching the term and returns book_ids.""" + captions = self._query_database(term) + if not captions: + return [] + + merged_captions = self._merge_captions(captions) + book_ids = list({caption['book_id'] for caption in merged_captions}) + + return book_ids From 53a01579f045a675f988e7a4abdcad530a257347 Mon Sep 17 00:00:00 2001 From: Blondel MONDESIR Date: Wed, 16 Oct 2024 09:02:23 -0400 Subject: [PATCH 5/6] Fix session mismatch --- cps/services/xb_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cps/services/xb_utils.py b/cps/services/xb_utils.py index c21a7588f5..0d5baa309a 100644 --- a/cps/services/xb_utils.py +++ b/cps/services/xb_utils.py @@ -155,7 +155,7 @@ def delete_media_and_captions(self, media_id, media_url): class MappingService: """Service class for mapping operations.""" - def __init__(self, session: Session): + def __init__(self): db = GlueDB() self.session = db.get_session() @@ -209,6 +209,7 @@ def __init__(self): def _query_database(self, term): 
"""Executes a query on the xklb database and retrieves book_ids from iiab-glue.db.""" captions = self.xklb_session.query( + # Caption.rowid, Caption.media_id, Caption.text, Caption.time From a8f88af94b9a341436a7286fd626eaad58dc1181 Mon Sep 17 00:00:00 2001 From: Blondel MONDESIR Date: Fri, 1 Nov 2024 23:46:12 -0400 Subject: [PATCH 6/6] Update session management in xb_utils.py --- cps/services/xb_utils.py | 93 +++++++++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 34 deletions(-) diff --git a/cps/services/xb_utils.py b/cps/services/xb_utils.py index 0d5baa309a..fac37c9821 100644 --- a/cps/services/xb_utils.py +++ b/cps/services/xb_utils.py @@ -38,8 +38,8 @@ class DatabaseService: """Service class for database operations.""" def __init__(self, session: Session): - db = XKLBDB() - self.session = db.get_session() + self.db = XKLBDB() + self.session = self.db.get_session() def remove_shorts_from_db(self): """Deletes media entries where the path contains 'shorts'.""" @@ -77,8 +77,8 @@ def calculate_views_per_day(self, requested_urls: dict): """Calculates views per day for each requested URL.""" log.debug("Calculating views per day for requested URLs.") now = datetime.now() - for requested_url in list(requested_urls.keys()): - try: + try: + for requested_url in list(requested_urls.keys()): media_entry = self.session.query(Media).filter(Media.path == requested_url).first() if media_entry and media_entry.view_count and media_entry.time_uploaded: view_count = media_entry.view_count @@ -89,10 +89,10 @@ def calculate_views_per_day(self, requested_urls: dict): # If data is missing, remove the URL from requested_urls requested_urls.pop(requested_url) log.warning("Removed URL %s due to missing data.", requested_url) - except Exception as e: - log.error("An error occurred during calculation for %s: %s", requested_url, e) - requested_urls.pop(requested_url) - log.info("Views per day calculated for requested URLs.") + log.info("Views per day calculated for 
requested URLs.") + except Exception as e: + log.error("An error occurred during calculation for %s: %s", requested_url, e) + requested_urls.pop(requested_url) def update_playlist_path(self, media_url): """Updates the playlist path with a timestamp.""" @@ -153,10 +153,15 @@ def delete_media_and_captions(self, media_id, media_url): log.error("An error occurred while deleting media and captions: %s", e) raise + def close_session(self): + self.session.close() + self.db.remove_session() + class MappingService: """Service class for mapping operations.""" def __init__(self): db = GlueDB() + self.db = db self.session = db.get_session() def add_book_media_mapping(self, media_id, book_id): @@ -171,6 +176,9 @@ def add_book_media_mapping(self, media_id, book_id): self.session.rollback() log.error("An error occurred while adding mapping: %s", e) raise + finally: + self.session.close() + self.db.remove_session() def get_mapping(self, media_id): """Gets the mapping for the given media_id.""" @@ -185,6 +193,9 @@ def get_mapping(self, media_id): except Exception as e: log.error("An error occurred while getting mapping: %s", e) raise + finally: + self.session.close() + self.db.remove_session() def update_mapping(self, media_id, book_id): """Updates the mapping for the given media_id.""" @@ -200,42 +211,56 @@ def update_mapping(self, media_id, book_id): self.session.rollback() log.error("An error occurred while updating mapping: %s", e) raise + finally: + self.session.close() + self.db.remove_session() class CaptionSearcher: def __init__(self): - self.xklb_session = XKLBDB().get_session() - self.glue_session = GlueDB().get_session() + self.xklb_db = XKLBDB() + self.glue_db = GlueDB() def _query_database(self, term): """Executes a query on the xklb database and retrieves book_ids from iiab-glue.db.""" - captions = self.xklb_session.query( - # Caption.rowid, - Caption.media_id, - Caption.text, - Caption.time - ).filter(Caption.text.like(f'%{term}%')).all() + xklb_session = 
self.xklb_db.get_session() + glue_session = self.glue_db.get_session() + try: + captions = xklb_session.query( + # Caption.rowid, + Caption.media_id, + Caption.text, + Caption.time + ).filter(Caption.text.like(f'%{term}%')).all() - media_ids = [caption[0] for caption in captions] + media_ids = [caption[0] for caption in captions] - # Get corresponding book_ids from the glue database - mappings = self.glue_session.query(MediaBooksMapping).filter( - MediaBooksMapping.media_id.in_(media_ids) - ).all() - media_id_to_book_id = {mapping.media_id: mapping.book_id for mapping in mappings} + # Get corresponding book_ids from the glue database + mappings = glue_session.query(MediaBooksMapping).filter( + MediaBooksMapping.media_id.in_(media_ids) + ).all() + media_id_to_book_id = {mapping.media_id: mapping.book_id for mapping in mappings} - # Combine captions with book_ids - captions_list = [] - for caption in captions: - media_id = caption[0] - book_id = media_id_to_book_id.get(media_id) - if book_id: - captions_list.append({ - 'book_id': book_id, - 'text': caption[1], - 'time': caption[2] - }) + # Combine captions with book_ids + captions_list = [] + for caption in captions: + media_id = caption[0] + book_id = media_id_to_book_id.get(media_id) + if book_id: + captions_list.append({ + 'book_id': book_id, + 'text': caption[1], + 'time': caption[2] + }) - return captions_list + return captions_list + except Exception as e: + log.error("An error occured during caption search: %s", e) + raise + finally: + xklb_session.close() + glue_session.close() + self.xklb_db.remove_session() + self.glue_db.remove_session() def _merge_captions(self, captions): """Merges overlapping captions for the same book_id."""