diff --git a/music_assistant/server/helpers/compare.py b/music_assistant/server/helpers/compare.py index 692890600..b2e51a466 100644 --- a/music_assistant/server/helpers/compare.py +++ b/music_assistant/server/helpers/compare.py @@ -3,10 +3,10 @@ from __future__ import annotations import re +from difflib import SequenceMatcher import unidecode -from music_assistant.common.helpers.util import create_sort_name from music_assistant.common.models.enums import ExternalID, MediaType from music_assistant.common.models.media_items import ( Album, @@ -388,18 +388,23 @@ def compare_strings(str1: str, str2: str, strict: bool = True) -> bool: """Compare strings and return True if we have an (almost) perfect match.""" if not str1 or not str2: return False + str1_lower = str1.lower() + str2_lower = str2.lower() + if strict: + return str1_lower == str2_lower # return early if total length mismatch if abs(len(str1) - len(str2)) > 4: return False - if not strict: - # handle '&' vs 'And' - if " & " in str1 and " and " in str2.lower(): - str2 = str2.lower().replace(" and ", " & ") - elif " and " in str1.lower() and " & " in str2: - str2 = str2.replace(" & ", " and ") - return create_safe_string(str1) == create_safe_string(str2) - - return create_sort_name(str1) == create_sort_name(str2) + # handle '&' vs 'And' + if " & " in str1_lower and " and " in str2_lower: + str2 = str2_lower.replace(" and ", " & ") + elif " and " in str1_lower and " & " in str2: + str2 = str2.replace(" & ", " and ") + if create_safe_string(str1) == create_safe_string(str2): + return True + # last resort: use difflib to compare strings + required_accuracy = 0.91 if len(str1) > 8 else 0.85 + return SequenceMatcher(a=str1_lower, b=str2).ratio() > required_accuracy def compare_version(base_version: str, compare_version: str) -> bool: diff --git a/music_assistant/server/providers/filesystem_local/base.py b/music_assistant/server/providers/filesystem_local/base.py index 7b3c2136e..c217664ba 100644 --- a/music_assistant/server/providers/filesystem_local/base.py +++ b/music_assistant/server/providers/filesystem_local/base.py @@ -47,14 +47,13 @@ DB_TABLE_TRACK_ARTISTS, VARIOUS_ARTISTS_NAME, ) -from music_assistant.server.controllers.cache import use_cache from music_assistant.server.controllers.music import DB_SCHEMA_VERSION from music_assistant.server.helpers.compare import compare_strings from music_assistant.server.helpers.playlists import parse_m3u, parse_pls from music_assistant.server.helpers.tags import parse_tags, split_items from music_assistant.server.models.music_provider import MusicProvider -from .helpers import get_parentdir +from .helpers import get_album_dir, get_artist_dir, get_disc_dir if TYPE_CHECKING: from collections.abc import AsyncGenerator @@ -731,16 +730,14 @@ async def _parse_track(self, file_item: FileSystemItem) -> Track: # disc_dir is the folder level where the tracks are located # this may be a separate disc folder (Disc 1, Disc 2 etc) underneath the album folder # or this is an album folder with the disc attached - disc_dir = get_parentdir(file_item.path, f"disc {tags.disc or 1}") - album_dir = get_parentdir(disc_dir or file_item.path, tags.album) + disc_dir = get_disc_dir(file_item.path, tags.album, tags.disc) + album_dir = get_album_dir(file_item.path, tags.album, disc_dir) # album artist(s) album_artists = [] if tags.album_artists: for index, album_artist_str in enumerate(tags.album_artists): - # work out if we have an artist folder - artist_dir = get_parentdir(album_dir, album_artist_str, 1) - artist = await self._parse_artist(album_artist_str, artist_path=artist_dir) + artist = await self._parse_artist(album_artist_str, album_path=album_dir) if not artist.mbid: with contextlib.suppress(IndexError): artist.mbid = tags.musicbrainz_albumartistids[index] @@ -781,8 +778,9 @@ async def _parse_track(self, file_item: FileSystemItem) -> Track: track.album = await self._parse_album( tags.album, - album_dir, - disc_dir, + track_path=file_item.path, + album_path=album_dir, + disc_path=disc_dir, artists=album_artists, barcode=tags.barcode, ) @@ -859,14 +857,20 @@ async def _parse_track(self, file_item: FileSystemItem) -> Track: async def _parse_artist( self, - name: str | None = None, + name: str, artist_path: str | None = None, + album_path: str | None = None, sort_name: str | None = None, - ) -> Artist | None: - """Lookup metadata in Artist folder.""" - assert name or artist_path + ) -> Artist: + """Parse Artist metadata into an Artist object.""" + cache_key = f"{self.instance_id}-artistdata-{name}-{artist_path}" + if cache := await self.mass.cache.get(cache_key): + return cache + if not artist_path and album_path: + # try to find (album)artist folder based on album path + artist_path = get_artist_dir(album_path=album_path, artist_name=name) if not artist_path: - # check if we have an existing item + # check if we have an existing item to retrieve the artist path async for item in self.mass.music.artists.iter_library_items(search=name): if not compare_strings(name, item.name): continue @@ -882,30 +886,25 @@ async def _parse_artist( artist_path = name elif await self.exists(name.title()): artist_path = name.title() - else: - # use fake artist path as item id which is just the name - artist_path = name - - if not name: - name = artist_path.split(os.sep)[-1] artist = Artist( - item_id=artist_path, + item_id=artist_path or name, provider=self.instance_id, name=name, sort_name=sort_name, provider_mappings={ ProviderMapping( - item_id=artist_path, + item_id=artist_path or name, provider_domain=self.domain, provider_instance=self.instance_id, - url=artist_path, + url=artist_path or name, ) }, ) if not await self.exists(artist_path): # return basic object if there is no dedicated artist folder + await self.mass.cache.set(cache_key, artist, expiration=120) return artist nfo_file = os.path.join(artist_path, "artist.nfo") @@ -930,19 +929,23 @@ async def _parse_artist( if images := await self._get_local_images(artist_path): artist.metadata.images = images + await self.mass.cache.set(cache_key, artist, expiration=120) return artist async def _parse_album( self, - name: str | None, + name: str, + track_path: str, album_path: str | None, disc_path: str | None, artists: list[Artist], barcode: str | None = None, sort_name: str | None = None, - ) -> Album | None: - """Lookup metadata in Album folder.""" - assert name or album_path + ) -> Album: + """Parse Album metadata into an Album object.""" + cache_key = f"{self.instance_id}-albumdata-{name}-{album_path}" + if cache := await self.mass.cache.get(cache_key): + return cache # create fake path if needed if not album_path and artists: album_path = artists[0].name + os.sep + name @@ -970,11 +973,9 @@ async def _parse_album( if barcode: album.external_ids.add((ExternalID.BARCODE, barcode)) - if not await self.exists(album_path): - # return basic object if there is no dedicated album folder - return album - - for folder_path in (disc_path, album_path): + # hunt for additional metadata and images in the folder structure + extra_path = os.path.dirname(track_path) if (track_path and not album_path) else None + for folder_path in (disc_path, album_path, extra_path): if not folder_path: continue nfo_file = os.path.join(folder_path, "album.nfo") @@ -1009,9 +1010,9 @@ async def _parse_album( else: album.metadata.images += images + await self.mass.cache.set(cache_key, album, expiration=120) return album - @use_cache(120) async def _get_local_images(self, folder: str) -> list[MediaItemImage]: """Return local images found in a given folderpath.""" images = [] diff --git a/music_assistant/server/providers/filesystem_local/helpers.py b/music_assistant/server/providers/filesystem_local/helpers.py index acc42a8bf..9459f5e1d 100644 --- a/music_assistant/server/providers/filesystem_local/helpers.py +++ b/music_assistant/server/providers/filesystem_local/helpers.py @@ -7,15 +7,41 @@ from music_assistant.server.helpers.compare import compare_strings -def get_parentdir(base_path: str, name: str, skip: int = 0) -> str | None: - """Look for folder name in path (to find dedicated artist or album folder).""" - if not base_path: - return None - parentdir = os.path.dirname(base_path) - for _ in range(skip, 3): +def get_artist_dir(album_path: str, artist_name: str) -> str | None: + """Look for (Album)Artist directory in path of album.""" + parentdir = os.path.dirname(album_path) + dirname = parentdir.rsplit(os.sep)[-1] + if compare_strings(artist_name, dirname, False): + return parentdir + return None + + +def get_disc_dir(track_path: str, album_name: str, disc_number: int | None) -> str | None: + """Look for disc directory in path of album/tracks.""" + parentdir = os.path.dirname(track_path) + dirname = parentdir.rsplit(os.sep)[-1] + dirname_lower = dirname.lower() + if disc_number is not None and compare_strings(f"disc {disc_number}", dirname, False): + return parentdir + if dirname_lower.startswith(album_name.lower()) and "disc" in dirname_lower: + return parentdir + if dirname_lower.startswith(album_name.lower()) and dirname_lower.endswith(str(disc_number)): + return parentdir + return None + + +def get_album_dir(track_path: str, album_name: str, disc_dir: str | None) -> str | None: + """Return album/parent directory of a track.""" + parentdir = os.path.dirname(track_path) + # account for disc sublevel by ignoring 1 level if needed + for _ in range(2 if disc_dir else 1): dirname = parentdir.rsplit(os.sep)[-1] - dirname = dirname.split("(")[0].split("[")[0].strip() - if compare_strings(name, dirname, False): + dirname_lower = dirname.lower() + if compare_strings(album_name, dirname, False): + return parentdir + if album_name in dirname_lower: + return parentdir + if dirname_lower in album_name: return parentdir parentdir = os.path.dirname(parentdir) return None