diff --git a/comet/api/stream.py b/comet/api/stream.py
index 8aed5bb..02b21a4 100644
--- a/comet/api/stream.py
+++ b/comet/api/stream.py
@@ -1,12 +1,11 @@
 import asyncio
-import hashlib
 import time
 
 import aiohttp
 import httpx
 import uuid
 import orjson
 
-from fastapi import APIRouter, Request
+from fastapi import APIRouter, Request, BackgroundTasks
 from fastapi.responses import (
     RedirectResponse,
     StreamingResponse,
@@ -31,6 +30,7 @@
     format_title,
     get_client_ip,
     get_aliases,
+    add_torrent_to_cache,
 )
 from comet.utils.logger import logger
 from comet.utils.models import database, rtn, settings, trackers
@@ -52,7 +52,13 @@ async def stream_noconfig(request: Request, type: str, id: str):
 
 
 @streams.get("/{b64config}/stream/{type}/{id}.json")
-async def stream(request: Request, b64config: str, type: str, id: str):
+async def stream(
+    request: Request,
+    b64config: str,
+    type: str,
+    id: str,
+    background_tasks: BackgroundTasks,
+):
     config = config_check(b64config)
     if not config:
         return {
@@ -66,7 +72,9 @@ async def stream(request: Request, b64config: str, type: str, id: str):
         }
 
     connector = aiohttp.TCPConnector(limit=0)
-    async with aiohttp.ClientSession(connector=connector, raise_for_status=True) as session:
+    async with aiohttp.ClientSession(
+        connector=connector, raise_for_status=True
+    ) as session:
         full_id = id
         season = None
         episode = None
@@ -160,44 +168,51 @@ async def stream(request: Request, b64config: str, type: str, id: str):
                 }
             )
 
+        indexers = config["indexers"].copy()
+        if settings.SCRAPE_TORRENTIO:
+            indexers.append("torrentio")
+        if settings.SCRAPE_MEDIAFUSION:
+            indexers.append("mediafusion")
+        if settings.ZILEAN_URL:
+            indexers.append("dmm")
+        indexers_json = orjson.dumps(indexers).decode("utf-8")
+
         all_sorted_ranked_files = {}
-        for debrid_service in services:
-            cache_key = hashlib.md5(
-                orjson.dumps(
-                    {
-                        "debridService": debrid_service,
-                        "name": name,
-                        "season": season,
-                        "episode": episode,
-                        "indexers": config["indexers"],
-                    }
-                )
-            ).hexdigest()
+        trackers_found = (
+            set()
+        )  # we want to check that we have a cache for each of the user's trackers
+        the_time = time.time()
+        cache_ttl = settings.CACHE_TTL
 
-            cached = await database.fetch_one(
-                f"SELECT EXISTS (SELECT 1 FROM cache WHERE cacheKey = '{cache_key}')"
+        for debrid_service in services:
+            cached_results = await database.fetch_all(
+                """
+                SELECT info_hash, tracker, data
+                FROM cache
+                WHERE debridService = :debrid_service
+                AND name = :name
+                AND ((:season IS NULL AND season IS NULL) OR season = :season)
+                AND ((:episode IS NULL AND episode IS NULL) OR episode = :episode)
+                AND tracker IN (SELECT value FROM json_each(:indexers))
+                AND timestamp + :cache_ttl >= :current_time
+                """,
+                {
+                    "debrid_service": debrid_service,
+                    "name": name,
+                    "season": season,
+                    "episode": episode,
+                    "indexers": indexers_json,
+                    "cache_ttl": cache_ttl,
+                    "current_time": the_time,
+                },
             )
-
-            if cached[0] != 0:
-                timestamp = await database.fetch_one(
-                    f"SELECT timestamp FROM cache WHERE cacheKey = '{cache_key}'"
+            for result in cached_results:
+                trackers_found.add(result["tracker"].lower())
+                all_sorted_ranked_files[result["info_hash"]] = orjson.loads(
+                    result["data"]
                 )
-                if timestamp[0] + settings.CACHE_TTL < time.time():
-                    await database.execute(
-                        f"DELETE FROM cache WHERE cacheKey = '{cache_key}'"
-                    )
-                    logger.info(f"Cache expired for {log_name} using {debrid_service}")
-                    continue
-
-                sorted_ranked_files = await database.fetch_one(
-                    f"SELECT results FROM cache WHERE cacheKey = '{cache_key}'"
-                )
-                sorted_ranked_files = orjson.loads(sorted_ranked_files[0])
-
-                all_sorted_ranked_files.update(sorted_ranked_files)
 
-        cached_count = len(all_sorted_ranked_files)
-        if cached_count != 0:
+        if len(all_sorted_ranked_files) != 0 and set(indexers).issubset(trackers_found):
             debrid_extension = get_debrid_extension(
                 debrid_service, config["debridApiKey"]
             )
@@ -206,7 +221,6 @@ async def stream(request: Request, b64config: str, type: str, id: str):
             for resolution in balanced_hashes:
                 for hash in balanced_hashes[resolution]:
                     data = all_sorted_ranked_files[hash]["data"]
-
                     the_stream = {
                         "name": f"[{debrid_extension}{debrid_emoji}] Comet {data['resolution']}",
                         "description": format_title(data, config),
@@ -228,17 +242,17 @@ async def stream(request: Request, b64config: str, type: str, id: str):
                         )
                     else:
                         the_stream["infoHash"] = hash
-
                         index = data["index"]
                         the_stream["fileIdx"] = (
                             1 if "|" in index else int(index)
                         )  # 1 because for Premiumize it's impossible to get the file index
-
                         the_stream["sources"] = trackers
 
                     results.append(the_stream)
 
-            logger.info(f"{cached_count} cached results found for {log_name}")
+            logger.info(
+                f"{len(all_sorted_ranked_files)} cached results found for {log_name}"
+            )
 
             return {"streams": results}
 
@@ -316,7 +330,7 @@ async def stream(request: Request, b64config: str, type: str, id: str):
                 torrents.append(result)
 
         logger.info(
-            f"{len(torrents)} torrents found for {log_name}"
+            f"{len(torrents)} unique torrents found for {log_name}"
             + (
                 " with "
                 + ", ".join(
@@ -419,9 +433,6 @@ async def stream(request: Request, b64config: str, type: str, id: str):
                 )
 
                 ranked_files.add(ranked_file)
-            # except Exception as e:
-            #     logger.info(f"Filtered out: {e}")
-            #     pass
             except:
                 pass
 
@@ -465,11 +476,10 @@ async def stream(request: Request, b64config: str, type: str, id: str):
             )
             sorted_ranked_files[hash]["data"]["index"] = files[hash]["index"]
 
-        json_data = orjson.dumps(sorted_ranked_files).decode("utf-8")
-        await database.execute(
-            f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO cache (cacheKey, results, timestamp) VALUES (:cache_key, :json_data, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
-            {"cache_key": cache_key, "json_data": json_data, "timestamp": time.time()},
+        background_tasks.add_task(
+            add_torrent_to_cache, config, name, season, episode, sorted_ranked_files
         )
+
         logger.info(f"Results have been cached for {log_name}")
 
         debrid_extension = get_debrid_extension(config["debridService"])
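Note on the cache lookup above: a plain `season = :season` predicate would never match movie rows, because `NULL = NULL` is not true in SQL; the paired `IS NULL` branches are the standard workaround. A minimal standalone sketch of the same pattern (in-memory SQLite, illustrative table and values only, not code from this PR):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cache (name TEXT, season INTEGER, episode INTEGER)")
conn.execute("INSERT INTO cache VALUES ('Some Movie', NULL, NULL)")  # a movie: no season/episode

# "season = :season" alone would never match the NULL row, since NULL = NULL
# evaluates to NULL in SQL; the IS NULL branch handles the movie case.
row = conn.execute(
    """
    SELECT name FROM cache
    WHERE ((:season IS NULL AND season IS NULL) OR season = :season)
    AND ((:episode IS NULL AND episode IS NULL) OR episode = :episode)
    """,
    {"season": None, "episode": None},
).fetchone()
print(row)  # ('Some Movie',)
```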
diff --git a/comet/debrid/realdebrid.py b/comet/debrid/realdebrid.py
index ea2c9ba..d37c864 100644
--- a/comet/debrid/realdebrid.py
+++ b/comet/debrid/realdebrid.py
@@ -44,7 +44,7 @@ async def get_instant(self, chunk: list):
     async def get_files(
         self, torrent_hashes: list, type: str, season: str, episode: str, kitsu: bool
     ):
-        chunk_size = 50
+        chunk_size = 100
         chunks = [
             torrent_hashes[i : i + chunk_size]
             for i in range(0, len(torrent_hashes), chunk_size)
diff --git a/comet/utils/db.py b/comet/utils/db.py
index 5556ef5..6333a26 100644
--- a/comet/utils/db.py
+++ b/comet/utils/db.py
@@ -1,4 +1,5 @@
 import os
+import time
 
 from comet.utils.logger import logger
 from comet.utils.models import database, settings
@@ -13,8 +14,23 @@ async def setup_database():
             open(settings.DATABASE_PATH, "a").close()
 
         await database.connect()
+
+        if settings.DATABASE_TYPE == "postgresql":
+            check_query = """SELECT column_name
+                FROM information_schema.columns
+                WHERE table_name = 'cache'
+                AND column_name = 'cachekey'"""
+        else:
+            check_query = """SELECT name FROM pragma_table_info('cache')
+                WHERE name = 'cacheKey'"""
+
+        old_structure = await database.fetch_one(check_query)
+
+        if old_structure:
+            await database.execute("DROP TABLE IF EXISTS cache")
+
         await database.execute(
-            "CREATE TABLE IF NOT EXISTS cache (cacheKey TEXT PRIMARY KEY, timestamp INTEGER, results TEXT)"
+            "CREATE TABLE IF NOT EXISTS cache (debridService TEXT, info_hash TEXT, name TEXT, season INTEGER, episode INTEGER, tracker TEXT, data TEXT, timestamp INTEGER)"
         )
         await database.execute(
             "CREATE TABLE IF NOT EXISTS download_links (debrid_key TEXT, hash TEXT, file_index TEXT, link TEXT, timestamp INTEGER, PRIMARY KEY (debrid_key, hash, file_index))"
@@ -23,6 +39,15 @@ async def setup_database():
         await database.execute(
             "CREATE TABLE IF NOT EXISTS active_connections (id TEXT PRIMARY KEY, ip TEXT, content TEXT, timestamp INTEGER)"
        )
+
+        # clear expired entries
+        await database.execute(
+            """
+            DELETE FROM cache
+            WHERE timestamp + :cache_ttl < :current_time
+            """,
+            {"cache_ttl": settings.CACHE_TTL, "current_time": time.time()},
+        )
     except Exception as e:
         logger.error(f"Error setting up the database: {e}")
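The migration above is detect-and-drop rather than an in-place ALTER: the presence of the old `cacheKey` column marks the pre-PR single-blob schema, and the table is rebuilt empty. A quick sketch of the SQLite detection path (in-memory database, illustrative only):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Old single-blob schema, as created before this PR.
conn.execute(
    "CREATE TABLE cache (cacheKey TEXT PRIMARY KEY, timestamp INTEGER, results TEXT)"
)

# Same probe the migration uses: pragma_table_info lists the columns,
# so a hit on 'cacheKey' means the old schema is still in place.
old_structure = conn.execute(
    "SELECT name FROM pragma_table_info('cache') WHERE name = 'cacheKey'"
).fetchone()

if old_structure:
    conn.execute("DROP TABLE IF EXISTS cache")  # destructive: old cache entries are lost
```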
diff --git a/comet/utils/general.py b/comet/utils/general.py
index 15266b3..688b63b 100644
--- a/comet/utils/general.py
+++ b/comet/utils/general.py
@@ -6,13 +6,14 @@
 import PTT
 import asyncio
 import orjson
+import time
 
 from RTN import parse, title_match
 from curl_cffi import requests
 from fastapi import Request
 
 from comet.utils.logger import logger
-from comet.utils.models import settings, ConfigModel
+from comet.utils.models import database, settings, ConfigModel
 
 languages_emojis = {
     "unknown": "❓",  # Unknown
@@ -388,7 +389,7 @@ async def get_torrentio(log_name: str, type: str, full_id: str):
     for torrent in get_torrentio["streams"]:
         title_full = torrent["title"]
         title = title_full.split("\n")[0]
-        tracker = title_full.split("⚙️ ")[1]
+        tracker = title_full.split("⚙️ ")[1].split("\n")[0]
 
         results.append(
             {
@@ -637,32 +638,34 @@ def format_metadata(data: dict):
 
 
 def format_title(data: dict, config: dict):
+    result_format = config["resultFormat"]
+    has_all = "All" in result_format
+
     title = ""
-    if "All" in config["resultFormat"] or "Title" in config["resultFormat"]:
+    if has_all or "Title" in result_format:
         title += f"{data['title']}\n"
 
-    if "All" in config["resultFormat"] or "Metadata" in config["resultFormat"]:
+    if has_all or "Metadata" in result_format:
         metadata = format_metadata(data)
         if metadata != "":
             title += f"💿 {metadata}\n"
 
-    if "All" in config["resultFormat"] or "Size" in config["resultFormat"]:
+    if has_all or "Size" in result_format:
         title += f"💾 {bytes_to_size(data['size'])} "
 
-    if "All" in config["resultFormat"] or "Tracker" in config["resultFormat"]:
+    if has_all or "Tracker" in result_format:
         title += f"🔎 {data['tracker'] if 'tracker' in data else '?'}"
 
-    if "All" in config["resultFormat"] or "Languages" in config["resultFormat"]:
+    if has_all or "Languages" in result_format:
         languages = data["languages"]
         if data["dubbed"]:
             languages.insert(0, "multi")
-        formatted_languages = (
-            "/".join(get_language_emoji(language) for language in languages)
-            if languages
-            else None
-        )
-        languages_str = "\n" + formatted_languages if formatted_languages else ""
-        title += f"{languages_str}"
+        if languages:
+            formatted_languages = "/".join(
+                get_language_emoji(language) for language in languages
+            )
+            languages_str = "\n" + formatted_languages
+            title += f"{languages_str}"
 
     if title == "":
         # Without this, Streamio shows SD as the result, which is confusing
@@ -688,7 +691,7 @@ async def get_aliases(session: aiohttp.ClientSession, media_type: str, media_id:
             for aliase in await response.json():
                 country = aliase["country"]
-                if not country in aliases:
+                if country not in aliases:
                     aliases[country] = []
 
                 aliases[country].append(aliase["title"])
@@ -696,3 +699,32 @@ async def get_aliases(session: aiohttp.ClientSession, media_type: str, media_id:
         pass
 
     return aliases
+
+
+async def add_torrent_to_cache(
+    config: dict, name: str, season: int, episode: int, sorted_ranked_files: dict
+):
+    values = [
+        {
+            "debridService": config["debridService"],
+            "info_hash": sorted_ranked_files[torrent]["infohash"],
+            "name": name,
+            "season": season,
+            "episode": episode,
+            "tracker": sorted_ranked_files[torrent]["data"]["tracker"]
+            .split("|")[0]
+            .lower(),
+            "data": orjson.dumps(sorted_ranked_files[torrent]).decode("utf-8"),
+            "timestamp": time.time(),
+        }
+        for torrent in sorted_ranked_files
+    ]
+
+    query = f"""
+    INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}
+    INTO cache (debridService, info_hash, name, season, episode, tracker, data, timestamp)
+    VALUES (:debridService, :info_hash, :name, :season, :episode, :tracker, :data, :timestamp)
+    {' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}
+    """
+
+    await database.execute_many(query, values)
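On the `get_torrentio` fix above: the tracker segment of Torrentio's `title` can be followed by further lines, and the old parse kept everything to the end of the string. A small before/after demo (the title below is invented, only shaped like Torrentio's output):

```python
# Hypothetical Torrentio-style title: tracker name followed by another line.
title_full = "Some.Movie.2023.1080p.WEB-DL\n👤 42 💾 2.5 GB ⚙️ TPB\n🇬🇧 / 🇫🇷"

old = title_full.split("⚙️ ")[1]                  # 'TPB\n🇬🇧 / 🇫🇷' — polluted tracker name
new = title_full.split("⚙️ ")[1].split("\n")[0]   # 'TPB' — tracker name only
```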
diff --git a/comet/utils/models.py b/comet/utils/models.py
index 0d5959c..62cef9e 100644
--- a/comet/utils/models.py
+++ b/comet/utils/models.py
@@ -27,7 +27,7 @@ class AppSettings(BaseSettings):
     INDEXER_MANAGER_URL: Optional[str] = "http://127.0.0.1:9117"
     INDEXER_MANAGER_API_KEY: Optional[str] = None
     INDEXER_MANAGER_TIMEOUT: Optional[int] = 30
-    INDEXER_MANAGER_INDEXERS: List[str] = ["EXAMPLE1_CHANGETHIS", "EXAMPLE2_CHANGETHIS"]
+    INDEXER_MANAGER_INDEXERS: List[str] = []
     GET_TORRENT_TIMEOUT: Optional[int] = 5
     ZILEAN_URL: Optional[str] = None
     ZILEAN_TAKE_FIRST: Optional[int] = 500
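For context on the new write path: handing the insert batch to `background_tasks.add_task(...)` means FastAPI runs it only after the response has been sent, so the cache write no longer sits on the request's critical path. A minimal sketch of the mechanism (route and task names are invented for illustration):

```python
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


async def write_cache(rows: list):
    # Stand-in for add_torrent_to_cache: in the PR this is a single
    # execute_many() batch rather than per-row INSERTs.
    print(f"cached {len(rows)} rows after the response was sent")


@app.get("/demo")
async def demo(background_tasks: BackgroundTasks):
    rows = [{"info_hash": "deadbeef", "tracker": "example"}]
    background_tasks.add_task(write_cache, rows)  # queued, not awaited here
    return {"streams": []}  # the client receives this first; write_cache runs after
```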