Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/music-assistant/server into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Jan 9, 2025
2 parents 2654a48 + 7dba6db commit 329a961
Show file tree
Hide file tree
Showing 29 changed files with 155 additions and 99 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Music Assistant is a free, opensource Media library manager that connects to you

Documentation https://music-assistant.io

Beta Documentation https://beta.music-assistant.io

For issues, please go to [the issue tracker](https://github.com/music-assistant/support/issues).

For feature requests, please see [feature requests](https://github.com/music-assistant/support/discussions/categories/feature-requests-and-ideas).
Expand Down
2 changes: 1 addition & 1 deletion music_assistant/helpers/playlists.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async def fetch_playlist(
) or "#EXT-X-STREAM-INF:" in playlist_data:
raise IsHLSPlaylist

if url.endswith((".m3u", ".m3u8")):
if urlparse(url).path.endswith((".m3u", ".m3u8")):
playlist = parse_m3u(playlist_data)
else:
playlist = parse_pls(playlist_data)
Expand Down
3 changes: 1 addition & 2 deletions music_assistant/providers/airplay/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from music_assistant_models.enums import ConfigEntryType
from music_assistant_models.provider import ProviderManifest

from music_assistant import MusicAssistant
from music_assistant.mass import MusicAssistant

from .const import CONF_BIND_INTERFACE
from .provider import AirplayProvider
Expand All @@ -17,7 +17,6 @@
from music_assistant_models.config_entries import ProviderConfig
from music_assistant_models.provider import ProviderManifest

from music_assistant import MusicAssistant
from music_assistant.models import ProviderInstanceType


Expand Down
2 changes: 0 additions & 2 deletions music_assistant/providers/airplay/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]:
return (manufacturer, model)
# try parse from am property
if am_property := info.decoded_properties.get("am"):
if isinstance(am_property, bytes):
am_property = am_property.decode("utf-8")
model = am_property

if not model:
Expand Down
63 changes: 38 additions & 25 deletions music_assistant/providers/airplay/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import socket
import time
from random import randrange
from typing import TYPE_CHECKING
from typing import cast

from music_assistant_models.config_entries import ConfigEntry
from music_assistant_models.enums import (
Expand Down Expand Up @@ -36,12 +36,13 @@
CONF_ENTRY_SYNC_ADJUST,
create_sample_rates_config_entry,
)
from music_assistant.helpers.audio import get_ffmpeg_stream
from music_assistant.helpers.datetime import utc
from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.helpers.process import check_output
from music_assistant.helpers.util import TaskManager, get_ip_pton, lock, select_free_port
from music_assistant.models.player_provider import PlayerProvider
from music_assistant.providers.airplay.raop import RaopStreamSession
from music_assistant.providers.player_group import PlayerGroupProvider

from .const import (
AIRPLAY_FLOW_PCM_FORMAT,
Expand All @@ -61,10 +62,6 @@
)
from .player import AirPlayPlayer

if TYPE_CHECKING:
from music_assistant.providers.player_group import PlayerGroupProvider


PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_FLOW_MODE_ENFORCED,
CONF_ENTRY_CROSSFADE,
Expand Down Expand Up @@ -138,15 +135,15 @@
class AirplayProvider(PlayerProvider):
"""Player provider for Airplay based players."""

cliraop_bin: str | None = None
cliraop_bin: str | None
_players: dict[str, AirPlayPlayer]
_dacp_server: asyncio.Server = None
_dacp_info: AsyncServiceInfo = None
_dacp_server: asyncio.Server
_dacp_info: AsyncServiceInfo

@property
def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return (ProviderFeature.SYNC_PLAYERS,)
return {ProviderFeature.SYNC_PLAYERS}

async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
Expand Down Expand Up @@ -179,10 +176,12 @@ async def on_mdns_service_state_change(
self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
) -> None:
"""Handle MDNS service state callback."""
if not info:
return
if "@" in name:
raw_id, display_name = name.split(".")[0].split("@", 1)
elif "deviceid" in info.decoded_properties:
raw_id = info.decoded_properties["deviceid"].replace(":", "")
elif deviceid := info.decoded_properties.get("deviceid"):
raw_id = deviceid.replace(":", "")
display_name = info.name.split(".")[0]
else:
return
Expand Down Expand Up @@ -263,6 +262,8 @@ async def cmd_pause(self, player_id: str) -> None:
- player_id: player_id of the player to handle the command.
"""
player = self.mass.players.get(player_id)
if not player:
return
if player.group_childs:
# pause is not supported while synced, use stop instead
self.logger.debug("Player is synced, using STOP instead of PAUSE")
Expand All @@ -279,6 +280,8 @@ async def play_media(
) -> None:
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
if not player:
return
# set the active source for the player to the media queue
# this accounts for syncgroups and linked players (e.g. sonos)
player.active_source = media.queue_id
Expand All @@ -300,7 +303,7 @@ async def play_media(
)
elif media.queue_id and media.queue_id.startswith("ugp_"):
# special case: UGP stream
ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group"))
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.output_format
audio_source = ugp_stream.subscribe()
Expand Down Expand Up @@ -338,6 +341,8 @@ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
if airplay_player.raop_stream and airplay_player.raop_stream.running:
await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
mass_player = self.mass.players.get(player_id)
if not mass_player:
return
mass_player.volume_level = volume_level
mass_player.volume_muted = volume_level == 0
self.mass.players.update(player_id)
Expand Down Expand Up @@ -404,22 +409,24 @@ async def cmd_ungroup(self, player_id: str) -> None:
- player_id: player_id of the player to handle the command.
"""
mass_player = self.mass.players.get(player_id, raise_unavailable=True)
if not mass_player.synced_to:
if not mass_player or not mass_player.synced_to:
return
ap_player = self._players[player_id]
if ap_player.raop_stream and ap_player.raop_stream.running:
await ap_player.raop_stream.session.remove_client(ap_player)
group_leader = self.mass.players.get(mass_player.synced_to, raise_unavailable=True)
assert group_leader
if player_id in group_leader.group_childs:
group_leader.group_childs.remove(player_id)
mass_player.synced_to = None
airplay_player = self._players.get(player_id)
await airplay_player.cmd_stop()
if airplay_player:
await airplay_player.cmd_stop()
# make sure that the player manager gets an update
self.mass.players.update(mass_player.player_id, skip_forward=True)
self.mass.players.update(group_leader.player_id, skip_forward=True)

async def _getcliraop_binary(self):
async def _getcliraop_binary(self) -> str:
"""Find the correct raop/airplay binary belonging to the platform."""
# ruff: noqa: SIM102
if self.cliraop_bin is not None:
Expand All @@ -435,7 +442,8 @@ async def check_binary(cliraop_path: str) -> str | None:
self.cliraop_bin = cliraop_path
return cliraop_path
except OSError:
return None
pass
return None

base_path = os.path.join(os.path.dirname(__file__), "bin")
system = platform.system().lower().replace("darwin", "macos")
Expand All @@ -452,6 +460,7 @@ async def check_binary(cliraop_path: str) -> str | None:
def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]:
"""Get all sync clients for a player."""
mass_player = self.mass.players.get(player_id, True)
assert mass_player
sync_clients: list[AirPlayPlayer] = []
# we need to return the player itself too
group_child_ids = {player_id}
Expand Down Expand Up @@ -541,15 +550,15 @@ async def _handle_dacp_request( # noqa: PLR0915
else:
headers_raw = request
body = ""
headers_raw = headers_raw.split("\r\n")
headers_split = headers_raw.split("\r\n")
headers = {}
for line in headers_raw[1:]:
for line in headers_split[1:]:
if ":" not in line:
continue
x, y = line.split(":", 1)
headers[x.strip()] = y.strip()
active_remote = headers.get("Active-Remote")
_, path, _ = headers_raw[0].split(" ")
_, path, _ = headers_split[0].split(" ")
airplay_player = next(
(
x
Expand All @@ -570,6 +579,8 @@ async def _handle_dacp_request( # noqa: PLR0915

player_id = airplay_player.player_id
mass_player = self.mass.players.get(player_id)
if not mass_player:
return
active_queue = self.mass.player_queues.get_active_queue(player_id)
if path == "/ctrl-int/1/nextitem":
self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id))
Expand All @@ -590,10 +601,10 @@ async def _handle_dacp_request( # noqa: PLR0915
self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
elif path == "/ctrl-int/1/shuffle_songs":
queue = self.mass.player_queues.get(player_id)
self.mass.loop.call_soon(
self.mass.player_queues.set_shuffle(
active_queue.queue_id, not queue.shuffle_enabled
)
if not queue:
return
self.mass.player_queues.set_shuffle(
active_queue.queue_id, not queue.shuffle_enabled
)
elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
# sometimes this request is sent by a device as confirmation of a play command
Expand Down Expand Up @@ -650,11 +661,13 @@ async def _handle_dacp_request( # noqa: PLR0915
finally:
writer.close()

async def monitor_prevent_playback(self, player_id: str):
async def monitor_prevent_playback(self, player_id: str) -> None:
"""Monitor the prevent playback state of an airplay player."""
count = 0
if not (airplay_player := self._players.get(player_id)):
return
if not airplay_player.raop_stream:
return
prev_active_remote_id = airplay_player.raop_stream.active_remote_id
while count < 40:
count += 1
Expand Down
34 changes: 27 additions & 7 deletions music_assistant/providers/airplay/raop.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(
self.input_format = input_format
self._sync_clients = sync_clients
self._audio_source = audio_source
self._audio_source_task: asyncio.Task | None = None
self._audio_source_task: asyncio.Task[None] | None = None
self._stopped: bool = False
self._lock = asyncio.Lock()

Expand All @@ -75,27 +75,36 @@ async def audio_streamer() -> None:
return
async with self._lock:
await asyncio.gather(
*[x.raop_stream.write_chunk(chunk) for x in self._sync_clients],
*[
x.raop_stream.write_chunk(chunk)
for x in self._sync_clients
if x.raop_stream
],
return_exceptions=True,
)
# entire stream consumed: send EOF
generator_exhausted = True
async with self._lock:
await asyncio.gather(
*[x.raop_stream.write_eof() for x in self._sync_clients],
*[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream],
return_exceptions=True,
)
finally:
if not generator_exhausted:
await close_async_generator(self._audio_source)

# get current ntp and start RaopStream per player
assert self.prov.cliraop_bin
_, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
start_ntp = int(stdout.strip())
wait_start = 1500 + (250 * len(self._sync_clients))
async with self._lock:
await asyncio.gather(
*[x.raop_stream.start(start_ntp, wait_start) for x in self._sync_clients],
*[
x.raop_stream.start(start_ntp, wait_start)
for x in self._sync_clients
if x.raop_stream
],
return_exceptions=True,
)
self._audio_source_task = asyncio.create_task(audio_streamer())
Expand All @@ -116,6 +125,7 @@ async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
"""Remove a sync client from the session."""
if airplay_player not in self._sync_clients:
return
assert airplay_player.raop_stream
assert airplay_player.raop_stream.session == self
async with self._lock:
self._sync_clients.remove(airplay_player)
Expand Down Expand Up @@ -154,7 +164,7 @@ def __init__(
# with the named pipe used to send audio
self.active_remote_id: str = str(randint(1000, 8000))
self.prevent_playback: bool = False
self._log_reader_task: asyncio.Task | None = None
self._log_reader_task: asyncio.Task[None] | asyncio.Future[None] | None = None
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
self._started = asyncio.Event()
Expand All @@ -170,6 +180,8 @@ async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
if not mass_player:
return
bind_ip = await self.mass.config.get_provider_config_value(
self.prov.instance_id, CONF_BIND_INTERFACE
)
Expand Down Expand Up @@ -240,10 +252,12 @@ async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
self._started.set()
self._log_reader_task = self.mass.create_task(self._log_watcher())

async def stop(self):
async def stop(self) -> None:
"""Stop playback and cleanup."""
if self._stopped:
return
if not self._cliraop_proc:
return
if self._cliraop_proc.proc and not self._cliraop_proc.closed:
await self.send_cli_command("ACTION=STOP")
self._stopped = True # set after send_cli command!
Expand All @@ -259,13 +273,15 @@ async def write_chunk(self, chunk: bytes) -> None:
if self._stopped:
return
await self._started.wait()
assert self._ffmpeg_proc
await self._ffmpeg_proc.write(chunk)

async def write_eof(self) -> None:
"""Write EOF."""
if self._stopped:
return
await self._started.wait()
assert self._ffmpeg_proc
await self._ffmpeg_proc.write_eof()

async def send_cli_command(self, command: str) -> None:
Expand All @@ -277,7 +293,7 @@ async def send_cli_command(self, command: str) -> None:
if not command.endswith("\n"):
command += "\n"

def send_data():
def send_data() -> None:
with suppress(BrokenPipeError), open(named_pipe, "w") as f:
f.write(command)

Expand All @@ -290,11 +306,15 @@ async def _log_watcher(self) -> None:
"""Monitor stderr for the running CLIRaop process."""
airplay_player = self.airplay_player
mass_player = self.mass.players.get(airplay_player.player_id)
if not mass_player:
return
queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
logger = airplay_player.logger
lost_packets = 0
prev_metadata_checksum: str = ""
prev_progress_report: float = 0
if not self._cliraop_proc:
return
async for line in self._cliraop_proc.iter_stderr():
if "elapsed milliseconds:" in line:
# this is received more or less every second while playing
Expand Down
2 changes: 1 addition & 1 deletion music_assistant/providers/builtin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
from music_assistant_models.provider import ProviderManifest

from music_assistant import MusicAssistant
from music_assistant.mass import MusicAssistant
from music_assistant.models import ProviderInstanceType


Expand Down
Loading

0 comments on commit 329a961

Please sign in to comment.