Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: mypy for airplay #1848

Merged
merged 2 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions music_assistant/providers/airplay/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from music_assistant_models.enums import ConfigEntryType
from music_assistant_models.provider import ProviderManifest

from music_assistant import MusicAssistant
from music_assistant.mass import MusicAssistant

from .const import CONF_BIND_INTERFACE
from .provider import AirplayProvider
Expand All @@ -17,7 +17,6 @@
from music_assistant_models.config_entries import ProviderConfig
from music_assistant_models.provider import ProviderManifest

from music_assistant import MusicAssistant
from music_assistant.models import ProviderInstanceType


Expand Down
2 changes: 0 additions & 2 deletions music_assistant/providers/airplay/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]:
return (manufacturer, model)
# try parse from am property
if am_property := info.decoded_properties.get("am"):
if isinstance(am_property, bytes):
am_property = am_property.decode("utf-8")
model = am_property

if not model:
Expand Down
63 changes: 38 additions & 25 deletions music_assistant/providers/airplay/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import socket
import time
from random import randrange
from typing import TYPE_CHECKING
from typing import cast

from music_assistant_models.config_entries import ConfigEntry
from music_assistant_models.enums import (
Expand Down Expand Up @@ -36,12 +36,13 @@
CONF_ENTRY_SYNC_ADJUST,
create_sample_rates_config_entry,
)
from music_assistant.helpers.audio import get_ffmpeg_stream
from music_assistant.helpers.datetime import utc
from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.helpers.process import check_output
from music_assistant.helpers.util import TaskManager, get_ip_pton, lock, select_free_port
from music_assistant.models.player_provider import PlayerProvider
from music_assistant.providers.airplay.raop import RaopStreamSession
from music_assistant.providers.player_group import PlayerGroupProvider

from .const import (
AIRPLAY_FLOW_PCM_FORMAT,
Expand All @@ -61,10 +62,6 @@
)
from .player import AirPlayPlayer

if TYPE_CHECKING:
from music_assistant.providers.player_group import PlayerGroupProvider


PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_FLOW_MODE_ENFORCED,
CONF_ENTRY_CROSSFADE,
Expand Down Expand Up @@ -138,15 +135,15 @@
class AirplayProvider(PlayerProvider):
"""Player provider for Airplay based players."""

cliraop_bin: str | None = None
cliraop_bin: str | None
_players: dict[str, AirPlayPlayer]
_dacp_server: asyncio.Server = None
_dacp_info: AsyncServiceInfo = None
_dacp_server: asyncio.Server
_dacp_info: AsyncServiceInfo

@property
def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return (ProviderFeature.SYNC_PLAYERS,)
return {ProviderFeature.SYNC_PLAYERS}

async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
Expand Down Expand Up @@ -179,10 +176,12 @@ async def on_mdns_service_state_change(
self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
) -> None:
"""Handle MDNS service state callback."""
if not info:
return
if "@" in name:
raw_id, display_name = name.split(".")[0].split("@", 1)
elif "deviceid" in info.decoded_properties:
raw_id = info.decoded_properties["deviceid"].replace(":", "")
elif deviceid := info.decoded_properties.get("deviceid"):
raw_id = deviceid.replace(":", "")
display_name = info.name.split(".")[0]
else:
return
Expand Down Expand Up @@ -263,6 +262,8 @@ async def cmd_pause(self, player_id: str) -> None:
- player_id: player_id of the player to handle the command.
"""
player = self.mass.players.get(player_id)
if not player:
return
if player.group_childs:
# pause is not supported while synced, use stop instead
self.logger.debug("Player is synced, using STOP instead of PAUSE")
Expand All @@ -279,6 +280,8 @@ async def play_media(
) -> None:
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
if not player:
return
# set the active source for the player to the media queue
# this accounts for syncgroups and linked players (e.g. sonos)
player.active_source = media.queue_id
Expand All @@ -300,7 +303,7 @@ async def play_media(
)
elif media.queue_id and media.queue_id.startswith("ugp_"):
# special case: UGP stream
ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group"))
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.output_format
audio_source = ugp_stream.subscribe()
Expand Down Expand Up @@ -338,6 +341,8 @@ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
if airplay_player.raop_stream and airplay_player.raop_stream.running:
await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
mass_player = self.mass.players.get(player_id)
if not mass_player:
return
mass_player.volume_level = volume_level
mass_player.volume_muted = volume_level == 0
self.mass.players.update(player_id)
Expand Down Expand Up @@ -404,22 +409,24 @@ async def cmd_ungroup(self, player_id: str) -> None:
- player_id: player_id of the player to handle the command.
"""
mass_player = self.mass.players.get(player_id, raise_unavailable=True)
if not mass_player.synced_to:
if not mass_player or not mass_player.synced_to:
return
ap_player = self._players[player_id]
if ap_player.raop_stream and ap_player.raop_stream.running:
await ap_player.raop_stream.session.remove_client(ap_player)
group_leader = self.mass.players.get(mass_player.synced_to, raise_unavailable=True)
assert group_leader
if player_id in group_leader.group_childs:
group_leader.group_childs.remove(player_id)
mass_player.synced_to = None
airplay_player = self._players.get(player_id)
await airplay_player.cmd_stop()
if airplay_player:
await airplay_player.cmd_stop()
# make sure that the player manager gets an update
self.mass.players.update(mass_player.player_id, skip_forward=True)
self.mass.players.update(group_leader.player_id, skip_forward=True)

async def _getcliraop_binary(self):
async def _getcliraop_binary(self) -> str:
"""Find the correct raop/airplay binary belonging to the platform."""
# ruff: noqa: SIM102
if self.cliraop_bin is not None:
Expand All @@ -435,7 +442,8 @@ async def check_binary(cliraop_path: str) -> str | None:
self.cliraop_bin = cliraop_path
return cliraop_path
except OSError:
return None
pass
return None

base_path = os.path.join(os.path.dirname(__file__), "bin")
system = platform.system().lower().replace("darwin", "macos")
Expand All @@ -452,6 +460,7 @@ async def check_binary(cliraop_path: str) -> str | None:
def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]:
"""Get all sync clients for a player."""
mass_player = self.mass.players.get(player_id, True)
assert mass_player
sync_clients: list[AirPlayPlayer] = []
# we need to return the player itself too
group_child_ids = {player_id}
Expand Down Expand Up @@ -541,15 +550,15 @@ async def _handle_dacp_request( # noqa: PLR0915
else:
headers_raw = request
body = ""
headers_raw = headers_raw.split("\r\n")
headers_split = headers_raw.split("\r\n")
headers = {}
for line in headers_raw[1:]:
for line in headers_split[1:]:
if ":" not in line:
continue
x, y = line.split(":", 1)
headers[x.strip()] = y.strip()
active_remote = headers.get("Active-Remote")
_, path, _ = headers_raw[0].split(" ")
_, path, _ = headers_split[0].split(" ")
airplay_player = next(
(
x
Expand All @@ -570,6 +579,8 @@ async def _handle_dacp_request( # noqa: PLR0915

player_id = airplay_player.player_id
mass_player = self.mass.players.get(player_id)
if not mass_player:
return
active_queue = self.mass.player_queues.get_active_queue(player_id)
if path == "/ctrl-int/1/nextitem":
self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id))
Expand All @@ -590,10 +601,10 @@ async def _handle_dacp_request( # noqa: PLR0915
self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
elif path == "/ctrl-int/1/shuffle_songs":
queue = self.mass.player_queues.get(player_id)
self.mass.loop.call_soon(
self.mass.player_queues.set_shuffle(
active_queue.queue_id, not queue.shuffle_enabled
)
if not queue:
return
self.mass.player_queues.set_shuffle(
active_queue.queue_id, not queue.shuffle_enabled
)
elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
# sometimes this request is sent by a device as confirmation of a play command
Expand Down Expand Up @@ -650,11 +661,13 @@ async def _handle_dacp_request( # noqa: PLR0915
finally:
writer.close()

async def monitor_prevent_playback(self, player_id: str):
async def monitor_prevent_playback(self, player_id: str) -> None:
"""Monitor the prevent playback state of an airplay player."""
count = 0
if not (airplay_player := self._players.get(player_id)):
return
if not airplay_player.raop_stream:
return
prev_active_remote_id = airplay_player.raop_stream.active_remote_id
while count < 40:
count += 1
Expand Down
34 changes: 27 additions & 7 deletions music_assistant/providers/airplay/raop.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(
self.input_format = input_format
self._sync_clients = sync_clients
self._audio_source = audio_source
self._audio_source_task: asyncio.Task | None = None
self._audio_source_task: asyncio.Task[None] | None = None
self._stopped: bool = False
self._lock = asyncio.Lock()

Expand All @@ -75,27 +75,36 @@ async def audio_streamer() -> None:
return
async with self._lock:
await asyncio.gather(
*[x.raop_stream.write_chunk(chunk) for x in self._sync_clients],
*[
x.raop_stream.write_chunk(chunk)
for x in self._sync_clients
if x.raop_stream
],
return_exceptions=True,
)
# entire stream consumed: send EOF
generator_exhausted = True
async with self._lock:
await asyncio.gather(
*[x.raop_stream.write_eof() for x in self._sync_clients],
*[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream],
return_exceptions=True,
)
finally:
if not generator_exhausted:
await close_async_generator(self._audio_source)

# get current ntp and start RaopStream per player
assert self.prov.cliraop_bin
_, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
start_ntp = int(stdout.strip())
wait_start = 1500 + (250 * len(self._sync_clients))
async with self._lock:
await asyncio.gather(
*[x.raop_stream.start(start_ntp, wait_start) for x in self._sync_clients],
*[
x.raop_stream.start(start_ntp, wait_start)
for x in self._sync_clients
if x.raop_stream
],
return_exceptions=True,
)
self._audio_source_task = asyncio.create_task(audio_streamer())
Expand All @@ -116,6 +125,7 @@ async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
"""Remove a sync client from the session."""
if airplay_player not in self._sync_clients:
return
assert airplay_player.raop_stream
assert airplay_player.raop_stream.session == self
async with self._lock:
self._sync_clients.remove(airplay_player)
Expand Down Expand Up @@ -154,7 +164,7 @@ def __init__(
# with the named pipe used to send audio
self.active_remote_id: str = str(randint(1000, 8000))
self.prevent_playback: bool = False
self._log_reader_task: asyncio.Task | None = None
self._log_reader_task: asyncio.Task[None] | asyncio.Future[None] | None = None
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
self._started = asyncio.Event()
Expand All @@ -170,6 +180,8 @@ async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
if not mass_player:
return
bind_ip = await self.mass.config.get_provider_config_value(
self.prov.instance_id, CONF_BIND_INTERFACE
)
Expand Down Expand Up @@ -240,10 +252,12 @@ async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
self._started.set()
self._log_reader_task = self.mass.create_task(self._log_watcher())

async def stop(self):
async def stop(self) -> None:
"""Stop playback and cleanup."""
if self._stopped:
return
if not self._cliraop_proc:
return
if self._cliraop_proc.proc and not self._cliraop_proc.closed:
await self.send_cli_command("ACTION=STOP")
self._stopped = True # set after send_cli command!
Expand All @@ -259,13 +273,15 @@ async def write_chunk(self, chunk: bytes) -> None:
if self._stopped:
return
await self._started.wait()
assert self._ffmpeg_proc
await self._ffmpeg_proc.write(chunk)

async def write_eof(self) -> None:
"""Write EOF."""
if self._stopped:
return
await self._started.wait()
assert self._ffmpeg_proc
await self._ffmpeg_proc.write_eof()

async def send_cli_command(self, command: str) -> None:
Expand All @@ -277,7 +293,7 @@ async def send_cli_command(self, command: str) -> None:
if not command.endswith("\n"):
command += "\n"

def send_data():
def send_data() -> None:
with suppress(BrokenPipeError), open(named_pipe, "w") as f:
f.write(command)

Expand All @@ -290,11 +306,15 @@ async def _log_watcher(self) -> None:
"""Monitor stderr for the running CLIRaop process."""
airplay_player = self.airplay_player
mass_player = self.mass.players.get(airplay_player.player_id)
if not mass_player:
return
queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
logger = airplay_player.logger
lost_packets = 0
prev_metadata_checksum: str = ""
prev_progress_report: float = 0
if not self._cliraop_proc:
return
async for line in self._cliraop_proc.iter_stderr():
if "elapsed milliseconds:" in line:
# this is received more or less every second while playing
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ exclude = [
'^music_assistant/__main__\.py$',
'^music_assistant/providers/_template_music_provider/.*$',
'^music_assistant/providers/_template_player_provider/.*$',
'^music_assistant/providers/airplay/.*$',
'^music_assistant/providers/apple_music/.*$',
'^music_assistant/providers/bluesound/.*$',
'^music_assistant/providers/chromecast/.*$',
Expand Down
Loading