Skip to content

Commit

Permalink
Better handling of album directory for local files (#1445)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Jul 4, 2024
1 parent d843fc2 commit 277f185
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 51 deletions.
25 changes: 15 additions & 10 deletions music_assistant/server/helpers/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
from __future__ import annotations

import re
from difflib import SequenceMatcher

import unidecode

from music_assistant.common.helpers.util import create_sort_name
from music_assistant.common.models.enums import ExternalID, MediaType
from music_assistant.common.models.media_items import (
Album,
Expand Down Expand Up @@ -388,18 +388,23 @@ def compare_strings(str1: str, str2: str, strict: bool = True) -> bool:
"""Compare strings and return True if we have an (almost) perfect match."""
if not str1 or not str2:
return False
str1_lower = str1.lower()
str2_lower = str2.lower()
if strict:
return str1_lower == str2_lower
# return early if total length mismatch
if abs(len(str1) - len(str2)) > 4:
return False
if not strict:
# handle '&' vs 'And'
if " & " in str1 and " and " in str2.lower():
str2 = str2.lower().replace(" and ", " & ")
elif " and " in str1.lower() and " & " in str2:
str2 = str2.replace(" & ", " and ")
return create_safe_string(str1) == create_safe_string(str2)

return create_sort_name(str1) == create_sort_name(str2)
# handle '&' vs 'And'
if " & " in str1_lower and " and " in str2_lower:
str2 = str2_lower.replace(" and ", " & ")
elif " and " in str1_lower and " & " in str2:
str2 = str2.replace(" & ", " and ")
if create_safe_string(str1) == create_safe_string(str2):
return True
# last resort: use difflib to compare strings
required_accuracy = 0.91 if len(str1) > 8 else 0.85
return SequenceMatcher(a=str1_lower, b=str2).ratio() > required_accuracy


def compare_version(base_version: str, compare_version: str) -> bool:
Expand Down
67 changes: 34 additions & 33 deletions music_assistant/server/providers/filesystem_local/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@
DB_TABLE_TRACK_ARTISTS,
VARIOUS_ARTISTS_NAME,
)
from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.controllers.music import DB_SCHEMA_VERSION
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.playlists import parse_m3u, parse_pls
from music_assistant.server.helpers.tags import parse_tags, split_items
from music_assistant.server.models.music_provider import MusicProvider

from .helpers import get_parentdir
from .helpers import get_album_dir, get_artist_dir, get_disc_dir

if TYPE_CHECKING:
from collections.abc import AsyncGenerator
Expand Down Expand Up @@ -731,16 +730,14 @@ async def _parse_track(self, file_item: FileSystemItem) -> Track:
# disc_dir is the folder level where the tracks are located
# this may be a separate disc folder (Disc 1, Disc 2 etc) underneath the album folder
# or this is an album folder with the disc attached
disc_dir = get_parentdir(file_item.path, f"disc {tags.disc or 1}")
album_dir = get_parentdir(disc_dir or file_item.path, tags.album)
disc_dir = get_disc_dir(file_item.path, tags.album, tags.disc)
album_dir = get_album_dir(file_item.path, tags.album, disc_dir)

# album artist(s)
album_artists = []
if tags.album_artists:
for index, album_artist_str in enumerate(tags.album_artists):
# work out if we have an artist folder
artist_dir = get_parentdir(album_dir, album_artist_str, 1)
artist = await self._parse_artist(album_artist_str, artist_path=artist_dir)
artist = await self._parse_artist(album_artist_str, album_path=album_dir)
if not artist.mbid:
with contextlib.suppress(IndexError):
artist.mbid = tags.musicbrainz_albumartistids[index]
Expand Down Expand Up @@ -781,8 +778,9 @@ async def _parse_track(self, file_item: FileSystemItem) -> Track:

track.album = await self._parse_album(
tags.album,
album_dir,
disc_dir,
track_path=file_item.path,
album_path=album_dir,
disc_path=disc_dir,
artists=album_artists,
barcode=tags.barcode,
)
Expand Down Expand Up @@ -859,14 +857,20 @@ async def _parse_track(self, file_item: FileSystemItem) -> Track:

async def _parse_artist(
self,
name: str | None = None,
name: str,
artist_path: str | None = None,
album_path: str | None = None,
sort_name: str | None = None,
) -> Artist | None:
"""Lookup metadata in Artist folder."""
assert name or artist_path
) -> Artist:
"""Parse Artist metadata into an Artist object."""
cache_key = f"{self.instance_id}-artistdata-{name}-{artist_path}"
if cache := await self.mass.cache.get(cache_key):
return cache
if not artist_path and album_path:
# try to find (album)artist folder based on album path
artist_path = get_artist_dir(album_path=album_path, artist_name=name)
if not artist_path:
# check if we have an existing item
# check if we have an existing item to retrieve the artist path
async for item in self.mass.music.artists.iter_library_items(search=name):
if not compare_strings(name, item.name):
continue
Expand All @@ -882,30 +886,25 @@ async def _parse_artist(
artist_path = name
elif await self.exists(name.title()):
artist_path = name.title()
else:
# use fake artist path as item id which is just the name
artist_path = name

if not name:
name = artist_path.split(os.sep)[-1]

artist = Artist(
item_id=artist_path,
item_id=artist_path or name,
provider=self.instance_id,
name=name,
sort_name=sort_name,
provider_mappings={
ProviderMapping(
item_id=artist_path,
item_id=artist_path or name,
provider_domain=self.domain,
provider_instance=self.instance_id,
url=artist_path,
url=artist_path or name,
)
},
)

if not await self.exists(artist_path):
# return basic object if there is no dedicated artist folder
await self.mass.cache.set(cache_key, artist, expiration=120)
return artist

nfo_file = os.path.join(artist_path, "artist.nfo")
Expand All @@ -930,19 +929,23 @@ async def _parse_artist(
if images := await self._get_local_images(artist_path):
artist.metadata.images = images

await self.mass.cache.set(cache_key, artist, expiration=120)
return artist

async def _parse_album(
self,
name: str | None,
name: str,
track_path: str,
album_path: str | None,
disc_path: str | None,
artists: list[Artist],
barcode: str | None = None,
sort_name: str | None = None,
) -> Album | None:
"""Lookup metadata in Album folder."""
assert name or album_path
) -> Album:
"""Parse Album metadata into an Album object."""
cache_key = f"{self.instance_id}-albumdata-{name}-{album_path}"
if cache := await self.mass.cache.get(cache_key):
return cache
# create fake path if needed
if not album_path and artists:
album_path = artists[0].name + os.sep + name
Expand Down Expand Up @@ -970,11 +973,9 @@ async def _parse_album(
if barcode:
album.external_ids.add((ExternalID.BARCODE, barcode))

if not await self.exists(album_path):
# return basic object if there is no dedicated album folder
return album

for folder_path in (disc_path, album_path):
# hunt for additional metadata and images in the folder structure
extra_path = os.path.dirname(track_path) if (track_path and not album_path) else None
for folder_path in (disc_path, album_path, extra_path):
if not folder_path:
continue
nfo_file = os.path.join(folder_path, "album.nfo")
Expand Down Expand Up @@ -1009,9 +1010,9 @@ async def _parse_album(
else:
album.metadata.images += images

await self.mass.cache.set(cache_key, album, expiration=120)
return album

@use_cache(120)
async def _get_local_images(self, folder: str) -> list[MediaItemImage]:
"""Return local images found in a given folderpath."""
images = []
Expand Down
42 changes: 34 additions & 8 deletions music_assistant/server/providers/filesystem_local/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,41 @@
from music_assistant.server.helpers.compare import compare_strings


def get_parentdir(base_path: str, name: str, skip: int = 0) -> str | None:
"""Look for folder name in path (to find dedicated artist or album folder)."""
if not base_path:
return None
parentdir = os.path.dirname(base_path)
for _ in range(skip, 3):
def get_artist_dir(album_path: str, artist_name: str) -> str | None:
"""Look for (Album)Artist directory in path of album."""
parentdir = os.path.dirname(album_path)
dirname = parentdir.rsplit(os.sep)[-1]
if compare_strings(artist_name, dirname, False):
return parentdir
return None


def get_disc_dir(track_path: str, album_name: str, disc_number: int | None) -> str | None:
"""Look for disc directory in path of album/tracks."""
parentdir = os.path.dirname(track_path)
dirname = parentdir.rsplit(os.sep)[-1]
dirname_lower = dirname.lower()
if disc_number is not None and compare_strings(f"disc {disc_number}", dirname, False):
return parentdir
if dirname_lower.startswith(album_name.lower()) and "disc" in dirname_lower:
return parentdir
if dirname_lower.startswith(album_name.lower()) and dirname_lower.endswith(str(disc_number)):
return parentdir
return None


def get_album_dir(track_path: str, album_name: str, disc_dir: str | None) -> str | None:
"""Return album/parent directory of a track."""
parentdir = os.path.dirname(track_path)
# account for disc sublevel by ignoring 1 level if needed
for _ in range(2 if disc_dir else 1):
dirname = parentdir.rsplit(os.sep)[-1]
dirname = dirname.split("(")[0].split("[")[0].strip()
if compare_strings(name, dirname, False):
dirname_lower = dirname.lower()
if compare_strings(album_name, dirname, False):
return parentdir
if album_name in dirname_lower:
return parentdir
if dirname_lower in album_name:
return parentdir
parentdir = os.path.dirname(parentdir)
return None
Expand Down

0 comments on commit 277f185

Please sign in to comment.