Create xb_utils.py [to query/manipulate xklb-metadata.db (schema in xb.py) + iiab-glue.db (schema in glue-db.py)] #256

Open · wants to merge 6 commits into master
cps/services/xb_utils.py — 296 additions, 0 deletions
@@ -0,0 +1,296 @@
import os
import re
from copy import deepcopy
from datetime import datetime
from itertools import groupby
from sqlalchemy import literal
from cps.xb import XKLBDB, Media, Caption, Playlists
from cps.glue_db import GlueDB, MediaBooksMapping
from cps.subproc_wrapper import process_open
from cps import logger

log = logger.create()

class Settings:
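    # Overridable via the LB_WRAPPER environment variable (defaults to the
    # 'lb-wrapper' command).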
LB_WRAPPER = os.getenv('LB_WRAPPER', 'lb-wrapper')
TIMEOUT = 120 # seconds
MAX_VIDEOS_PER_DOWNLOAD = 10

def format_media_url(media_url):
    """Strips extra query parameters (anything after the first '&') from the media URL."""
    return media_url.split("&")[0]

def format_original_url(original_url):
"""Formats the original URL to point to the metadata endpoint."""
return re.sub(r"/media(?=\?|$)", r"/meta", original_url)
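
# Illustrative behavior of the two helpers above (hypothetical URLs):
#   format_media_url("http://host/media?v=abc&list=xyz")  -> "http://host/media?v=abc"
#   format_original_url("http://host/media?v=abc")        -> "http://host/meta?v=abc"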

def execute_subprocess(subprocess_args):
"""Executes a subprocess and returns the process handle."""
try:
p = process_open(subprocess_args, newlines=True)
return p
except Exception as e:
log.error("An error occurred during subprocess execution: %s", e)
raise

class DatabaseService:
"""Service class for database operations."""

    def __init__(self):
self.db = XKLBDB()
self.session = self.db.get_session()
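        # Callers are expected to invoke close_session() when finished.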

def remove_shorts_from_db(self):
"""Deletes media entries where the path contains 'shorts'."""
log.debug("Removing shorts from the database.")
try:
self.session.query(Media).filter(Media.path.like('%shorts%')).delete(synchronize_session=False)
self.session.commit()
log.info("Shorts removed from the database.")
except Exception as e:
self.session.rollback()
log.error("An error occurred while removing shorts from the database: %s", e)
raise

def fetch_requested_urls(self, unavailable: list):
"""Fetches requested URLs from the database."""
log.debug("Fetching requested URLs from the database.")
try:
query = self.session.query(Media.path, Media.duration, Media.live_status)\
.filter(Media.path.like('http%'))\
.filter((Media.error == None) | (Media.error == ''))
rows = query.all()
requested_urls = {}
for path, duration, live_status in rows:
if duration is not None and duration > 0:
requested_urls[path] = {"duration": duration, "live_status": live_status}
else:
unavailable.append(path)
log.info("Fetched %d requested URLs.", len(requested_urls))
return requested_urls
except Exception as e:
log.error("An error occurred while fetching requested URLs: %s", e)
raise

    def calculate_views_per_day(self, requested_urls: dict):
        """Calculates views per day for each requested URL."""
        log.debug("Calculating views per day for requested URLs.")
        now = datetime.now()
        for requested_url in list(requested_urls.keys()):
            try:
                media_entry = self.session.query(Media).filter(Media.path == requested_url).first()
                if media_entry and media_entry.view_count and media_entry.time_uploaded:
                    view_count = media_entry.view_count
                    time_uploaded = datetime.utcfromtimestamp(media_entry.time_uploaded)
                    # 'or 1' avoids division by zero for videos uploaded today
                    days_since_publish = (now - time_uploaded).days or 1
                    requested_urls[requested_url]["views_per_day"] = view_count / days_since_publish
                else:
                    # If data is missing, remove the URL from requested_urls
                    requested_urls.pop(requested_url)
                    log.warning("Removed URL %s due to missing data.", requested_url)
            except Exception as e:
                # A failure on one URL should not abort the rest of the batch
                log.error("An error occurred during calculation for %s: %s", requested_url, e)
                requested_urls.pop(requested_url, None)
        log.info("Views per day calculated for requested URLs.")

def update_playlist_path(self, media_url):
"""Updates the playlist path with a timestamp."""
log.debug("Updating playlist path for URL: %s", media_url)
try:
playlist = self.session.query(Playlists).filter(Playlists.path == media_url).first()
if playlist:
playlist.path = f"{media_url}&timestamp={int(datetime.now().timestamp())}"
self.session.commit()
log.info("Playlist path updated for %s.", media_url)
else:
log.error("No playlist found with path %s", media_url)
except Exception as e:
self.session.rollback()
log.error("An error occurred while updating the playlist path: %s", e)
raise

def get_extractor_id(self, media_url):
"""Gets the extractor ID for the given media URL."""
log.debug("Getting extractor ID for URL: %s", media_url)
try:
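            # Reversed LIKE: find the Media row whose extractor_id occurs as a
            # substring of the supplied URL.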
media_entry = self.session.query(Media).filter(
literal(media_url).like('%' + Media.extractor_id + '%')
).first()
if media_entry:
log.info("Extractor ID found: %s", media_entry.extractor_id)
return media_entry.extractor_id
else:
log.error("Extractor ID not found for URL: %s", media_url)
return None
except Exception as e:
log.error("An error occurred while getting extractor ID: %s", e)
raise

def read_error_from_database(self, media_url):
"""Reads the error message from the database."""
log.debug("Reading error message from the database for URL: %s", media_url)
try:
error_entry = self.session.query(Media.error).filter(Media.webpath == media_url).first()
if error_entry and error_entry.error:
return error_entry.error
else:
return "No error message found in database"
except Exception as e:
log.error("An error occurred while reading error from the database: %s", e)
return f"An error occurred while reading error from the database: {e}"

def delete_media_and_captions(self, media_id, media_url):
"""Deletes media and captions entries for a given media ID."""
log.debug("Deleting media and captions entries for media ID: %s", media_id)
try:
self.session.query(Caption).filter(Caption.media_id == media_id).delete(synchronize_session=False)
self.session.query(Media).filter(Media.webpath == media_url).delete(synchronize_session=False)
self.session.commit()
log.info("Deleted media and caption entries for media ID: %s", media_id)
except Exception as e:
self.session.rollback()
log.error("An error occurred while deleting media and captions: %s", e)
raise

def close_session(self):
self.session.close()
self.db.remove_session()

class MappingService:
"""Service class for mapping operations."""
    def __init__(self):
        self.db = GlueDB()
        self.session = self.db.get_session()

def add_book_media_mapping(self, media_id, book_id):
"""Adds a mapping between the media_id and the book_id."""
try:
mapping = MediaBooksMapping(media_id=media_id, book_id=book_id)
# to avoid duplicate entries, use the merge method
self.session.merge(mapping)
self.session.commit()
log.info("Mapping added: %s", mapping)
except Exception as e:
self.session.rollback()
log.error("An error occurred while adding mapping: %s", e)
raise
finally:
self.session.close()
self.db.remove_session()

def get_mapping(self, media_id):
"""Gets the mapping for the given media_id."""
try:
mapping = self.session.query(MediaBooksMapping).filter(MediaBooksMapping.media_id == media_id).first()
if mapping:
log.info("Mapping found: %s", mapping)
return mapping
else:
log.error("No mapping found for media ID: %s", media_id)
return None
except Exception as e:
log.error("An error occurred while getting mapping: %s", e)
raise
finally:
self.session.close()
self.db.remove_session()

def update_mapping(self, media_id, book_id):
"""Updates the mapping for the given media_id."""
try:
mapping = self.session.query(MediaBooksMapping).filter(MediaBooksMapping.media_id == media_id).first()
if mapping:
mapping.book_id = book_id
self.session.commit()
log.info("Mapping updated: %s", mapping)
else:
log.error("No mapping found for media ID: %s", media_id)
except Exception as e:
self.session.rollback()
log.error("An error occurred while updating mapping: %s", e)
raise
finally:
self.session.close()
self.db.remove_session()

class CaptionSearcher:
def __init__(self):
self.xklb_db = XKLBDB()
self.glue_db = GlueDB()

def _query_database(self, term):
"""Executes a query on the xklb database and retrieves book_ids from iiab-glue.db."""
xklb_session = self.xklb_db.get_session()
glue_session = self.glue_db.get_session()
try:
captions = xklb_session.query(
# Caption.rowid,
Caption.media_id,
Caption.text,
Caption.time
).filter(Caption.text.like(f'%{term}%')).all()

media_ids = [caption[0] for caption in captions]

# Get corresponding book_ids from the glue database
mappings = glue_session.query(MediaBooksMapping).filter(
MediaBooksMapping.media_id.in_(media_ids)
).all()
media_id_to_book_id = {mapping.media_id: mapping.book_id for mapping in mappings}

# Combine captions with book_ids
captions_list = []
for caption in captions:
media_id = caption[0]
book_id = media_id_to_book_id.get(media_id)
if book_id:
captions_list.append({
'book_id': book_id,
'text': caption[1],
'time': caption[2]
})

return captions_list
except Exception as e:
log.error("An error occured during caption search: %s", e)
raise
finally:
xklb_session.close()
glue_session.close()
self.xklb_db.remove_session()
self.glue_db.remove_session()

def _merge_captions(self, captions):
"""Merges overlapping captions for the same book_id."""
def get_end(caption):
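            # Heuristic: len(text) / 4.2 ≈ word count; at ~220 words per minute,
            # converted to seconds, this estimates when the caption ends on screen.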
return caption["time"] + (len(caption["text"]) / 4.2 / 220 * 60)

        # groupby only groups consecutive rows, so sort by book_id and time first
        captions = sorted(captions, key=lambda c: (c["book_id"], c["time"]))
        merged_captions = []
        for book_id, group in groupby(captions, key=lambda x: x["book_id"]):
group = list(group)
merged_group = deepcopy(group[0])
merged_group["end"] = get_end(group[0])
for i in range(1, len(group)):
if group[i]["time"] <= merged_group["end"]:
merged_group["text"] += " " + group[i]["text"]
merged_group["end"] = get_end(group[i])
else:
merged_captions.append(merged_group)
merged_group = deepcopy(group[i])
merged_group["end"] = get_end(group[i])
merged_captions.append(merged_group)

return merged_captions

def get_captions_search_results(self, term):
"""Searches for captions matching the term and returns book_ids."""
captions = self._query_database(term)
if not captions:
return []

merged_captions = self._merge_captions(captions)
book_ids = list({caption['book_id'] for caption in merged_captions})

return book_ids
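
# Example usage (an illustrative sketch; the search term and resulting ids are
# hypothetical):
#
#   searcher = CaptionSearcher()
#   book_ids = searcher.get_captions_search_results("photosynthesis")
#   # -> e.g. [12, 47]: ids of books whose video captions contain the term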