Skip to content

Commit

Permalink
zilean multithreaded
Browse files Browse the repository at this point in the history
  • Loading branch information
g0ldyy committed Jul 8, 2024
1 parent 5c579c1 commit 4399f1f
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 57 deletions.
79 changes: 29 additions & 50 deletions comet/api/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
bytes_to_size,
config_check,
get_indexer_manager,
get_zilean,
filter,
get_torrent_hash,
translate,
Expand Down Expand Up @@ -91,9 +92,9 @@ async def stream(request: Request, b64config: str, type: str, id: str):
}

name = translate(name)
logName = name
log_name = name
if type == "series":
logName = f"{name} S0{season}E0{episode}"
log_name = f"{name} S0{season}E0{episode}"

cache_key = hashlib.md5(
json.dumps(
Expand All @@ -110,7 +111,7 @@ async def stream(request: Request, b64config: str, type: str, id: str):
f"SELECT EXISTS (SELECT 1 FROM cache WHERE cacheKey = '{cache_key}')"
)
if cached[0] != 0:
logger.info(f"Cache found for {logName}")
logger.info(f"Cache found for {log_name}")

timestamp = await database.fetch_one(
f"SELECT timestamp FROM cache WHERE cacheKey = '{cache_key}'"
Expand All @@ -120,7 +121,7 @@ async def stream(request: Request, b64config: str, type: str, id: str):
f"DELETE FROM cache WHERE cacheKey = '{cache_key}'"
)

logger.info(f"Cache expired for {logName}")
logger.info(f"Cache expired for {log_name}")
else:
sorted_ranked_files = await database.fetch_one(
f"SELECT results FROM cache WHERE cacheKey = '{cache_key}'"
Expand Down Expand Up @@ -168,16 +169,18 @@ async def stream(request: Request, b64config: str, type: str, id: str):

return {"streams": results}
else:
logger.info(f"No cache found for {logName} with user configuration")
logger.info(f"No cache found for {log_name} with user configuration")

indexer_manager_type = settings.INDEXER_MANAGER_TYPE

logger.info(
f"Start of {indexer_manager_type} search for {logName} with indexers {config['indexers']}"
)
search_indexer = len(config["indexers"]) != 0
if search_indexer:
logger.info(
f"Start of {indexer_manager_type} search for {log_name} with indexers {config['indexers']}"
)

torrents = []
if len(config["indexers"]) != 0:
if search_indexer:
search_terms = [name]
if type == "series":
search_terms.append(f"{name} S0{season}E0{episode}")
Expand All @@ -188,52 +191,28 @@ async def stream(request: Request, b64config: str, type: str, id: str):
for term in search_terms
]
search_response = await asyncio.gather(*tasks)
else:
logger.info(f"No indexer selected by user for {log_name}")

if settings.ZILEAN_URL:
zilean_torrents = await asyncio.create_task(get_zilean(session, indexer_manager_type, name, log_name))

if search_indexer:
for results in search_response:
if results is None:
continue

for result in results:
torrents.append(result)

logger.info(
f"{len(torrents)} torrents found for {logName} with {indexer_manager_type}"
f"{len(torrents)} torrents found for {log_name} with {indexer_manager_type}"
)
else:
logger.info(f"No indexer selected by user for {logName}")

zilean_hashes_count = 0
if settings.ZILEAN_URL:
try:
get_dmm = await session.post(
f"{settings.ZILEAN_URL}/dmm/search", json={"queryText": name}
)
get_dmm = await get_dmm.json()

if isinstance(get_dmm, list):
for result in get_dmm[: settings.ZILEAN_TAKE_FIRST]:
zilean_hashes_count += 1

if indexer_manager_type == "jackett":
object = {
"Title": result["filename"],
"InfoHash": result["infoHash"],
}
elif indexer_manager_type == "prowlarr":
object = {
"title": result["filename"],
"infoHash": result["infoHash"],
}

torrents.append(object)

logger.info(
f"{zilean_hashes_count} torrents found for {logName} with Zilean API"
)
except Exception as e:
logger.warning(
f"Exception while getting torrents for {logName} with Zilean API: {e}"
)
for torrent in zilean_torrents:
torrents.append(torrent)

logger.info(
f"{len(zilean_torrents)} torrents found for {log_name} with Zilean"
)

if len(torrents) == 0:
return {"streams": []}
Expand All @@ -260,7 +239,7 @@ async def stream(request: Request, b64config: str, type: str, id: str):
filtered_torrents = torrents

logger.info(
f"{len(torrents) - len(filtered_torrents)} filtered torrents for {logName}"
f"{len(torrents) - len(filtered_torrents)} filtered torrents for {log_name}"
)

tasks = []
Expand All @@ -270,7 +249,7 @@ async def stream(request: Request, b64config: str, type: str, id: str):
torrent_hashes = await asyncio.gather(*tasks)
torrent_hashes = list(set([hash for hash in torrent_hashes if hash]))

logger.info(f"{len(torrent_hashes)} info hashes found for {logName}")
logger.info(f"{len(torrent_hashes)} info hashes found for {log_name}")

if len(torrent_hashes) == 0:
return {"streams": []}
Expand All @@ -285,7 +264,7 @@ async def stream(request: Request, b64config: str, type: str, id: str):
sorted_ranked_files = sort_torrents(ranked_files)

logger.info(
f"{len(sorted_ranked_files)} cached files found on {config['debridService']} for {logName}"
f"{len(sorted_ranked_files)} cached files found on {config['debridService']} for {log_name}"
)

if len(sorted_ranked_files) == 0:
Expand All @@ -304,7 +283,7 @@ async def stream(request: Request, b64config: str, type: str, id: str):
await database.execute(
f"INSERT OR IGNORE INTO cache (cacheKey, results, timestamp) VALUES ('{cache_key}', '{json_data}', {time.time()})"
)
logger.info(f"Results have been cached for {logName}")
logger.info(f"Results have been cached for {log_name}")

if config["debridService"] == "realdebrid":
debrid_extension = "RD"
Expand Down
2 changes: 1 addition & 1 deletion comet/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def start_log():
)
logger.log("COMET", f"Indexers: {settings.INDEXER_MANAGER_INDEXERS}")
logger.log("COMET", f"Get Torrent Timeout: {settings.GET_TORRENT_TIMEOUT}s")
logger.log("COMET", f"Zilean API: {settings.ZILEAN_URL}")
logger.log("COMET", f"Zilean: {settings.ZILEAN_URL}|{settings.ZILEAN_TAKE_FIRST}")
logger.log(
"COMET", f"Debrid Stream Proxy Enabled: {bool(settings.PROXY_DEBRID_STREAM)}"
)
Expand Down
43 changes: 37 additions & 6 deletions comet/utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,16 @@ async def get_indexer_manager(
indexers: list,
query: str,
):
results = []
try:
indexers = [indexer.replace("_", " ") for indexer in indexers]

timeout = aiohttp.ClientTimeout(total=settings.INDEXER_MANAGER_TIMEOUT)
results = []

if indexer_manager_type == "jackett":
response = await session.get(
f"{settings.INDEXER_MANAGER_URL}/api/v2.0/indexers/all/results?apikey={settings.INDEXER_MANAGER_API_KEY}&Query={query}&Tracker[]={'&Tracker[]='.join(indexer for indexer in indexers)}",
timeout=timeout,
) # &Category[]=2000&Category[]=5000
)
response = await response.json()

for result in response["Results"]:
Expand All @@ -229,20 +228,52 @@ async def get_indexer_manager(

response = await session.get(
f"{settings.INDEXER_MANAGER_URL}/api/v1/search?query={query}&indexerIds={'&indexerIds='.join(str(indexer_id) for indexer_id in indexers_id)}&type=search",
headers={ # &categories=2000&categories=5000
headers={
"X-Api-Key": settings.INDEXER_MANAGER_API_KEY
},
)
response = await response.json()

for result in response:
results.append(result)

return results
except Exception as e:
logger.warning(
f"Exception while getting {indexer_manager_type} results for {query} with {indexers}: {e}"
)
pass

return results


async def get_zilean(session: aiohttp.ClientSession, indexer_manager_type: str, name: str, log_name: str):
results = []
try:
get_dmm = await session.post(
f"{settings.ZILEAN_URL}/dmm/search", json={"queryText": name}
)
get_dmm = await get_dmm.json()

if isinstance(get_dmm, list):
for result in get_dmm[: settings.ZILEAN_TAKE_FIRST]:
if indexer_manager_type == "jackett":
object = {
"Title": result["filename"],
"InfoHash": result["infoHash"],
}
elif indexer_manager_type == "prowlarr":
object = {
"title": result["filename"],
"infoHash": result["infoHash"],
}

results.append(object)
except Exception as e:
logger.warning(
f"Exception while getting torrents for {log_name} with Zilean: {e}"
)
pass

return results


async def filter(torrents: list, name_lower: str, indexer_manager_type: str):
Expand Down

0 comments on commit 4399f1f

Please sign in to comment.