From 7dba6db0a2bf4460ab319e7d5da8fc3295875a1c Mon Sep 17 00:00:00 2001 From: Jc2k Date: Thu, 9 Jan 2025 20:46:35 +0000 Subject: [PATCH] chore: mypy for airplay (#1848) --- music_assistant/providers/airplay/__init__.py | 3 +- music_assistant/providers/airplay/helpers.py | 2 - music_assistant/providers/airplay/provider.py | 63 +++++++++++-------- music_assistant/providers/airplay/raop.py | 34 +++++++--- pyproject.toml | 1 - 5 files changed, 66 insertions(+), 37 deletions(-) diff --git a/music_assistant/providers/airplay/__init__.py b/music_assistant/providers/airplay/__init__.py index 83081ea52..ee3282cbf 100644 --- a/music_assistant/providers/airplay/__init__.py +++ b/music_assistant/providers/airplay/__init__.py @@ -8,7 +8,7 @@ from music_assistant_models.enums import ConfigEntryType from music_assistant_models.provider import ProviderManifest -from music_assistant import MusicAssistant +from music_assistant.mass import MusicAssistant from .const import CONF_BIND_INTERFACE from .provider import AirplayProvider @@ -17,7 +17,6 @@ from music_assistant_models.config_entries import ProviderConfig from music_assistant_models.provider import ProviderManifest - from music_assistant import MusicAssistant from music_assistant.models import ProviderInstanceType diff --git a/music_assistant/providers/airplay/helpers.py b/music_assistant/providers/airplay/helpers.py index ddb6c63a8..4ecb594b4 100644 --- a/music_assistant/providers/airplay/helpers.py +++ b/music_assistant/providers/airplay/helpers.py @@ -30,8 +30,6 @@ def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]: return (manufacturer, model) # try parse from am property if am_property := info.decoded_properties.get("am"): - if isinstance(am_property, bytes): - am_property = am_property.decode("utf-8") model = am_property if not model: diff --git a/music_assistant/providers/airplay/provider.py b/music_assistant/providers/airplay/provider.py index d44963031..3549b4898 100644 --- a/music_assistant/providers/airplay/provider.py +++ b/music_assistant/providers/airplay/provider.py @@ -8,7 +8,7 @@ import socket import time from random import randrange -from typing import TYPE_CHECKING +from typing import cast from music_assistant_models.config_entries import ConfigEntry from music_assistant_models.enums import ( @@ -36,12 +36,13 @@ CONF_ENTRY_SYNC_ADJUST, create_sample_rates_config_entry, ) -from music_assistant.helpers.audio import get_ffmpeg_stream from music_assistant.helpers.datetime import utc +from music_assistant.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.helpers.process import check_output from music_assistant.helpers.util import TaskManager, get_ip_pton, lock, select_free_port from music_assistant.models.player_provider import PlayerProvider from music_assistant.providers.airplay.raop import RaopStreamSession +from music_assistant.providers.player_group import PlayerGroupProvider from .const import ( AIRPLAY_FLOW_PCM_FORMAT, @@ -61,10 +62,6 @@ ) from .player import AirPlayPlayer -if TYPE_CHECKING: - from music_assistant.providers.player_group import PlayerGroupProvider - - PLAYER_CONFIG_ENTRIES = ( CONF_ENTRY_FLOW_MODE_ENFORCED, CONF_ENTRY_CROSSFADE, @@ -138,15 +135,15 @@ class AirplayProvider(PlayerProvider): """Player provider for Airplay based players.""" - cliraop_bin: str | None = None + cliraop_bin: str | None _players: dict[str, AirPlayPlayer] - _dacp_server: asyncio.Server = None - _dacp_info: AsyncServiceInfo = None + _dacp_server: asyncio.Server + _dacp_info: AsyncServiceInfo @property def supported_features(self) -> set[ProviderFeature]: """Return the features supported by this Provider.""" - return (ProviderFeature.SYNC_PLAYERS,) + return {ProviderFeature.SYNC_PLAYERS} async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" @@ -179,10 +176,12 @@ async def on_mdns_service_state_change( self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None ) -> None: """Handle MDNS service state callback.""" + if not info: + return if "@" in name: raw_id, display_name = name.split(".")[0].split("@", 1) - elif "deviceid" in info.decoded_properties: - raw_id = info.decoded_properties["deviceid"].replace(":", "") + elif deviceid := info.decoded_properties.get("deviceid"): + raw_id = deviceid.replace(":", "") display_name = info.name.split(".")[0] else: return @@ -263,6 +262,8 @@ async def cmd_pause(self, player_id: str) -> None: - player_id: player_id of the player to handle the command. """ player = self.mass.players.get(player_id) + if not player: + return if player.group_childs: # pause is not supported while synced, use stop instead self.logger.debug("Player is synced, using STOP instead of PAUSE") @@ -279,6 +280,8 @@ async def play_media( ) -> None: """Handle PLAY MEDIA on given player.""" player = self.mass.players.get(player_id) + if not player: + return # set the active source for the player to the media queue # this accounts for syncgroups and linked players (e.g. sonos) player.active_source = media.queue_id @@ -300,7 +303,7 @@ async def play_media( ) elif media.queue_id and media.queue_id.startswith("ugp_"): # special case: UGP stream - ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") + ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group")) ugp_stream = ugp_provider.ugp_streams[media.queue_id] input_format = ugp_stream.output_format audio_source = ugp_stream.subscribe() @@ -338,6 +341,8 @@ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: if airplay_player.raop_stream and airplay_player.raop_stream.running: await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n") mass_player = self.mass.players.get(player_id) + if not mass_player: + return mass_player.volume_level = volume_level mass_player.volume_muted = volume_level == 0 self.mass.players.update(player_id) @@ -404,22 +409,24 @@ async def cmd_ungroup(self, player_id: str) -> None: - player_id: player_id of the player to handle the command. """ mass_player = self.mass.players.get(player_id, raise_unavailable=True) - if not mass_player.synced_to: + if not mass_player or not mass_player.synced_to: return ap_player = self._players[player_id] if ap_player.raop_stream and ap_player.raop_stream.running: await ap_player.raop_stream.session.remove_client(ap_player) group_leader = self.mass.players.get(mass_player.synced_to, raise_unavailable=True) + assert group_leader if player_id in group_leader.group_childs: group_leader.group_childs.remove(player_id) mass_player.synced_to = None airplay_player = self._players.get(player_id) - await airplay_player.cmd_stop() + if airplay_player: + await airplay_player.cmd_stop() # make sure that the player manager gets an update self.mass.players.update(mass_player.player_id, skip_forward=True) self.mass.players.update(group_leader.player_id, skip_forward=True) - async def _getcliraop_binary(self): + async def _getcliraop_binary(self) -> str: """Find the correct raop/airplay binary belonging to the platform.""" # ruff: noqa: SIM102 if self.cliraop_bin is not None: @@ -435,7 +442,8 @@ async def check_binary(cliraop_path: str) -> str | None: self.cliraop_bin = cliraop_path return cliraop_path except OSError: - return None + pass + return None base_path = os.path.join(os.path.dirname(__file__), "bin") system = platform.system().lower().replace("darwin", "macos") @@ -452,6 +460,7 @@ async def check_binary(cliraop_path: str) -> str | None: def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]: """Get all sync clients for a player.""" mass_player = self.mass.players.get(player_id, True) + assert mass_player sync_clients: list[AirPlayPlayer] = [] # we need to return the player itself too group_child_ids = {player_id} @@ -541,15 +550,15 @@ async def _handle_dacp_request( # noqa: PLR0915 else: headers_raw = request body = "" - headers_raw = headers_raw.split("\r\n") + headers_split = headers_raw.split("\r\n") headers = {} - for line in headers_raw[1:]: + for line in headers_split[1:]: if ":" not in line: continue x, y = line.split(":", 1) headers[x.strip()] = y.strip() active_remote = headers.get("Active-Remote") - _, path, _ = headers_raw[0].split(" ") + _, path, _ = headers_split[0].split(" ") airplay_player = next( ( x @@ -570,6 +579,8 @@ async def _handle_dacp_request( # noqa: PLR0915 player_id = airplay_player.player_id mass_player = self.mass.players.get(player_id) + if not mass_player: + return active_queue = self.mass.player_queues.get_active_queue(player_id) if path == "/ctrl-int/1/nextitem": self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id)) @@ -590,10 +601,10 @@ async def _handle_dacp_request( # noqa: PLR0915 self.mass.create_task(self.mass.players.cmd_volume_down(player_id)) elif path == "/ctrl-int/1/shuffle_songs": queue = self.mass.player_queues.get(player_id) - self.mass.loop.call_soon( - self.mass.player_queues.set_shuffle( - active_queue.queue_id, not queue.shuffle_enabled - ) + if not queue: + return + self.mass.player_queues.set_shuffle( + active_queue.queue_id, not queue.shuffle_enabled ) elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"): # sometimes this request is sent by a device as confirmation of a play command @@ -650,11 +661,13 @@ async def _handle_dacp_request( # noqa: PLR0915 finally: writer.close() - async def monitor_prevent_playback(self, player_id: str): + async def monitor_prevent_playback(self, player_id: str) -> None: """Monitor the prevent playback state of an airplay player.""" count = 0 if not (airplay_player := self._players.get(player_id)): return + if not airplay_player.raop_stream: + return prev_active_remote_id = airplay_player.raop_stream.active_remote_id while count < 40: count += 1 diff --git a/music_assistant/providers/airplay/raop.py b/music_assistant/providers/airplay/raop.py index 98841c263..27c6062aa 100644 --- a/music_assistant/providers/airplay/raop.py +++ b/music_assistant/providers/airplay/raop.py @@ -54,7 +54,7 @@ def __init__( self.input_format = input_format self._sync_clients = sync_clients self._audio_source = audio_source - self._audio_source_task: asyncio.Task | None = None + self._audio_source_task: asyncio.Task[None] | None = None self._stopped: bool = False self._lock = asyncio.Lock() @@ -75,14 +75,18 @@ async def audio_streamer() -> None: return async with self._lock: await asyncio.gather( - *[x.raop_stream.write_chunk(chunk) for x in self._sync_clients], + *[ + x.raop_stream.write_chunk(chunk) + for x in self._sync_clients + if x.raop_stream + ], return_exceptions=True, ) # entire stream consumed: send EOF generator_exhausted = True async with self._lock: await asyncio.gather( - *[x.raop_stream.write_eof() for x in self._sync_clients], + *[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream], return_exceptions=True, ) finally: @@ -90,12 +94,17 @@ async def audio_streamer() -> None: await close_async_generator(self._audio_source) # get current ntp and start RaopStream per player + assert self.prov.cliraop_bin _, stdout = await check_output(self.prov.cliraop_bin, "-ntp") start_ntp = int(stdout.strip()) wait_start = 1500 + (250 * len(self._sync_clients)) async with self._lock: await asyncio.gather( - *[x.raop_stream.start(start_ntp, wait_start) for x in self._sync_clients], + *[ + x.raop_stream.start(start_ntp, wait_start) + for x in self._sync_clients + if x.raop_stream + ], return_exceptions=True, ) self._audio_source_task = asyncio.create_task(audio_streamer()) @@ -116,6 +125,7 @@ async def remove_client(self, airplay_player: AirPlayPlayer) -> None: """Remove a sync client from the session.""" if airplay_player not in self._sync_clients: return + assert airplay_player.raop_stream assert airplay_player.raop_stream.session == self async with self._lock: self._sync_clients.remove(airplay_player) @@ -154,7 +164,7 @@ def __init__( # with the named pipe used to send audio self.active_remote_id: str = str(randint(1000, 8000)) self.prevent_playback: bool = False - self._log_reader_task: asyncio.Task | None = None + self._log_reader_task: asyncio.Task[None] | asyncio.Future[None] | None = None self._cliraop_proc: AsyncProcess | None = None self._ffmpeg_proc: AsyncProcess | None = None self._started = asyncio.Event() @@ -170,6 +180,8 @@ async def start(self, start_ntp: int, wait_start: int = 1000) -> None: extra_args = [] player_id = self.airplay_player.player_id mass_player = self.mass.players.get(player_id) + if not mass_player: + return bind_ip = await self.mass.config.get_provider_config_value( self.prov.instance_id, CONF_BIND_INTERFACE ) @@ -240,10 +252,12 @@ async def start(self, start_ntp: int, wait_start: int = 1000) -> None: self._started.set() self._log_reader_task = self.mass.create_task(self._log_watcher()) - async def stop(self): + async def stop(self) -> None: """Stop playback and cleanup.""" if self._stopped: return + if not self._cliraop_proc: + return if self._cliraop_proc.proc and not self._cliraop_proc.closed: await self.send_cli_command("ACTION=STOP") self._stopped = True # set after send_cli command! @@ -259,6 +273,7 @@ async def write_chunk(self, chunk: bytes) -> None: if self._stopped: return await self._started.wait() + assert self._ffmpeg_proc await self._ffmpeg_proc.write(chunk) async def write_eof(self) -> None: @@ -266,6 +281,7 @@ async def write_eof(self) -> None: if self._stopped: return await self._started.wait() + assert self._ffmpeg_proc await self._ffmpeg_proc.write_eof() async def send_cli_command(self, command: str) -> None: @@ -277,7 +293,7 @@ async def send_cli_command(self, command: str) -> None: if not command.endswith("\n"): command += "\n" - def send_data(): + def send_data() -> None: with suppress(BrokenPipeError), open(named_pipe, "w") as f: f.write(command) @@ -290,11 +306,15 @@ async def _log_watcher(self) -> None: """Monitor stderr for the running CLIRaop process.""" airplay_player = self.airplay_player mass_player = self.mass.players.get(airplay_player.player_id) + if not mass_player: + return queue = self.mass.player_queues.get_active_queue(mass_player.active_source) logger = airplay_player.logger lost_packets = 0 prev_metadata_checksum: str = "" prev_progress_report: float = 0 + if not self._cliraop_proc: + return async for line in self._cliraop_proc.iter_stderr(): if "elapsed milliseconds:" in line: # this is received more or less every second while playing diff --git a/pyproject.toml b/pyproject.toml index 459189f26..8f9e5ab61 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,7 +144,6 @@ exclude = [ '^music_assistant/__main__\.py$', '^music_assistant/providers/_template_music_provider/.*$', '^music_assistant/providers/_template_player_provider/.*$', - '^music_assistant/providers/airplay/.*$', '^music_assistant/providers/apple_music/.*$', '^music_assistant/providers/bluesound/.*$', '^music_assistant/providers/chromecast/.*$',