Skip to content

Commit

Permalink
fix(jxxghp#2755): refactor pagination and fix media sync DB issue
Browse files Browse the repository at this point in the history
  • Loading branch information
InfinityPacer committed Sep 27, 2024
1 parent 48122d8 commit 1add203
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 76 deletions.
56 changes: 42 additions & 14 deletions app/chain/mediaserver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import threading
from typing import List, Union, Optional
from typing import List, Union, Optional, Generator

from app import schemas
from app.chain import ChainBase
Expand All @@ -26,19 +26,47 @@ def librarys(self, server: str, username: str = None, hidden: bool = False) -> L
"""
return self.run_module("mediaserver_librarys", server=server, username=username, hidden=hidden)

def items(self, server: str, library_id: Union[str, int], start_index: int = 0, limit: int = 100) \
-> List[schemas.MediaServerItem]:
def items(self, server: str, library_id: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
"""
获取媒体服务器所有项目
"""
data = []
data_generator = self.run_module("mediaserver_items", server=server, library_id=library_id,
start_index=start_index, limit=limit)
if data_generator:
for item in data_generator:
if item:
data.append(item)
return data
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param server: 媒体服务器名称
:param library_id: 媒体库ID,用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
说明:
- 特别注意的是,这里使用yield from返回迭代器,避免同时使用return与yield导致Python生成器解析异常
- 如果 `limit` 为 None 或 -1 时,表示一次性获取所有数据,分页处理将不再生效
- 在这种情况下,内存消耗可能会较大,特别是在数据量非常大的场景下
- 如果未来评估结果显示,不分页场景下的内存消耗远大于分页处理时的网络请求开销,可以考虑在此方法中实现自分页的处理
- 即通过 `while` 循环在上层进行分页控制,逐步获取所有数据,避免内存爆炸,当前该逻辑由具体实例来实现不分页的处理
- Plex 实际上已默认支持内部分页处理,Jellyfin 与 Emby 获取数据时存在内部过滤场景,如排除合集等,分页数据可能是错误的
if limit is not None and limit != -1:
yield from self.run_module("mediaserver_items", server=server, library_id=library_id,
start_index=start_index, limit=limit)
else:
# 自分页逻辑,通过循环逐步获取所有数据
page_size = 10
while True:
data_generator = self.run_module("mediaserver_items", server=server, library_id=library_id,
start_index=start_index, limit=page_size)
if not data_generator:
break
count = 0
for item in data_generator:
if item:
count += 1
yield item
if count < page_size:
break
start_index += page_size
"""
yield from self.run_module("mediaserver_items", server=server, library_id=library_id,
start_index=start_index, limit=limit)

def iteminfo(self, server: str, item_id: Union[str, int]) -> schemas.MediaServerItem:
"""
Expand Down Expand Up @@ -107,7 +135,7 @@ def sync(self):
continue
logger.info(f"正在同步 {server_name} 媒体库 {library.name} ...")
library_count = 0
for item in self.items(server_name, library.id):
for item in self.items(server=server_name, library_id=library.id):
if not item or not item.item_id:
continue
logger.debug(f"正在同步 {item.title} ...")
Expand Down
15 changes: 11 additions & 4 deletions app/modules/emby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,21 @@ def mediaserver_librarys(self, server: str,
return server.get_librarys(username=username, hidden=hidden)
return None

def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \
-> Optional[Generator]:
def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0,
limit: Optional[int] = -1) -> Optional[Generator]:
"""
媒体库项目列表
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param server: 媒体服务器名称
:param library_id: 媒体库ID,用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
server: Emby = self.get_instance(server)
if server:
return server.get_items(library_id, start_index, limit)
yield from server.get_items(library_id, start_index, limit)
return None

def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
Expand Down
40 changes: 22 additions & 18 deletions app/modules/emby/emby.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def __get_emby_series_id_by_name(self, name: str, year: str) -> Optional[str]:
if not self._host or not self._apikey:
return None
url = f"{self._host}emby/Items"
params={
params = {
"IncludeItemTypes": "Series",
"Fields": "ProductionYear",
"StartIndex": 0,
Expand Down Expand Up @@ -601,7 +601,8 @@ def __get_emby_library_id_by_item(self, item: schemas.RefreshMediaItem) -> Optio
# 刷新根目录
return "/"

def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]:
@staticmethod
def __format_item_info(item) -> Optional[schemas.MediaServerItem]:
"""
格式化item
"""
Expand All @@ -610,7 +611,8 @@ def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]:
if not user_data:
user_state = None
else:
resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get("PlaybackPositionTicks") > 0
resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get(
"PlaybackPositionTicks") > 0
last_played_date = item.get("UserData", {}).get("LastPlayedDate")
if last_played_date is not None and "." in last_played_date:
last_played_date = last_played_date.split(".")[0]
Expand All @@ -624,7 +626,6 @@ def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]:
)
tmdbid = item.get("ProviderIds", {}).get("Tmdb")
return schemas.MediaServerItem(
id=item.get("Id"),
server="emby",
library=item.get("ParentId"),
item_id=item.get("Id"),
Expand Down Expand Up @@ -664,26 +665,30 @@ def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]:
logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:" + str(e))
return None

def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator:
def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
"""
获取媒体服务器所有媒体库列表
:param parent: 父媒体库ID
:param start_index: 开始索引,用于分页
:param limit: 每次请求返回的项目数量
:return: 生成器 schemas.MediaServerItem
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param parent: 媒体库ID,用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
if not parent:
yield None
if not self._host or not self._apikey:
yield None
if not parent or not self._host or not self._apikey:
return None
url = f"{self._host}emby/Users/{self.user}/Items"
params = {
"ParentId": parent,
"api_key": self._apikey,
"Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId",
"StartIndex": start_index,
"Limit": limit
"Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId"
}
if limit is not None and limit != -1:
params.update({
"StartIndex": start_index,
"Limit": limit
})
try:
res = RequestUtils().get_res(url, params)
if not res or res.status_code != 200:
Expand All @@ -700,7 +705,6 @@ def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Gene

except Exception as e:
logger.error(f"连接Users/Items出错:" + str(e))
yield None

def get_webhook_message(self, form: any, args: dict) -> Optional[schemas.WebhookEventInfo]:
"""
Expand Down
15 changes: 11 additions & 4 deletions app/modules/jellyfin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,21 @@ def mediaserver_librarys(self, server: str = None,
return server.get_librarys(username=username, hidden=hidden)
return None

def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \
-> Optional[Generator]:
def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0,
limit: Optional[int] = -1) -> Optional[Generator]:
"""
媒体库项目列表
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param server: 媒体服务器名称
:param library_id: 媒体库ID,用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
server: Jellyfin = self.get_instance(server)
if server:
return server.get_items(library_id, start_index, limit)
yield from server.get_items(library_id, start_index, limit)
return None

def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
Expand Down
39 changes: 21 additions & 18 deletions app/modules/jellyfin/jellyfin.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,8 +662,8 @@ def get_webhook_message(self, body: any) -> Optional[schemas.WebhookEventInfo]:

return eventItem


def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]:
@staticmethod
def __format_item_info(item) -> Optional[schemas.MediaServerItem]:
"""
格式化item
"""
Expand All @@ -672,7 +672,8 @@ def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]:
if not user_data:
user_state = None
else:
resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get("PlaybackPositionTicks") > 0
resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get(
"PlaybackPositionTicks") > 0
last_played_date = item.get("UserData", {}).get("LastPlayedDate")
if last_played_date is not None and "." in last_played_date:
last_played_date = last_played_date.split(".")[0]
Expand All @@ -687,7 +688,6 @@ def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]:
tmdbid = item.get("ProviderIds", {}).get("Tmdb")
return schemas.MediaServerItem(
server="jellyfin",
id=item.get("Id"),
library=item.get("ParentId"),
item_id=item.get("Id"),
item_type=item.get("Type"),
Expand Down Expand Up @@ -725,26 +725,30 @@ def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]:
logger.error(f"连接Users/{self.user}/Items/{itemid}:" + str(e))
return None

def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator:
def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
"""
获取媒体服务器所有媒体库列表
:param parent: 父媒体库ID
:param start_index: 开始索引,用于分页
:param limit: 每次请求返回的项目数量
:return: 生成器 schemas.MediaServerItem
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param parent: 媒体库ID,用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
if not parent:
yield None
if not self._host or not self._apikey:
yield None
if not parent or not self._host or not self._apikey:
return None
url = f"{self._host}Users/{self.user}/Items"
params = {
"parentId": parent,
"ParentId": parent,
"api_key": self._apikey,
"Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId",
"StartIndex": start_index,
"Limit": limit,
}
if limit is not None and limit != -1:
params.update({
"StartIndex": start_index,
"Limit": limit
})
try:
res = RequestUtils().get_res(url, params)
if not res or res.status_code != 200:
Expand All @@ -760,7 +764,6 @@ def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Gene
yield self.__format_item_info(item)
except Exception as e:
logger.error(f"连接Users/Items出错:" + str(e))
yield None

def get_data(self, url: str) -> Optional[Response]:
"""
Expand Down
15 changes: 11 additions & 4 deletions app/modules/plex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,21 @@ def mediaserver_librarys(self, server: str = None, hidden: bool = False,
return server.get_librarys(hidden)
return None

def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \
-> Optional[Generator]:
def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0,
limit: Optional[int] = -1) -> Optional[Generator]:
"""
媒体库项目列表
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param server: 媒体服务器名称
:param library_id: 媒体库ID,用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
server: Plex = self.get_instance(server)
if server:
return server.get_items(library_id, start_index, limit)
yield from server.get_items(library_id, start_index, limit)
return None

def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
Expand Down
29 changes: 16 additions & 13 deletions app/modules/plex/plex.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ def __build_media_server_item(self, item) -> Optional[schemas.MediaServerItem]:
)

return schemas.MediaServerItem(
id=item.ratingKey,
server="plex",
library=item.librarySectionID,
item_id=item.key,
Expand All @@ -480,22 +479,27 @@ def __build_media_server_item(self, item) -> Optional[schemas.MediaServerItem]:
user_state=user_state,
)

def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator:
def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
"""
获取媒体服务器所有媒体库列表
:param parent: 父媒体库ID
:param start_index: 开始索引,用于分页
:param limit: 每次请求返回的项目数量
:return: 生成器 schemas.MediaServerItem
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param parent: 媒体库ID,用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
if not parent:
yield None
if not self._plex:
yield None
if not parent or not self._plex:
return None
try:
section = self._plex.library.sectionByID(int(parent))
if section:
for item in section.all(container_start=start_index, limit=limit):
if limit is None or limit == -1:
items = section.all()
else:
items = section.all(container_start=start_index, container_size=limit, maxresults=limit)
for item in items:
try:
if not item:
continue
Expand All @@ -505,7 +509,6 @@ def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Gene
continue
except Exception as err:
logger.error(f"获取媒体库列表出错:{str(err)}")
yield None

def get_webhook_message(self, form: any) -> Optional[schemas.WebhookEventInfo]:
"""
Expand Down
2 changes: 1 addition & 1 deletion app/schemas/mediaserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class MediaServerLibrary(BaseModel):
link: Optional[str] = None



class MediaServerItemUserState(BaseModel):
# 已播放
played: Optional[bool] = None
Expand All @@ -85,6 +84,7 @@ class MediaServerItemUserState(BaseModel):
# 播放进度
percentage: Optional[float] = None


class MediaServerItem(BaseModel):
"""
媒体服务器媒体信息
Expand Down

0 comments on commit 1add203

Please sign in to comment.