From 943ceee759b35b2ddea35025ea91d5458f8904e1 Mon Sep 17 00:00:00 2001 From: Joshua Peek Date: Thu, 1 Aug 2024 21:20:26 +0000 Subject: [PATCH] Add history sync Fixes #4 --- imdb_trakt_sync.py | 183 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 169 insertions(+), 14 deletions(-) diff --git a/imdb_trakt_sync.py b/imdb_trakt_sync.py index 3085429..2718446 100644 --- a/imdb_trakt_sync.py +++ b/imdb_trakt_sync.py @@ -168,6 +168,46 @@ def sync_ratings(session: requests.Session, imdb_ratings_url: str) -> None: trakt_add_ratings(session=session, movies=add_movies, shows=add_shows) +@main.command() +@click.option( + "--imdb-ratings-url", + required=True, + envvar="IMDB_RATINGS_URL", +) +@click.pass_obj +def sync_history(session: requests.Session, imdb_ratings_url: str) -> None: + existing_movie_imdb_ids: set[str] = set() + existing_episodes_imdb_ids: set[str] = set() + + imdb_id_rated_at: dict[str, date] = {} + imdb_movie_ids: set[str] = set() + imdb_episode_ids: set[str] = set() + + for imdb_item in fetch_imdb_ratings(imdb_ratings_url): + imdb_id_rated_at[imdb_item.imdb_id] = imdb_item.rated_on + if imdb_item.trakt_type == "movie": + imdb_movie_ids.add(imdb_item.imdb_id) + elif imdb_item.trakt_type == "episode": + imdb_episode_ids.add(imdb_item.imdb_id) + + for trakt_item in trakt_history(session): + if trakt_item["type"] == "movie": + existing_movie_imdb_ids.add(trakt_item["movie"]["ids"]["imdb"]) + elif trakt_item["type"] == "episode": + existing_episodes_imdb_ids.add(trakt_item["episode"]["ids"]["imdb"]) + + add_movies: list[TraktWatchedItem] = [ + {"watched_at": imdb_id_rated_at[imdb].isoformat(), "ids": {"imdb": imdb}} + for imdb in imdb_movie_ids - existing_movie_imdb_ids + ] + add_episodes: list[TraktWatchedItem] = [ + {"watched_at": imdb_id_rated_at[imdb].isoformat(), "ids": {"imdb": imdb}} + for imdb in imdb_episode_ids - existing_episodes_imdb_ids + ] + + trakt_add_history(session, movies=add_movies, episodes=add_episodes) + + @dataclass class IMDBWatchlistItem: imdb_id: str @@ -266,6 +306,20 @@ class TraktRatedItem(TypedDict): ids: TraktIMDBIDs +class TraktHistoryItem(TypedDict): + id: int + watched_at: str + action: Literal["scrobble", "checkin", "watch"] + type: Literal["movie", "episode"] + movie: TraktAnyItem + episode: TraktAnyItem + + +class TraktWatchedItem(TypedDict): + watched_at: str + ids: TraktIMDBIDs + + class TraktRatingItem(TypedDict): rated_at: str rating: int @@ -276,6 +330,14 @@ class TraktRatingItem(TypedDict): episode: TraktAnyItem +class TraktTypedContainer(TypedDict): + type: Literal["movie", "show", "season", "episode"] + movie: TraktAnyItem + show: TraktAnyItem + season: TraktAnyItem + episode: TraktAnyItem + + _TRAKT_API_HEADERS = { "Content-Type": "application/json", "trakt-api-key": "", @@ -286,6 +348,7 @@ class TraktRatingItem(TypedDict): _TRAKT_UPDATE_WATCHLIST_URL = "https://api.trakt.tv/sync/watchlist" _TRAKT_REMOVE_FROM_WATCHLIST_URL = "https://api.trakt.tv/sync/watchlist/remove" _TRAKT_RATINGS_URL = "https://api.trakt.tv/sync/ratings" +_TRAKT_HISTORY_URL = "https://api.trakt.tv/sync/history" _TRAKT_ADD_RATINGS_URL = "https://api.trakt.tv/sync/ratings" _TRAKT_REMOVE_RATINGS_URL = "https://api.trakt.tv/sync/ratings/remove" @@ -322,10 +385,57 @@ def trakt_request( return response -def trakt_watchlist(session: requests.Session) -> list[TraktWatchlistItem]: - response = trakt_request(session, method="GET", url=_TRAKT_WATCHLIST_URL) - data: list[TraktWatchlistItem] = response.json() - return data +def trakt_request_paginated( + session: requests.Session, + method: Literal["GET"], + url: str, + limit: int, +) -> Iterator[Any]: + page = 1 + + while True: + response = trakt_request( + session, + method=method, + url=url, + params={ + "page": str(page), + "limit": str(limit), + }, + ) + + yield from response.json() + + pagination = _trakt_pagination(response) + if pagination.page >= pagination.page_count: + break + page += 1 + + +@dataclass +class TraktPagination: + page: int + limit: int + page_count: int + item_count: int + + +def _trakt_pagination(response: requests.Response) -> TraktPagination: + return TraktPagination( + page=int(response.headers["X-Pagination-Page"]), + limit=int(response.headers["X-Pagination-Limit"]), + page_count=int(response.headers["X-Pagination-Page-Count"]), + item_count=int(response.headers["X-Pagination-Item-Count"]), + ) + + +def trakt_watchlist(session: requests.Session) -> Iterator[TraktWatchlistItem]: + yield from trakt_request_paginated( + session, + method="GET", + url=_TRAKT_WATCHLIST_URL, + limit=1000, + ) def trakt_update_watchlist( @@ -409,14 +519,13 @@ def trakt_remove_from_watchlist( def trakt_ratings( session: requests.Session, media_type: Literal["movies", "shows", "seasons", "episodes", "all"] = "all", -) -> list[TraktRatingItem]: - response = trakt_request( +) -> Iterator[TraktRatingItem]: + yield from trakt_request_paginated( session, method="GET", url=f"{_TRAKT_RATINGS_URL}/{media_type}", + limit=1000, ) - data: list[TraktRatingItem] = response.json() - return data def trakt_add_ratings( @@ -494,12 +603,58 @@ def trakt_remove_ratings( ) -class TraktTypedContainer(TypedDict): - type: Literal["movie", "show", "season", "episode"] - movie: TraktAnyItem - show: TraktAnyItem - season: TraktAnyItem - episode: TraktAnyItem +def trakt_history( + session: requests.Session, + media_type: Literal["movies", "shows", "seasons", "episodes"] | None = None, +) -> Iterator[TraktHistoryItem]: + url = _TRAKT_HISTORY_URL + if media_type: + url += f"/{media_type}" + + yield from trakt_request_paginated( + session, + method="GET", + url=url, + limit=1000, + ) + + +def trakt_add_history( + session: requests.Session, + movies: list[TraktWatchedItem] = [], + shows: list[TraktWatchedItem] = [], + seasons: list[TraktWatchedItem] = [], + episodes: list[TraktWatchedItem] = [], +) -> None: + if not movies and not shows and not seasons and not episodes: + logger.debug("No items to add") + return + + data = { + "movies": movies, + "shows": shows, + "seasons": seasons, + "episodes": episodes, + } + response = trakt_request( + session, + method="POST", + url=_TRAKT_HISTORY_URL, + json=data, + ) + result = response.json() + + for media_type in ["movies", "episodes"]: + added: int = result["added"][media_type] + not_found: list[TraktAnyItem] = result["not_found"][media_type] + if added > 0: + logger.info("Added %d %s to ratings", added, media_type) + if not_found: + for item in not_found: + logger.warning( + "https://www.imdb.com/title/%s/ not found on Trakt", + item["ids"]["imdb"], + ) def _trakt_mediaitem_imdb_id(item: TraktTypedContainer) -> str | None: