Skip to content

Commit

Permalink
Parse album
Browse files Browse the repository at this point in the history
  • Loading branch information
snejus committed Aug 1, 2024
1 parent e37fd99 commit 5ceda6a
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 67 deletions.
54 changes: 39 additions & 15 deletions beetsplug/bandcamp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

from __future__ import annotations

import json
import logging
import re
from contextlib import contextmanager
from contextlib import contextmanager, suppress
from functools import lru_cache, partial
from html import unescape
from itertools import chain
from operator import itemgetter
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -219,10 +221,33 @@ def _find_url_in_item(
return url
return ""

def candidates(
self, items: List[library.Item], artist: str, album: str, *_: Any, **__: Any
) -> Iterable[AlbumInfo]:
"""Return a sequence of album candidates matching given artist and album."""
def candidates(self, items, artist, album, va_likely, extra_tags=None):
# type: (List[library.Item], str, str, bool, Any) -> Iterable[AlbumInfo]
"""Return a sequence of AlbumInfo objects that match the
album whose items are provided or are being searched.
"""
from pprint import pprint

url = items[0].comments
parent_dir = Path(items[0].path.decode()).parent
with suppress(StopIteration):
playlist_info_path = next(parent_dir.glob("Playlist_*"))
with open(playlist_info_path) as f:
playlist_info = json.load(f)

playlist_info["tracks"] = []
for track_info_path in set(parent_dir.glob("*.info.json")) - {
playlist_info_path
}:
with open(track_info_path) as f:
track_data = {**json.load(f), "path": str(track_info_path)}
playlist_info["tracks"].append(track_data)

pprint(playlist_info)

# if url.startswith("https://"):
# yield from self.get_album_info(url)

item = items[0]
label = ""
if items and album == item.album and artist == item.albumartist:
Expand Down Expand Up @@ -260,7 +285,7 @@ def item_candidates(
def album_for_id(self, album_id: str) -> AlbumInfo | None:
"""Fetch an album by its bandcamp ID."""
if not ("soundcloud" in album_id or _from_bandcamp(album_id)):
self._info("Not a bandcamp URL, skipping")
self._info("Not a Bandcamp or Soundcloud URL, skipping")
return None

albums = self.get_album_info(album_id)
Expand All @@ -276,11 +301,11 @@ def album_for_id(self, album_id: str) -> AlbumInfo | None:

def track_for_id(self, track_id: str) -> TrackInfo | None:
    """Fetch a single track by its Bandcamp or Soundcloud URL.

    The span as pasted interleaved the pre- and post-change diff lines;
    this is the coherent post-change version (guard clause first, single
    success-path return), matching the Soundcloud-aware check used by
    ``album_for_id``.

    :param track_id: URL of the track page.
    :return: the parsed ``TrackInfo``, or ``None`` (after logging) when
        the URL is from neither Bandcamp nor Soundcloud.
    """
    if not ("soundcloud" in track_id or _from_bandcamp(track_id)):
        self._info("Not a Bandcamp or Soundcloud URL, skipping")
        return None

    return self.get_track_info(track_id)

def handle(self, guru: Metaguru, attr: str, _id: str) -> Any:
try:
Expand All @@ -292,7 +317,7 @@ def handle(self, guru: Metaguru, attr: str, _id: str) -> Any:
self._exc("Unexpected error obtaining {}, please report at {}", _id, url)
return None

def get_album_info(self, url: str) -> List[AlbumInfo] | None:
def get_album_info(self, url: str) -> Optional[List[AlbumInfo]]:
"""Return an AlbumInfo object for a bandcamp album page.
If track url is given by mistake, find and fetch the album url instead.
Expand Down Expand Up @@ -321,8 +346,8 @@ def _get_soundcloud_data(self, url: str) -> AlbumInfo | TrackInfo | None:
sc_data_key = "sound"
method = get_soundcloud_track

self._info("Fetching data from soundcloud url {} as {}", url, _type)
data = re.search(r"\[\{[^<]+[^;<)]", self._get(url))
self._info("Fetching data from soundcloud url {}", url)
data = re.search(r"\[.*hydratable.*\]", self._get(url))
if not data:
return None

Expand All @@ -336,8 +361,7 @@ def get_track_info(self, url: str) -> Optional[TrackInfo]:
if track:
return track

guru = self.guru(url, "singleton")
return self.handle(guru, "singleton", url) if guru else None
return self.guru(url).singleton

def _search(self, data: JSONDict) -> Iterable[JSONDict]:
"""Return a list of track/album URLs of type search_type matching the query."""
Expand Down
47 changes: 44 additions & 3 deletions beetsplug/bandcamp/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
)

from ordered_set import OrderedSet as ordset # noqa: N813
from beets.autotag.hooks import AlbumInfo
from beets.ui import log
from ordered_set import OrderedSet as ordset

from .genres_lookup import GENRES

Expand Down Expand Up @@ -247,6 +250,10 @@ def valid_for_mode(kw: str) -> bool:
return valid_mb_genre(kw) or valid_mb_genre(list(words)[-1])

unique_genres: ordset[str] = ordset()
keywords = set(keywords)
for kw in list(keywords):
keywords.add(kw.replace(" ", "-"))
keywords.add(kw.replace("-", " "))
# expand badly delimited keywords
split_kw = partial(re.split, r"[.] | #| - ")
for kw in chain.from_iterable(map(split_kw, keywords)):
Expand All @@ -271,10 +278,14 @@ def within_another_genre(genre: str) -> bool:
return (g for g in unique_genres if not within_another_genre(g))

@staticmethod
def unpack_props(obj: Any) -> Any:
    """Add all 'additionalProperty'-ies to the parent dictionary, recursively.

    Each ``{"name": ..., "value": ...}`` entry of an ``additionalProperty``
    list is hoisted into its containing dict as a plain key/value pair and
    the ``additionalProperty`` key itself is removed.  Dicts and lists are
    rebuilt recursively; any other value is returned unchanged.

    Note: dicts are mutated in place (``pop`` + key insertion) and then
    rebuilt, preserving the original behavior of the diff's new version.
    """

    def _unpack(node: Any) -> Any:
        if isinstance(node, dict):
            # Guard with `or []`: the source JSON may carry an explicit
            # null, which a bare pop(..., []) default would not cover.
            for prop in node.pop("additionalProperty", None) or []:
                node[prop["name"]] = prop["value"]
            return {key: _unpack(value) for key, value in node.items()}
        if isinstance(node, list):
            return [_unpack(item) for item in node]
        return node

    return _unpack(obj)

@staticmethod
Expand Down Expand Up @@ -343,3 +354,33 @@ def get_medium_total(medium: int) -> int:
else:
medium_index += 1
return album

@staticmethod
def parse_additional_fields(meta: str, field_patterns: Dict[str, Any]) -> Dict[str, Any]:
    """Extract user-configured fields from the flattened metadata text.

    Each value of ``field_patterns`` is a dict with a ``pattern`` regex
    (multi-line patterns are compiled with ``re.VERBOSE``) and, optionally,
    either a ``replace`` template (passed to ``re.Match.expand``) or a
    ``replace_expr`` Python expression evaluated with ``matches`` (all
    matches) and ``match`` (the first match) in scope.  String results are
    stripped and cleaned of carriage returns.  A field that fails to parse
    is logged and skipped, never aborting the whole run.

    Removed from the original: a leftover ``log.info`` that expanded the
    first match a second time purely for debug output, plus commented-out
    debug lines.
    """
    additional_fields: Dict[str, Any] = {}
    for field, pattern_item in field_patterns.items():
        try:
            pat = pattern_item["pattern"]
            # A pattern spanning several lines is assumed to be written in
            # verbose (commented) regex style.
            flags = re.VERBOSE if len(pat.splitlines()) > 1 else 0
            matches = list(re.finditer(pat, meta, flags))
            if not matches:
                continue
            if "replace" in pattern_item:
                value = matches[0].expand(pattern_item["replace"])
            elif "replace_expr" in pattern_item:
                # NOTE(review): eval of a config-supplied expression.  The
                # beets config is user-owned (trusted input), but this is
                # still arbitrary code execution -- keep it flagged.
                value = eval(  # noqa: S307
                    pattern_item["replace_expr"],
                    {"matches": matches, "match": matches[0]},
                )
            else:
                value = matches[0].group()
            if isinstance(value, str):
                # Normalise CRLF artifacts from the flattened text.
                value = value.replace("\r", "").strip()
            additional_fields[field] = value
        except Exception:
            # Best-effort: a bad pattern must not break the import.
            log.error("Failed parsing {}", field, exc_info=True)
    return additional_fields
30 changes: 27 additions & 3 deletions beetsplug/bandcamp/metaguru.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import re
from collections import Counter
from datetime import date, datetime
from functools import cached_property, partial
from functools import cached_property, partial, singledispatch
from typing import Any, Dict, Iterable, List, Optional, Set
from unicodedata import normalize

Expand Down Expand Up @@ -74,7 +74,7 @@ def from_html(cls, html: str, config: Optional[JSONDict] = None) -> "Metaguru":
except AttributeError as exc:
raise AttributeError("Could not find release metadata JSON") from exc
else:
return cls(json.loads(meta), config)
return cls(cls.unpack_props(json.loads(meta)), config)

@cached_property
def excluded_fields(self) -> Set[str]:
Expand Down Expand Up @@ -329,8 +329,9 @@ def is_comp(self) -> bool:
def first_one(artist: str) -> str:
return PATTERNS["split_artists"].split(artist.replace(" & ", ", "))[0]

artist_count = len(set(map(first_one, self.tracks.artists)))
truly_unique = set(map(first_one, self.tracks.artists))
return (
return artist_count > 1 and (
self._album_name.mentions_compilation
or (len(truly_unique) > 1 and self._search_albumtype("compilation"))
or (len(truly_unique) > 3 and len(self.tracks) > 4)
Expand Down Expand Up @@ -423,18 +424,41 @@ def get_fields(self, fields: Iterable[str], src: object = None) -> JSONDict:
return {field: getattr(self, field)}
return dict(zip(fields, iter(op.attrgetter(*fields)(src or self))))

@cached_property
def parseable_meta(self) -> str:
    """Serialise ``self.meta`` into flat ``dotted.path: value`` lines.

    Dict keys are joined with ``.``, list positions with ``[i]``; every
    leaf becomes one ``path: value`` line terminated by CRLF, with any
    stray carriage returns stripped from the leaf text.  The result is the
    text that the configured field patterns are matched against.
    """

    def flatten(node: Any, path: str = "") -> str:
        if isinstance(node, dict):
            return "".join(
                flatten(child, f"{path}.{name}") for name, child in node.items()
            )
        if isinstance(node, list):
            return "".join(
                flatten(child, f"{path}[{idx}]") for idx, child in enumerate(node)
            )
        return f"{path}: {node}".replace("\r", "") + "\r\n"

    return flatten(self.meta)

@property
def _common_album(self) -> JSONDict:
    """Return the album-level fields shared by every track of the release.

    Starts from the album name, adds label/catalognum/albumtype/country
    (plus extended fields when the installed beets supports them), the
    release date parts, and finally any extra fields parsed via the
    user-configured ``field_patterns``.
    """
    common_data: JSONDict = {"album": self.album_name}
    fields = ["label", "catalognum", "albumtype", "country"]
    if EXTENDED_FIELDS_SUPPORT:
        # These album fields only exist on newer beets versions.
        fields.extend(["genre", "style", "comments", "albumtypes"])

    common_data.update(self.get_fields(fields))
    if EXTENDED_FIELDS_SUPPORT and not ALBUMTYPES_LIST_SUPPORT:
        # When beets does not accept a list for albumtypes, collapse it
        # into a single "; "-separated string.
        common_data["albumtypes"] = "; ".join(common_data["albumtypes"])
    reldate = self.release_date
    if reldate:
        common_data.update(self.get_fields(["year", "month", "day"], reldate))
    if "field_patterns" in self.config:
        # User-configured regex patterns extract additional fields from
        # the flattened metadata text (see parseable_meta).
        common_data.update(
            self.parse_additional_fields(
                self.parseable_meta, self.config["field_patterns"]
            )
        )

    return common_data

Expand Down
Loading

0 comments on commit 5ceda6a

Please sign in to comment.