feat: new db structure
g0ldyy committed Nov 21, 2024
1 parent f6fb223 commit 8c3a81a
Showing 5 changed files with 134 additions and 67 deletions.
108 changes: 59 additions & 49 deletions comet/api/stream.py
@@ -1,12 +1,11 @@
import asyncio
import hashlib
import time
import aiohttp
import httpx
import uuid
import orjson

from fastapi import APIRouter, Request
from fastapi import APIRouter, Request, BackgroundTasks
from fastapi.responses import (
RedirectResponse,
StreamingResponse,
@@ -31,6 +30,7 @@
format_title,
get_client_ip,
get_aliases,
add_torrent_to_cache,
)
from comet.utils.logger import logger
from comet.utils.models import database, rtn, settings, trackers
@@ -52,7 +52,13 @@ async def stream_noconfig(request: Request, type: str, id: str):


@streams.get("/{b64config}/stream/{type}/{id}.json")
async def stream(request: Request, b64config: str, type: str, id: str):
async def stream(
request: Request,
b64config: str,
type: str,
id: str,
background_tasks: BackgroundTasks,
):
config = config_check(b64config)
if not config:
return {
@@ -66,7 +72,9 @@ async def stream(request: Request, b64config: str, type: str, id: str):
}

connector = aiohttp.TCPConnector(limit=0)
async with aiohttp.ClientSession(connector=connector, raise_for_status=True) as session:
async with aiohttp.ClientSession(
connector=connector, raise_for_status=True
) as session:
full_id = id
season = None
episode = None
@@ -160,44 +168,51 @@ async def stream(request: Request, b64config: str, type: str, id: str):
}
)

indexers = config["indexers"].copy()
if settings.SCRAPE_TORRENTIO:
indexers.append("torrentio")
if settings.SCRAPE_MEDIAFUSION:
indexers.append("mediafusion")
if settings.ZILEAN_URL:
indexers.append("dmm")
indexers_json = orjson.dumps(indexers).decode("utf-8")

all_sorted_ranked_files = {}
for debrid_service in services:
cache_key = hashlib.md5(
orjson.dumps(
{
"debridService": debrid_service,
"name": name,
"season": season,
"episode": episode,
"indexers": config["indexers"],
}
)
).hexdigest()
trackers_found = (
set()
) # we want to check that we have a cache for each of the user's trackers
the_time = time.time()
cache_ttl = settings.CACHE_TTL

cached = await database.fetch_one(
f"SELECT EXISTS (SELECT 1 FROM cache WHERE cacheKey = '{cache_key}')"
for debrid_service in services:
cached_results = await database.fetch_all(
"""
SELECT info_hash, tracker, data
FROM cache
WHERE debridService = :debrid_service
AND name = :name
AND ((:season IS NULL AND season IS NULL) OR season = :season)
AND ((:episode IS NULL AND episode IS NULL) OR episode = :episode)
AND tracker IN (SELECT value FROM json_each(:indexers))
AND timestamp + :cache_ttl >= :current_time
""",
{
"debrid_service": debrid_service,
"name": name,
"season": season,
"episode": episode,
"indexers": indexers_json,
"cache_ttl": cache_ttl,
"current_time": the_time,
},
)

if cached[0] != 0:
timestamp = await database.fetch_one(
f"SELECT timestamp FROM cache WHERE cacheKey = '{cache_key}'"
for result in cached_results:
trackers_found.add(result["tracker"].lower())
all_sorted_ranked_files[result["info_hash"]] = orjson.loads(
result["data"]
)
if timestamp[0] + settings.CACHE_TTL < time.time():
await database.execute(
f"DELETE FROM cache WHERE cacheKey = '{cache_key}'"
)
logger.info(f"Cache expired for {log_name} using {debrid_service}")
continue

sorted_ranked_files = await database.fetch_one(
f"SELECT results FROM cache WHERE cacheKey = '{cache_key}'"
)
sorted_ranked_files = orjson.loads(sorted_ranked_files[0])

all_sorted_ranked_files.update(sorted_ranked_files)

cached_count = len(all_sorted_ranked_files)
if cached_count != 0:
if len(all_sorted_ranked_files) != 0 and set(indexers).issubset(trackers_found):
debrid_extension = get_debrid_extension(
debrid_service, config["debridApiKey"]
)
@@ -206,7 +221,6 @@ async def stream(request: Request, b64config: str, type: str, id: str):
for resolution in balanced_hashes:
for hash in balanced_hashes[resolution]:
data = all_sorted_ranked_files[hash]["data"]

the_stream = {
"name": f"[{debrid_extension}{debrid_emoji}] Comet {data['resolution']}",
"description": format_title(data, config),
@@ -228,17 +242,17 @@ async def stream(request: Request, b64config: str, type: str, id: str):
)
else:
the_stream["infoHash"] = hash

index = data["index"]
the_stream["fileIdx"] = (
1 if "|" in index else int(index)
) # 1 because for Premiumize it's impossible to get the file index

the_stream["sources"] = trackers

results.append(the_stream)

logger.info(f"{cached_count} cached results found for {log_name}")
logger.info(
f"{len(all_sorted_ranked_files)} cached results found for {log_name}"
)

return {"streams": results}

@@ -316,7 +330,7 @@ async def stream(request: Request, b64config: str, type: str, id: str):
torrents.append(result)

logger.info(
f"{len(torrents)} torrents found for {log_name}"
f"{len(torrents)} unique torrents found for {log_name}"
+ (
" with "
+ ", ".join(
@@ -419,9 +433,6 @@ async def stream(request: Request, b64config: str, type: str, id: str):
)

ranked_files.add(ranked_file)
# except Exception as e:
# logger.info(f"Filtered out: {e}")
# pass
except:
pass

@@ -465,11 +476,10 @@ async def stream(request: Request, b64config: str, type: str, id: str):
)
sorted_ranked_files[hash]["data"]["index"] = files[hash]["index"]

json_data = orjson.dumps(sorted_ranked_files).decode("utf-8")
await database.execute(
f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO cache (cacheKey, results, timestamp) VALUES (:cache_key, :json_data, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
{"cache_key": cache_key, "json_data": json_data, "timestamp": time.time()},
background_tasks.add_task(
add_torrent_to_cache, config, name, season, episode, sorted_ranked_files
)

logger.info(f"Results have been cached for {log_name}")

debrid_extension = get_debrid_extension(config["debridService"])
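
Note (not part of the commit): a minimal, self-contained sketch of the new per-row cache lookup used above — one row per (debridService, info_hash, tracker), with the user's indexer list filtered in SQL via json_each(). It assumes a SQLite build with the JSON functions available; the schema matches the table created in comet/utils/db.py below, and all values are illustrative.

import json
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cache (debridService TEXT, info_hash TEXT, name TEXT, "
    "season INTEGER, episode INTEGER, tracker TEXT, data TEXT, timestamp INTEGER)"
)
conn.execute(
    "INSERT INTO cache VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
    ("realdebrid", "abc123", "Example Movie", None, None, "torrentio", "{}", int(time.time())),
)

# The endpoint serializes the user's indexers to JSON and lets SQL do the tracker filtering.
indexers = json.dumps(["torrentio", "dmm"])
rows = conn.execute(
    """
    SELECT info_hash, tracker, data
    FROM cache
    WHERE debridService = :debrid_service
    AND name = :name
    AND ((:season IS NULL AND season IS NULL) OR season = :season)
    AND ((:episode IS NULL AND episode IS NULL) OR episode = :episode)
    AND tracker IN (SELECT value FROM json_each(:indexers))
    AND timestamp + :cache_ttl >= :current_time
    """,
    {
        "debrid_service": "realdebrid",
        "name": "Example Movie",
        "season": None,
        "episode": None,
        "indexers": indexers,
        "cache_ttl": 86400,
        "current_time": time.time(),
    },
).fetchall()
print(rows)  # -> [('abc123', 'torrentio', '{}')]
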
2 changes: 1 addition & 1 deletion comet/debrid/realdebrid.py
@@ -44,7 +44,7 @@ async def get_instant(self, chunk: list):
async def get_files(
self, torrent_hashes: list, type: str, season: str, episode: str, kitsu: bool
):
chunk_size = 50
chunk_size = 100
chunks = [
torrent_hashes[i : i + chunk_size]
for i in range(0, len(torrent_hashes), chunk_size)
27 changes: 26 additions & 1 deletion comet/utils/db.py
@@ -1,4 +1,5 @@
import os
import time

from comet.utils.logger import logger
from comet.utils.models import database, settings
@@ -13,8 +14,23 @@ async def setup_database():
open(settings.DATABASE_PATH, "a").close()

await database.connect()

if settings.DATABASE_TYPE == "postgresql":
check_query = """SELECT column_name
FROM information_schema.columns
WHERE table_name = 'cache'
AND column_name = 'cachekey'"""
else:
check_query = """SELECT name FROM pragma_table_info('cache')
WHERE name = 'cacheKey'"""

old_structure = await database.fetch_one(check_query)

if old_structure:
await database.execute("DROP TABLE IF EXISTS cache")

await database.execute(
"CREATE TABLE IF NOT EXISTS cache (cacheKey TEXT PRIMARY KEY, timestamp INTEGER, results TEXT)"
"CREATE TABLE IF NOT EXISTS cache (debridService TEXT, info_hash TEXT, name TEXT, season INTEGER, episode INTEGER, tracker TEXT, data TEXT, timestamp INTEGER)"
)
await database.execute(
"CREATE TABLE IF NOT EXISTS download_links (debrid_key TEXT, hash TEXT, file_index TEXT, link TEXT, timestamp INTEGER, PRIMARY KEY (debrid_key, hash, file_index))"
@@ -23,6 +39,15 @@ async def setup_database():
await database.execute(
"CREATE TABLE IF NOT EXISTS active_connections (id TEXT PRIMARY KEY, ip TEXT, content TEXT, timestamp INTEGER)"
)

# clear expired entries
await database.execute(
"""
DELETE FROM cache
WHERE timestamp + :cache_ttl < :current_time
""",
{"cache_ttl": settings.CACHE_TTL, "current_time": time.time()},
)
except Exception as e:
logger.error(f"Error setting up the database: {e}")

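
Note (not part of the commit): a small sketch of the migration check above for the SQLite case — pragma_table_info() is queried as a table-valued function, and if the legacy cacheKey column is found the old blob-style cache table is dropped before the new schema is created. Table and column names follow the diff; everything else is illustrative, and pragma_table_info() as a table-valued function needs a reasonably recent SQLite.

import sqlite3

conn = sqlite3.connect(":memory:")
# Simulate a database that still carries the old single-blob cache table.
conn.execute("CREATE TABLE cache (cacheKey TEXT PRIMARY KEY, timestamp INTEGER, results TEXT)")

old_structure = conn.execute(
    "SELECT name FROM pragma_table_info('cache') WHERE name = 'cacheKey'"
).fetchone()
if old_structure:
    conn.execute("DROP TABLE IF EXISTS cache")

# Recreate the cache with the new per-torrent row structure.
conn.execute(
    "CREATE TABLE IF NOT EXISTS cache (debridService TEXT, info_hash TEXT, name TEXT, "
    "season INTEGER, episode INTEGER, tracker TEXT, data TEXT, timestamp INTEGER)"
)
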
62 changes: 47 additions & 15 deletions comet/utils/general.py
@@ -6,13 +6,14 @@
import PTT
import asyncio
import orjson
import time

from RTN import parse, title_match
from curl_cffi import requests
from fastapi import Request

from comet.utils.logger import logger
from comet.utils.models import settings, ConfigModel
from comet.utils.models import database, settings, ConfigModel

languages_emojis = {
"unknown": "❓", # Unknown
@@ -388,7 +389,7 @@ async def get_torrentio(log_name: str, type: str, full_id: str):
for torrent in get_torrentio["streams"]:
title_full = torrent["title"]
title = title_full.split("\n")[0]
tracker = title_full.split("⚙️ ")[1]
tracker = title_full.split("⚙️ ")[1].split("\n")[0]

results.append(
{
@@ -637,32 +638,34 @@ def format_metadata(data: dict):


def format_title(data: dict, config: dict):
result_format = config["resultFormat"]
has_all = "All" in result_format

title = ""
if "All" in config["resultFormat"] or "Title" in config["resultFormat"]:
if has_all or "Title" in result_format:
title += f"{data['title']}\n"

if "All" in config["resultFormat"] or "Metadata" in config["resultFormat"]:
if has_all or "Metadata" in result_format:
metadata = format_metadata(data)
if metadata != "":
title += f"💿 {metadata}\n"

if "All" in config["resultFormat"] or "Size" in config["resultFormat"]:
if has_all or "Size" in result_format:
title += f"💾 {bytes_to_size(data['size'])} "

if "All" in config["resultFormat"] or "Tracker" in config["resultFormat"]:
if has_all or "Tracker" in result_format:
title += f"🔎 {data['tracker'] if 'tracker' in data else '?'}"

if "All" in config["resultFormat"] or "Languages" in config["resultFormat"]:
if has_all or "Languages" in result_format:
languages = data["languages"]
if data["dubbed"]:
languages.insert(0, "multi")
formatted_languages = (
"/".join(get_language_emoji(language) for language in languages)
if languages
else None
)
languages_str = "\n" + formatted_languages if formatted_languages else ""
title += f"{languages_str}"
if languages:
formatted_languages = "/".join(
get_language_emoji(language) for language in languages
)
languages_str = "\n" + formatted_languages
title += f"{languages_str}"

if title == "":
# Without this, Streamio shows SD as the result, which is confusing
@@ -688,11 +691,40 @@ async def get_aliases(session: aiohttp.ClientSession, media_type: str, media_id:

for aliase in await response.json():
country = aliase["country"]
if not country in aliases:
if country not in aliases:
aliases[country] = []

aliases[country].append(aliase["title"])
except:
pass

return aliases


async def add_torrent_to_cache(
config: dict, name: str, season: int, episode: int, sorted_ranked_files: dict
):
values = [
{
"debridService": config["debridService"],
"info_hash": sorted_ranked_files[torrent]["infohash"],
"name": name,
"season": season,
"episode": episode,
"tracker": sorted_ranked_files[torrent]["data"]["tracker"]
.split("|")[0]
.lower(),
"data": orjson.dumps(sorted_ranked_files[torrent]).decode("utf-8"),
"timestamp": time.time(),
}
for torrent in sorted_ranked_files
]

query = f"""
INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}
INTO cache (debridService, info_hash, name, season, episode, tracker, data, timestamp)
VALUES (:debridService, :info_hash, :name, :season, :episode, :tracker, :data, :timestamp)
{' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}
"""

await database.execute_many(query, values)
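
Note (not part of the commit): for context, caching is now deferred with FastAPI's BackgroundTasks, so the stream response is returned first and the cache rows are written afterwards. A minimal sketch with a stub task; the real add_torrent_to_cache above takes (config, name, season, episode, sorted_ranked_files).

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

async def add_torrent_to_cache(config, name, season, episode, sorted_ranked_files):
    # Stub standing in for comet.utils.general.add_torrent_to_cache.
    ...

@app.get("/demo")
async def demo(background_tasks: BackgroundTasks):
    # The task is scheduled here but only runs after the response has been sent.
    background_tasks.add_task(add_torrent_to_cache, {}, "Example Movie", None, None, {})
    return {"status": "response returned before caching completes"}
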
2 changes: 1 addition & 1 deletion comet/utils/models.py
@@ -27,7 +27,7 @@ class AppSettings(BaseSettings):
INDEXER_MANAGER_URL: Optional[str] = "http://127.0.0.1:9117"
INDEXER_MANAGER_API_KEY: Optional[str] = None
INDEXER_MANAGER_TIMEOUT: Optional[int] = 30
INDEXER_MANAGER_INDEXERS: List[str] = ["EXAMPLE1_CHANGETHIS", "EXAMPLE2_CHANGETHIS"]
INDEXER_MANAGER_INDEXERS: List[str] = []
GET_TORRENT_TIMEOUT: Optional[int] = 5
ZILEAN_URL: Optional[str] = None
ZILEAN_TAKE_FIRST: Optional[int] = 500