diff --git a/examples/apis.json b/examples/apis.json index e3a1ff921..1d3c9e4d0 100644 --- a/examples/apis.json +++ b/examples/apis.json @@ -106,5 +106,13 @@ "func": "get_point_cloud_map", "packagePath": "viam.services", "importName": "SLAMClient" + }, + "audio_in": { + "func": "get_properties", + "packagePath": "viam.components" + }, + "audio_out": { + "func": "get_properties", + "packagePath": "viam.components" } } diff --git a/examples/server/v1/components.py b/examples/server/v1/components.py index 7647dff08..0eab492c6 100644 --- a/examples/server/v1/components.py +++ b/examples/server/v1/components.py @@ -18,7 +18,9 @@ from PIL import Image from viam.components.arm import Arm +from viam.components.audio_in import AudioIn, AudioResponse from viam.components.audio_input import AudioInput +from viam.components.audio_out import AudioOut from viam.components.base import Base from viam.components.board import Board, TickStream from viam.components.camera import Camera @@ -52,6 +54,8 @@ from viam.proto.component.audioinput import AudioChunk, AudioChunkInfo, SampleFormat from viam.proto.component.encoder import PositionType from viam.utils import SensorReading +from viam.proto.component.audioin import AudioChunk as AudioInChunk +from viam.proto.common import AudioInfo GEOMETRIES = [ Geometry(center=Pose(x=1, y=2, z=3, o_x=2, o_y=3, o_z=4, theta=20), sphere=Sphere(radius_mm=2)), @@ -173,6 +177,121 @@ async def get_geometries(self, extra: Optional[Dict[str, Any]] = None, **kwargs) return GEOMETRIES +class ExampleAudioIn(AudioIn): + def __init__(self, name: str): + super().__init__(name) + self.sample_rate = 44100 + self.num_channels = 2 + self.supported_codecs = ["pcm16"] + self.chunk_count = 0 + self.latency = timedelta(milliseconds=20) + self.volume_scale = 0.2 + self.frequency_hz = 440 + + async def get_audio(self, codec: str, duration_seconds: float, previous_timestamp_ns: int, + *, timeout: Optional[float] = None, **kwargs) -> AudioIn.AudioStream: + + async def 
read() -> AsyncIterator[AudioIn.AudioResponse]: + # Generate chunks based on duration + chunk_duration_ms = 100 # 100ms per chunk + chunks_to_generate = max(1, int((duration_seconds * 1000) / chunk_duration_ms)) + + for i in range(chunks_to_generate): + # Generate audio data (sine wave pattern) + chunk_data = b"" + samples_per_chunk = int(self.sample_rate * (chunk_duration_ms / 1000)) + + for sample in range(samples_per_chunk): + # Calculate the timing in seconds of this audio sample + time_offset = (i * chunk_duration_ms / 1000) + (sample / self.sample_rate) + # Generate one 16-bit PCM audio sample for a sine wave + # 32767 scales the value from (-1,1) to full 16 bit signed range (-32768,32767) + amplitude = int(32767 * self.volume_scale * math.sin(2 * math.pi * self.frequency_hz * time_offset)) + + # Convert to 16-bit PCM stereo + sample_bytes = amplitude.to_bytes(2, byteorder='little', signed=True) + chunk_data += sample_bytes * self.num_channels + + chunk_start_time = previous_timestamp_ns + (i * chunk_duration_ms * 1000000) # Convert ms to ns + chunk_end_time = chunk_start_time + (chunk_duration_ms * 1000000) + + audio_chunk = AudioInChunk( + audio_data=bytes(chunk_data), + audio_info=AudioInfo( + codec=codec, + sample_rate_hz=int(self.sample_rate), + num_channels=self.num_channels + ), + sequence=i, + start_timestamp_nanoseconds=chunk_start_time, + end_timestamp_nanoseconds=chunk_end_time + ) + audio_response = AudioResponse(audio=audio_chunk) + yield audio_response + + await asyncio.sleep(self.latency.total_seconds()) + + return StreamWithIterator(read()) + + async def get_properties(self, *, timeout: Optional[float] = None, **kwargs) -> AudioIn.Properties: + """Return the audio input device properties.""" + return AudioIn.Properties( + supported_codecs=self.supported_codecs, + sample_rate_hz=self.sample_rate, + num_channels=self.num_channels + ) + + async def get_geometries(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> List[Geometry]: + return 
GEOMETRIES + + +class ExampleAudioOut(AudioOut): + def __init__(self, name: str): + super().__init__(name) + self.sample_rate = 44100 + self.num_channels = 2 + self.supported_codecs = ["pcm16", "mp3", "wav"] + self.volume = 1.0 + self.is_playing = False + + async def play(self, + data: bytes, + info: Optional[AudioInfo] = None, + *, + extra: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, + **kwargs) -> None: + """Play the given audio data.""" + + # Simulate playing audio + self.is_playing = True + if info: + print(f"Playing audio: {len(data)} bytes, codec={info.codec}, " + f"sample_rate={info.sample_rate_hz}, channels={info.num_channels}") + else: + print(f"Playing audio: {len(data)} bytes (no audio info provided)") + + await asyncio.sleep(0.1) + + self.is_playing = False + + async def get_properties(self, + *, + extra: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, + **kwargs) -> AudioOut.Properties: + """Return the audio output device properties.""" + + return AudioOut.Properties( + supported_codecs=self.supported_codecs, + sample_rate_hz=self.sample_rate, + num_channels=self.num_channels + ) + + async def get_geometries(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> List[Geometry]: + return GEOMETRIES + + class ExampleBase(Base): def __init__(self, name: str): self.position = 0 diff --git a/examples/server/v1/server.py b/examples/server/v1/server.py index f1f7dfc2e..e40f02cff 100644 --- a/examples/server/v1/server.py +++ b/examples/server/v1/server.py @@ -8,7 +8,9 @@ from .components import ( ExampleAnalog, ExampleArm, + ExampleAudioIn, ExampleAudioInput, + ExampleAudioOut, ExampleBase, ExampleBoard, ExampleCamera, @@ -30,6 +32,8 @@ async def run(host: str, port: int, log_level: int): my_arm = ExampleArm("arm0") my_audio_input = ExampleAudioInput("audio_input0") + my_audio_in = ExampleAudioIn("audio_in0") + my_audio_out = ExampleAudioOut("audio_out0") my_base = ExampleBase("base0") my_board = ExampleBoard( 
name="board", @@ -75,7 +79,9 @@ async def run(host: str, port: int, log_level: int): server = Server( resources=[ my_arm, + my_audio_in, my_audio_input, + my_audio_out, my_base, my_board, my_camera, diff --git a/src/viam/components/audio_in/__init__.py b/src/viam/components/audio_in/__init__.py new file mode 100644 index 000000000..867aa01b0 --- /dev/null +++ b/src/viam/components/audio_in/__init__.py @@ -0,0 +1,24 @@ +from viam.resource.registry import Registry, ResourceRegistration + +from viam.proto.common import AudioInfo +from viam.media.audio import AudioCodec +from .audio_in import AudioIn +from .client import AudioInClient +from .service import AudioInRPCService + +AudioResponse = AudioIn.AudioResponse + +__all__ = [ + "AudioIn", + "AudioResponse", + "AudioInfo", + "AudioCodec", +] + +Registry.register_api( + ResourceRegistration( + AudioIn, + AudioInRPCService, + lambda name, channel: AudioInClient(name, channel), + ) +) diff --git a/src/viam/components/audio_in/audio_in.py b/src/viam/components/audio_in/audio_in.py new file mode 100644 index 000000000..303f2a1b6 --- /dev/null +++ b/src/viam/components/audio_in/audio_in.py @@ -0,0 +1,75 @@ +import abc +import sys +from typing import Final, Optional + +from viam.streams import Stream + +from viam.proto.common import GetPropertiesResponse +from viam.proto.component.audioin import GetAudioResponse +from viam.resource.types import API, RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT + +from ..component_base import ComponentBase + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + + +class AudioIn(ComponentBase): + """AudioIn represents a component that can capture audio. + + This acts as an abstract base class for any drivers representing specific + audio input implementations. This cannot be used on its own. If the ``__init__()`` function is + overridden, it must call the ``super().__init__()`` function. 
+ """ + + API: Final = API( # pyright: ignore [reportIncompatibleVariableOverride] + RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT, "audio_in" + ) + + Properties: "TypeAlias" = GetPropertiesResponse + AudioResponse: "TypeAlias" = GetAudioResponse + AudioStream = Stream[AudioResponse] + + + @abc.abstractmethod + async def get_audio(self, codec: str, + duration_seconds: float, + previous_timestamp_ns:int, + *, timeout: Optional[float] = None, **kwargs) -> AudioStream: + """ + Get a stream of audio from the device + + :: + my_audio_in = AudioIn.from_robot(robot=machine, name="my_audio_in") + + stream = await my_audio_in.get_audio( + codec=AudioCodec.PCM16, + duration_seconds=10.0, + previous_timestamp_ns=0 + ) + + Args: + codec (str): The desired codec of the returned audio data + duration_seconds (float): duration of the stream. 0 = indefinite stream + previous_timestamp_ns (int): starting timestamp in nanoseconds for recording continuity. + Set to 0 to begin recording from the current time. + Returns: + AudioStream: stream of audio chunks. + ... + """ + + @abc.abstractmethod + async def get_properties(self, *, timeout: Optional[float] = None, **kwargs) -> Properties: + """ + Get the audio device's properties + + :: + my_audio_in = AudioIn.from_robot(robot=machine, name="my_audio_in") + properties = await my_audio_in.get_properties() + + Returns: + Properties: The properties of the audio in device. + ... 
+ """ diff --git a/src/viam/components/audio_in/client.py b/src/viam/components/audio_in/client.py new file mode 100644 index 000000000..fd45c1f0c --- /dev/null +++ b/src/viam/components/audio_in/client.py @@ -0,0 +1,79 @@ +from typing import Any, Dict, List, Mapping, Optional +import uuid + +from grpclib.client import Channel + +from viam.proto.component.audioin import GetAudioRequest, GetAudioResponse +from viam.proto.common import ( + DoCommandRequest, + DoCommandResponse, + GetPropertiesRequest, + Geometry) +from grpclib.client import Stream as ClientStream +from viam.proto.component.audioin import AudioInServiceStub +from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase +from viam.streams import StreamWithIterator + +from .audio_in import AudioIn +from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict + + +class AudioInClient(AudioIn, ReconfigurableResourceRPCClientBase): + + def __init__(self, name: str, channel: Channel) -> None: + self.channel = channel + self.client = AudioInServiceStub(channel) + super().__init__(name) + + + async def get_audio(self, + codec:str, + duration_seconds: float, + previous_timestamp_ns:int, + *, + extra: Optional[Dict[str, Any]] = None, + **kwargs, + ): + request = GetAudioRequest(name=self.name, codec = codec, + duration_seconds=duration_seconds, + previous_timestamp_nanoseconds = previous_timestamp_ns, + request_id = str(uuid.uuid4()), + extra=dict_to_struct(extra)) + async def read(): + md = kwargs.get("metadata", self.Metadata()).proto + audio_stream: ClientStream[GetAudioRequest, GetAudioResponse] + async with self.client.GetAudio.open(metadata=md) as audio_stream: + try: + await audio_stream.send_message(request, end=True) + async for response in audio_stream: + yield response + except Exception as e: + raise (e) + + return StreamWithIterator(read()) + + + async def get_properties( + self, + *, + timeout: Optional[float] = None, + **kwargs, + ) -> AudioIn.Properties: + md 
= kwargs.get("metadata", self.Metadata()).proto + return await self.client.GetProperties(GetPropertiesRequest(name=self.name), timeout=timeout, metadata=md) + + async def do_command( + self, + command: Mapping[str, ValueTypes], + *, + timeout: Optional[float] = None, + **kwargs, + ) -> Mapping[str, ValueTypes]: + md = kwargs.get("metadata", self.Metadata()).proto + request = DoCommandRequest(name=self.name, command=dict_to_struct(command)) + response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md) + return struct_to_dict(response.result) + + async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]: + md = kwargs.get("metadata", self.Metadata()) + return await get_geometries(self.client, self.name, extra, timeout, md) diff --git a/src/viam/components/audio_in/service.py b/src/viam/components/audio_in/service.py new file mode 100644 index 000000000..1cc11c43a --- /dev/null +++ b/src/viam/components/audio_in/service.py @@ -0,0 +1,85 @@ +from grpclib.server import Stream +from h2.exceptions import StreamClosedError + + +from viam.logging import getLogger +from viam.proto.common import ( + DoCommandRequest, + DoCommandResponse, + GetPropertiesRequest, + GetPropertiesResponse, + GetGeometriesRequest, + GetGeometriesResponse +) +from viam.proto.component.audioin import ( + AudioInServiceBase, + GetAudioRequest, + GetAudioResponse +) +from viam.resource.rpc_service_base import ResourceRPCServiceBase +from viam.utils import dict_to_struct, struct_to_dict + +from .audio_in import AudioIn + +LOGGER = getLogger(__name__) + +class AudioInRPCService(AudioInServiceBase, ResourceRPCServiceBase[AudioIn]): + """ + gRPC Service for a generic audio in. 
+ """ + + RESOURCE_TYPE = AudioIn + + + async def GetAudio(self, stream: Stream[GetAudioRequest, GetAudioResponse]) -> None: + request = await stream.recv_message() + assert request is not None + name = request.name + audio_in = self.get_resource(name) + audio_stream = await audio_in.get_audio(codec=request.codec, duration_seconds=request.duration_seconds, + previous_timestamp_ns=request.previous_timestamp_nanoseconds, metadata=stream.metadata) + async for response in audio_stream: + try: + response.request_id = request.request_id + await stream.send_message(response) + except StreamClosedError: + return + except Exception as e: + LOGGER.error(e) + return + + + async def GetProperties(self, stream: Stream[GetPropertiesRequest, GetPropertiesResponse]) -> None: + request = await stream.recv_message() + assert request is not None + name = request.name + audio_in = self.get_resource(name) + timeout = stream.deadline.time_remaining() if stream.deadline else None + properties = await audio_in.get_properties( + timeout=timeout, + metadata=stream.metadata, + ) + await stream.send_message(properties) + + async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None: + request = await stream.recv_message() + assert request is not None + name = request.name + audio_in= self.get_resource(name) + timeout = stream.deadline.time_remaining() if stream.deadline else None + result = await audio_in.do_command( + command=struct_to_dict(request.command), + timeout=timeout, + metadata=stream.metadata, + ) + response = DoCommandResponse(result=dict_to_struct(result)) + await stream.send_message(response) + + async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometriesResponse]) -> None: + request = await stream.recv_message() + assert request is not None + arm = self.get_resource(request.name) + timeout = stream.deadline.time_remaining() if stream.deadline else None + geometries = await arm.get_geometries(extra=struct_to_dict(request.extra), 
timeout=timeout) + response = GetGeometriesResponse(geometries=geometries) + await stream.send_message(response) diff --git a/src/viam/components/audio_input/audio_input.py b/src/viam/components/audio_input/audio_input.py index f48d31f46..e808543ea 100644 --- a/src/viam/components/audio_input/audio_input.py +++ b/src/viam/components/audio_input/audio_input.py @@ -15,7 +15,8 @@ class AudioInput(ComponentBase, StreamSource[Audio]): - """AudioInput represents a component that can capture audio. + """ DEPRECATED: AudioInput is deprecated, use AudioIn instead + AudioInput represents a component that can capture audio. This acts as an abstract base class for any drivers representing specific audio input implementations. This cannot be used on its own. If the ``__init__()`` function is diff --git a/src/viam/components/audio_input/client.py b/src/viam/components/audio_input/client.py index 5dbb87db5..3826b368b 100644 --- a/src/viam/components/audio_input/client.py +++ b/src/viam/components/audio_input/client.py @@ -1,4 +1,5 @@ from typing import Any, AsyncIterator, Dict, List, Mapping, Optional, Union +import warnings from grpclib.client import Channel @@ -21,10 +22,17 @@ class AudioInputClient(AudioInput, ReconfigurableResourceRPCClientBase): """ + DEPRECATED: AudioInput is deprecated, use AudioIn instead. gRPC client for the AudioInput component. """ def __init__(self, name: str, channel: Channel): + warnings.warn( + "AudioInputClient is deprecated and will be removed in a future release. 
" + "Use AudioIn instead.", + DeprecationWarning, + stacklevel=2, + ) self.channel = channel self.client = AudioInputServiceStub(channel) super().__init__(name) diff --git a/src/viam/components/audio_input/service.py b/src/viam/components/audio_input/service.py index 41acd2257..4ecc5f5b7 100644 --- a/src/viam/components/audio_input/service.py +++ b/src/viam/components/audio_input/service.py @@ -1,4 +1,5 @@ import wave +import warnings from datetime import timedelta from io import BytesIO @@ -25,9 +26,19 @@ class AudioInputRPCService(AudioInputServiceBase, ResourceRPCServiceBase[AudioInput]): """ + DEPRECATED: AudioInput is deprecated, use AudioIn instead. gRPC Service for a generic AudioInput """ + def __init__(self, *args, **kwargs): + warnings.warn( + "AudioInput is deprecated and will be removed in a future release. " + "Use AudioIn instead.", + DeprecationWarning, + stacklevel=2, + ) + super().__init__(*args, **kwargs) + RESOURCE_TYPE = AudioInput async def Chunks(self, stream: Stream[ChunksRequest, ChunksResponse]) -> None: diff --git a/src/viam/components/audio_out/__init__.py b/src/viam/components/audio_out/__init__.py new file mode 100644 index 000000000..170204575 --- /dev/null +++ b/src/viam/components/audio_out/__init__.py @@ -0,0 +1,21 @@ +from viam.resource.registry import Registry, ResourceRegistration + +from viam.proto.common import AudioInfo +from viam.media.audio import AudioCodec +from .audio_out import AudioOut +from .client import AudioOutClient +from .service import AudioOutRPCService + +__all__ = [ + "AudioOut", + "AudioInfo", + "AudioCodec", +] + +Registry.register_api( + ResourceRegistration( + AudioOut, + AudioOutRPCService, + lambda name, channel: AudioOutClient(name, channel), + ) +) diff --git a/src/viam/components/audio_out/audio_out.py b/src/viam/components/audio_out/audio_out.py new file mode 100644 index 000000000..ae8c21e50 --- /dev/null +++ b/src/viam/components/audio_out/audio_out.py @@ -0,0 +1,72 @@ +import abc +import sys +from 
typing import Any, Dict, Final, Optional + +from viam.proto.common import GetPropertiesResponse +from viam.resource.types import API, RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT + +from ..component_base import ComponentBase +from . import AudioInfo + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + + +class AudioOut(ComponentBase): + """AudioOut represents a component that can play audio. + + This acts as an abstract base class for any drivers representing specific + audio output implementations. This cannot be used on its own. If the ``__init__()`` function is + overridden, it must call the ``super().__init__()`` function. + """ + + API: Final = API( # pyright: ignore [reportIncompatibleVariableOverride] + RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT, "audio_out" + ) + + Properties: "TypeAlias" = GetPropertiesResponse + + @abc.abstractmethod + async def play(self, + data: bytes, + info: Optional[AudioInfo] = None, + *, + extra: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, + **kwargs) -> None: + """ + Play the given audio data. + + :: + my_audio_out = AudioOut.from_robot(robot=machine, name="my_audio_out") + + # With audio info + audio_info = AudioInfo(codec=AudioCodec.PCM16, sample_rate_hz=44100, num_channels=2) + await my_audio_out.play(audio_data, audio_info) + + # Without audio info (when codec encodes information within audio_data) + await my_audio_out.play(audio_data) + + Args: + data: audio bytes to play + info: (optional) information about the audio data such as codec, sample rate, and channel count + """ + + @abc.abstractmethod + async def get_properties(self, + *, + extra: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, + **kwargs) -> Properties: + """ + Get the audio output device's properties. 
+ + :: + my_audio_out = AudioOut.from_robot(robot=machine, name="my_audio_out") + properties = await my_audio_out.get_properties() + + Returns: + Properties: The properties of the audio output device + """ diff --git a/src/viam/components/audio_out/client.py b/src/viam/components/audio_out/client.py new file mode 100644 index 000000000..2b986ea0e --- /dev/null +++ b/src/viam/components/audio_out/client.py @@ -0,0 +1,79 @@ +from typing import Any, Dict, List, Mapping, Optional + +from grpclib.client import Channel + +from viam.proto.common import ( + DoCommandRequest, + DoCommandResponse, + GetPropertiesRequest, + GetPropertiesResponse, + Geometry +) +from viam.proto.component.audioout import ( + AudioOutServiceStub, + PlayRequest +) + + +from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase + +from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict + +from .audio_out import AudioOut, AudioInfo + + +class AudioOutClient(AudioOut, ReconfigurableResourceRPCClientBase): + """gRPC client for AudioOut component.""" + + def __init__(self, name: str, channel: Channel) -> None: + self.channel = channel + self.client = AudioOutServiceStub(channel) + super().__init__(name) + + async def play(self, + data: bytes, + info: Optional[AudioInfo] = None, + *, + extra: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, + **kwargs) -> None: + if extra is None: + extra = {} + + md = kwargs.get("metadata", self.Metadata()).proto + request = PlayRequest( + name=self.name, + audio_data=data, + audio_info=info, + extra=dict_to_struct(extra), + ) + await self.client.Play(request, timeout=timeout, metadata=md) + + async def get_properties(self, + *, + extra: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, + **kwargs) -> AudioOut.Properties: + if extra is None: + extra = {} + + md = kwargs.get("metadata", self.Metadata()).proto + request = GetPropertiesRequest(name=self.name, extra=dict_to_struct(extra)) + 
response: GetPropertiesResponse = await self.client.GetProperties(request, timeout=timeout, metadata=md) + return response + + async def do_command( + self, + command: Mapping[str, ValueTypes], + *, + timeout: Optional[float] = None, + **kwargs, + ) -> Mapping[str, ValueTypes]: + md = kwargs.get("metadata", self.Metadata()).proto + request = DoCommandRequest(name=self.name, command=dict_to_struct(command)) + response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md) + return struct_to_dict(response.result) + + async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]: + md = kwargs.get("metadata", self.Metadata()) + return await get_geometries(self.client, self.name, extra, timeout, md) diff --git a/src/viam/components/audio_out/service.py b/src/viam/components/audio_out/service.py new file mode 100644 index 000000000..48a97a33a --- /dev/null +++ b/src/viam/components/audio_out/service.py @@ -0,0 +1,63 @@ +from grpclib.server import Stream + +from viam.proto.common import ( + DoCommandRequest, + DoCommandResponse, + GetGeometriesRequest, + GetGeometriesResponse, + GetPropertiesRequest, + GetPropertiesResponse, +) +from viam.proto.component.audioout import ( + AudioOutServiceBase, + PlayRequest, + PlayResponse, +) +from viam.resource.rpc_service_base import ResourceRPCServiceBase +from viam.utils import dict_to_struct, struct_to_dict + +from .audio_out import AudioOut + + +class AudioOutRPCService(AudioOutServiceBase, ResourceRPCServiceBase[AudioOut]): + """gRPC service for AudioOut component.""" + + RESOURCE_TYPE = AudioOut + + async def Play(self, stream: Stream[PlayRequest, PlayResponse]) -> None: + request = await stream.recv_message() + assert request is not None + name = request.name + audio_out = self.get_resource(name) + # Check if audio_info was provided in the request + audio_info = request.audio_info if request.HasField("audio_info") else 
None + timeout = stream.deadline.time_remaining() if stream.deadline else None + await audio_out.play(request.audio_data, audio_info, extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata) + await stream.send_message(PlayResponse()) + + async def GetProperties(self, stream: Stream[GetPropertiesRequest, GetPropertiesResponse]) -> None: + request = await stream.recv_message() + assert request is not None + name = request.name + audio_out = self.get_resource(name) + timeout = stream.deadline.time_remaining() if stream.deadline else None + properties = await audio_out.get_properties(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata) + await stream.send_message(properties) + + async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None: + request = await stream.recv_message() + assert request is not None + audio_out = self.get_resource(request.name) + timeout = stream.deadline.time_remaining() if stream.deadline else None + result = await audio_out.do_command(command=struct_to_dict(request.command), timeout=timeout, metadata=stream.metadata) + response = DoCommandResponse(result=dict_to_struct(result)) + await stream.send_message(response) + + async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometriesResponse]) -> None: + request = await stream.recv_message() + assert request is not None + audio_out = self.get_resource(request.name) + timeout = stream.deadline.time_remaining() if stream.deadline else None + geometries = await audio_out.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout) + response = GetGeometriesResponse(geometries=geometries) + await stream.send_message(response) diff --git a/src/viam/media/audio.py b/src/viam/media/audio.py index 56e5a8800..7dbf4846c 100644 --- a/src/viam/media/audio.py +++ b/src/viam/media/audio.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from enum import Enum from viam.proto.component.audioinput import 
AudioChunk, AudioChunkInfo from viam.streams import Stream, StreamReader @@ -14,3 +15,29 @@ class Audio: AudioReader = StreamReader[Audio] AudioStream = Stream[Audio] + +class AudioCodec(str, Enum): + """Common audio codec identifiers. + + These constants represent commonly supported audio codecs + for audioin and audioout components. + + Example:: + + from viam.media.audio import AudioCodec + from viam.proto.common import AudioInfo + + audio_info = AudioInfo( + codec=AudioCodec.PCM16, + sample_rate_hz=44100, + num_channels=2 + ) + """ + + PCM16 = "pcm16" + PCM32 = "pcm32" + PCM32_FLOAT = "pcm32_float" + MP3 = "mp3" + AAC = "aac" + OPUS = "opus" + FLAC = "flac" diff --git a/tests/mocks/components.py b/tests/mocks/components.py index fd80dc51e..1e3f471ba 100644 --- a/tests/mocks/components.py +++ b/tests/mocks/components.py @@ -14,6 +14,8 @@ from viam.components.arm import Arm, JointPositions, KinematicsFileFormat from viam.components.audio_input import AudioInput +from viam.components.audio_in import AudioIn, AudioResponse +from viam.components.audio_out import AudioOut from viam.components.base import Base from viam.components.board import Board, Tick from viam.components.button import Button @@ -39,6 +41,9 @@ from viam.proto.component.encoder import PositionType from viam.streams import StreamWithIterator from viam.utils import SensorReading, ValueTypes +from viam.proto.common import AudioInfo +from viam.proto.component.audioin import AudioChunk as Chunk + + GEOMETRIES = [ Geometry(center=Pose(x=1, y=2, z=3, o_x=2, o_y=3, o_z=4, theta=20), sphere=Sphere(radius_mm=2)), @@ -114,6 +119,58 @@ async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Option return {"command": command} +class MockAudioIn(AudioIn): + def __init__(self, name: str, properties: AudioIn.Properties): + super().__init__(name) + self.geometries = GEOMETRIES + self.properties = properties + self.timeout: Optional[float] = None + self.extra: Optional[Dict[str, Any]] = None + 
+ async def get_audio(self, codec: str, duration_seconds: float, previous_timestamp_ns: int, + *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs): + async def read() -> AsyncIterator[AudioResponse]: + # Generate mock audio chunks + for i in range(2): + chunk_data = f"audio_chunk_{i}".encode("utf-8") + timestamp_start = previous_timestamp_ns + i * 1000000000 # 1 second intervals in nanoseconds + timestamp_end = timestamp_start + 1000000000 + + audio_chunk = Chunk( + audio_data=chunk_data, + audio_info=AudioInfo( + codec=codec, + sample_rate_hz=self.properties.sample_rate_hz, + num_channels=self.properties.num_channels + ), + sequence=i, + start_timestamp_nanoseconds=timestamp_start, + end_timestamp_nanoseconds=timestamp_end + ) + + audio_response = AudioResponse( + audio=audio_chunk, + request_id="mock_request" + ) + yield audio_response + + self.extra = extra + self.timeout = timeout + return StreamWithIterator(read()) + + async def get_properties(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> AudioIn.Properties: + self.extra = extra + self.timeout = timeout + return self.properties + + async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> List[Geometry]: + self.extra = extra + self.timeout = timeout + return self.geometries + + async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, ValueTypes]: + return {"command": command} + class MockAudioInput(AudioInput): def __init__(self, name: str, properties: AudioInput.Properties): super().__init__(name) @@ -148,6 +205,33 @@ async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeou async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, ValueTypes]: return {"command": command} +class MockAudioOut(AudioOut): + def __init__(self, 
name: str, properties: AudioOut.Properties): + super().__init__(name) + self.play_called = False + self.properties = properties + self.last_audio_data = None + self.last_audio_info = None + self.geometries = GEOMETRIES + self.timeout: Optional[float] = None + self.extra: Optional[Dict[str, Any]] = None + + async def play(self, data: bytes, info: Optional[AudioInfo] = None, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> None: + self.play_called = True + self.last_audio_data = data + self.last_audio_info = info + + async def get_properties(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs): + self.timeout = timeout + return self.properties + + async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, ValueTypes]: + return command + + async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs): + self.extra = extra + self.timeout = timeout + return self.geometries class MockBase(Base): def __init__(self, name: str): @@ -1077,3 +1161,4 @@ async def push(self, *, extra: Optional[Mapping[str, Any]] = None, timeout: Opti async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, ValueTypes]: return {"command": command} + diff --git a/tests/test_audio_in.py b/tests/test_audio_in.py new file mode 100644 index 000000000..6182b829b --- /dev/null +++ b/tests/test_audio_in.py @@ -0,0 +1,189 @@ +import pytest +from grpclib.testing import ChannelFor + +from viam.components.audio_in import AudioIn, AudioInClient, AudioInRPCService, AudioResponse +from viam.components.generic.service import GenericRPCService +from viam.proto.common import ( + DoCommandRequest, + DoCommandResponse, + GetGeometriesRequest, + GetGeometriesResponse, + GetPropertiesRequest, + GetPropertiesResponse, +) +from viam.proto.component.audioin 
import ( + AudioInServiceStub, + GetAudioRequest +) +from viam.resource.manager import ResourceManager +from viam.utils import dict_to_struct, struct_to_dict + +from . import loose_approx +from .mocks.components import GEOMETRIES, MockAudioIn + +# Test properties for the mock AudioIn +PROPERTIES = AudioIn.Properties( + supported_codecs=["pcm16", "mp3"], + sample_rate_hz=44100, + num_channels=2, +) + + +@pytest.fixture(scope="function") +def audio_in() -> MockAudioIn: + return MockAudioIn(name="audio_in", properties=PROPERTIES) + + +@pytest.fixture(scope="function") +def service(audio_in: MockAudioIn) -> AudioInRPCService: + manager = ResourceManager([audio_in]) + return AudioInRPCService(manager) + + +@pytest.fixture(scope="function") +def generic_service(audio_in: MockAudioIn) -> GenericRPCService: + manager = ResourceManager([audio_in]) + return GenericRPCService(manager) + + +class TestAudioIn: + async def test_get_audio(self, audio_in: AudioIn): + previous_timestamp = 1000000000 # 1 second in nanoseconds + codec = "pcm16" + duration_seconds = 2.0 + + stream = await audio_in.get_audio(codec, duration_seconds, previous_timestamp) + + chunk_count = 0 + async for response in stream: + assert response.audio.audio_data is not None + assert response.audio.audio_info.codec == codec + assert response.audio.audio_info.sample_rate_hz == PROPERTIES.sample_rate_hz + assert response.audio.audio_info.num_channels == PROPERTIES.num_channels + assert response.audio.sequence == chunk_count + assert response.audio.start_timestamp_nanoseconds >= previous_timestamp + assert response.audio.end_timestamp_nanoseconds > response.audio.start_timestamp_nanoseconds + chunk_count += 1 + + assert chunk_count == 2 # Should have received 2 chunks from mock + + async def test_get_properties(self, audio_in: AudioIn): + properties = await audio_in.get_properties() + assert properties.supported_codecs == PROPERTIES.supported_codecs + assert properties.sample_rate_hz == PROPERTIES.sample_rate_hz + 
assert properties.num_channels == PROPERTIES.num_channels + + async def test_do_command(self, audio_in: AudioIn): + command = {"command": "args"} + resp = await audio_in.do_command(command) + assert resp == {"command": command} + + @pytest.mark.asyncio + async def test_get_geometries(self, audio_in: AudioIn): + geometries = await audio_in.get_geometries() + assert geometries == GEOMETRIES + +class TestService: + async def test_get_audio(self, audio_in: AudioIn, service: AudioInRPCService): + async with ChannelFor([service]) as channel: + client = AudioInServiceStub(channel) + previous_timestamp = 1000000000 + codec = "pcm16" + duration_seconds = 2.0 + + request = GetAudioRequest( + name=audio_in.name, + codec=codec, + duration_seconds=duration_seconds, + previous_timestamp_nanoseconds=previous_timestamp + ) + + async with client.GetAudio.open() as stream: + await stream.send_message(request) + + chunk_count = 0 + async for response in stream: + assert isinstance(response, AudioResponse) + assert response.audio.audio_data is not None + assert response.audio.audio_info.codec == codec + assert response.audio.audio_info.sample_rate_hz == PROPERTIES.sample_rate_hz + assert response.audio.audio_info.num_channels == PROPERTIES.num_channels + assert response.audio.sequence == chunk_count + chunk_count += 1 + + assert chunk_count > 0 + + async def test_get_properties(self, audio_in: MockAudioIn, service: AudioInRPCService): + assert audio_in.timeout is None + async with ChannelFor([service]) as channel: + client = AudioInServiceStub(channel) + response: GetPropertiesResponse = await client.GetProperties( + GetPropertiesRequest(name=audio_in.name), timeout=1.82 + ) + assert response.supported_codecs == PROPERTIES.supported_codecs + assert response.sample_rate_hz == PROPERTIES.sample_rate_hz + assert response.num_channels == PROPERTIES.num_channels + assert audio_in.timeout == loose_approx(1.82) + + async def test_do_command(self, audio_in: MockAudioIn, service: 
AudioInRPCService): + async with ChannelFor([service]) as channel: + client = AudioInServiceStub(channel) + command = {"command": "args"} + request = DoCommandRequest(name=audio_in.name, command=dict_to_struct(command)) + response: DoCommandResponse = await client.DoCommand(request) + result = struct_to_dict(response.result) + assert result == {"command": command} + + async def test_get_geometries(self, audio_in: MockAudioIn, service: AudioInRPCService): + async with ChannelFor([service]) as channel: + client = AudioInServiceStub(channel) + request = GetGeometriesRequest(name=audio_in.name) + response: GetGeometriesResponse = await client.GetGeometries(request) + assert [geometry for geometry in response.geometries] == GEOMETRIES + + +class TestClient: + async def test_get_audio(self, audio_in: AudioIn, service: AudioInRPCService): + async with ChannelFor([service]) as channel: + client = AudioInClient(audio_in.name, channel) + + previous_timestamp = 1000000000 + codec = "pcm16" + duration_seconds = 2.0 + + stream = await client.get_audio(codec, duration_seconds, previous_timestamp) + + chunk_count = 0 + async for resp in stream: + assert resp.audio.audio_data is not None + assert resp.audio.audio_info.codec == codec + assert resp.audio.audio_info.sample_rate_hz == PROPERTIES.sample_rate_hz + assert resp.audio.audio_info.num_channels == PROPERTIES.num_channels + assert resp.audio.sequence == chunk_count + chunk_count += 1 + + assert chunk_count > 0 + + async def test_get_properties(self, audio_in: MockAudioIn, service: AudioInRPCService): + assert audio_in.timeout is None + async with ChannelFor([service]) as channel: + client = AudioInClient(audio_in.name, channel) + properties = await client.get_properties(timeout=4.4) + assert properties.supported_codecs == PROPERTIES.supported_codecs + assert properties.sample_rate_hz == PROPERTIES.sample_rate_hz + assert properties.num_channels == PROPERTIES.num_channels + assert audio_in.timeout == loose_approx(4.4) + + async 
def test_do_command(self, audio_in: AudioIn, service: AudioInRPCService): + async with ChannelFor([service]) as channel: + client = AudioInClient(audio_in.name, channel) + command = {"command": "args"} + resp = await client.do_command(command) + assert resp == {"command": command} + + @pytest.mark.asyncio + async def test_get_geometries(self, audio_in: AudioIn, service: AudioInRPCService): + async with ChannelFor([service]) as channel: + client = AudioInClient(audio_in.name, channel) + geometries = await client.get_geometries() + assert geometries == GEOMETRIES diff --git a/tests/test_audio_out.py b/tests/test_audio_out.py new file mode 100644 index 000000000..a43800854 --- /dev/null +++ b/tests/test_audio_out.py @@ -0,0 +1,204 @@ +import pytest +from grpclib.testing import ChannelFor + +from viam.components.audio_out import AudioOutClient, AudioOutRPCService, AudioOut +from viam.proto.common import ( + GetGeometriesResponse, + AudioInfo, + GetPropertiesRequest, + GetPropertiesResponse, + DoCommandRequest, +) +from viam.resource.manager import ResourceManager +from viam.utils import dict_to_struct, struct_to_dict +from viam.proto.component.audioout import PlayRequest, AudioOutServiceStub +from . 
import loose_approx + +from .mocks.components import MockAudioOut, GEOMETRIES + + +# Test properties for the mock AudioOut +PROPERTIES = AudioOut.Properties( + supported_codecs=["pcm16", "mp3"], + sample_rate_hz=44100, + num_channels=2, +) + +@pytest.fixture(scope="function") +def audio_out() -> MockAudioOut: + return MockAudioOut(name="audio_out", properties=PROPERTIES) + + +@pytest.fixture(scope="function") +def service(audio_out: MockAudioOut) -> AudioOutRPCService: + manager = ResourceManager([audio_out]) + return AudioOutRPCService(manager) + + +class TestAudioOut: + @pytest.mark.asyncio + async def test_play(self, audio_out: MockAudioOut): + audio_data = b"test_audio_data" + audio_info = AudioInfo(codec="pcm16", sample_rate_hz=44100, num_channels=2) + await audio_out.play(audio_data, audio_info) + assert audio_out.play_called + assert audio_out.last_audio_data == audio_data + assert audio_out.last_audio_info == audio_info + + @pytest.mark.asyncio + async def test_play_without_audio_info(self, audio_out: MockAudioOut): + audio_data = b"test_audio_data" + await audio_out.play(audio_data) + assert audio_out.play_called + assert audio_out.last_audio_data == audio_data + assert audio_out.last_audio_info is None + + @pytest.mark.asyncio + async def test_get_properties(self, audio_out: MockAudioOut): + properties = await audio_out.get_properties() + assert isinstance(properties, GetPropertiesResponse) + assert properties.supported_codecs == PROPERTIES.supported_codecs + assert properties.sample_rate_hz == PROPERTIES.sample_rate_hz + assert properties.num_channels == PROPERTIES.num_channels + + @pytest.mark.asyncio + async def test_do_command(self, audio_out: MockAudioOut): + command = {"test": "command"} + result = await audio_out.do_command(command) + assert result == command + + @pytest.mark.asyncio + async def test_get_geometries(self, audio_out: MockAudioOut): + geometries = await audio_out.get_geometries() + assert geometries == GEOMETRIES + +class TestService: 
+ @pytest.mark.asyncio + async def test_play(self, audio_out: MockAudioOut, service: AudioOutRPCService): + + audio_data = b"test_audio_data" + audio_info = AudioInfo(codec="pcm16", sample_rate_hz=44100, num_channels=2) + + async with ChannelFor([service]) as channel: + client = AudioOutServiceStub(channel) + request = PlayRequest( + name=audio_out.name, + audio_data=audio_data, + audio_info=audio_info, + extra=dict_to_struct({}), + ) + + await client.Play(request) + + assert audio_out.play_called + assert audio_out.last_audio_data == audio_data + assert audio_out.last_audio_info == audio_info + + @pytest.mark.asyncio + async def test_play_without_audio_info(self, audio_out: MockAudioOut, service: AudioOutRPCService): + audio_data = b"test_audio_data" + + async with ChannelFor([service]) as channel: + client = AudioOutServiceStub(channel) + request = PlayRequest( + name=audio_out.name, + audio_data=audio_data, + audio_info=None, + extra=dict_to_struct({}), + ) + + await client.Play(request) + + assert audio_out.play_called + assert audio_out.last_audio_data == audio_data + assert audio_out.last_audio_info is None + + @pytest.mark.asyncio + async def test_get_properties(self, audio_out: MockAudioOut, service: AudioOutRPCService): + async with ChannelFor([service]) as channel: + client = AudioOutServiceStub(channel) + response: GetPropertiesResponse = await client.GetProperties( + GetPropertiesRequest(name=audio_out.name), timeout=1.82 + ) + assert response.supported_codecs == PROPERTIES.supported_codecs + assert response.sample_rate_hz == PROPERTIES.sample_rate_hz + assert response.num_channels == PROPERTIES.num_channels + assert audio_out.timeout == loose_approx(1.82) + + @pytest.mark.asyncio + async def test_do_command(self, audio_out: MockAudioOut, service: AudioOutRPCService): + + command = {"test": "command"} + + async with ChannelFor([service]) as channel: + client = AudioOutServiceStub(channel) + request = DoCommandRequest(name=audio_out.name, 
command=dict_to_struct(command)) + response = await client.DoCommand(request) + + result = struct_to_dict(response.result) + assert result == command + + @pytest.mark.asyncio + async def test_get_geometries(self, audio_out: MockAudioOut, service: AudioOutRPCService): + from viam.proto.common import GetGeometriesRequest + + async with ChannelFor([service]) as channel: + client = AudioOutServiceStub(channel) + request = GetGeometriesRequest(name=audio_out.name, extra=dict_to_struct({})) + response = await client.GetGeometries(request) + + assert isinstance(response, GetGeometriesResponse) + assert [geometry for geometry in response.geometries] == GEOMETRIES + + +class TestClient: + @pytest.mark.asyncio + async def test_play(self, audio_out: MockAudioOut, service: AudioOutRPCService): + async with ChannelFor([service]) as channel: + client = AudioOutClient(audio_out.name, channel) + audio_data = b"test_audio_data" + audio_info = AudioInfo(codec="pcm16", sample_rate_hz=44100, num_channels=2) + + await client.play(audio_data, audio_info) + + assert audio_out.play_called + assert audio_out.last_audio_data == audio_data + assert audio_out.last_audio_info == audio_info + + @pytest.mark.asyncio + async def test_play_without_audio_info(self, audio_out: MockAudioOut, service: AudioOutRPCService): + async with ChannelFor([service]) as channel: + client = AudioOutClient(audio_out.name, channel) + audio_data = b"test_audio_data" + + await client.play(audio_data) + + assert audio_out.play_called + assert audio_out.last_audio_data == audio_data + assert audio_out.last_audio_info is None + + @pytest.mark.asyncio + async def test_get_properties(self, audio_out: MockAudioOut,service: AudioOutRPCService): + async with ChannelFor([service]) as channel: + client = AudioOutClient(audio_out.name, channel) + properties = await client.get_properties(timeout=4.4) + assert isinstance(properties, GetPropertiesResponse) + assert properties.supported_codecs == PROPERTIES.supported_codecs + 
assert properties.sample_rate_hz == PROPERTIES.sample_rate_hz + assert properties.num_channels == PROPERTIES.num_channels + assert audio_out.timeout == loose_approx(4.4) + + @pytest.mark.asyncio + async def test_do_command(self, audio_out: MockAudioOut, service: AudioOutRPCService): + async with ChannelFor([service]) as channel: + client = AudioOutClient(audio_out.name, channel) + command = {"command": "args"} + resp = await client.do_command(command) + assert resp == command + + @pytest.mark.asyncio + async def test_get_geometries(self, audio_out: MockAudioOut, service: AudioOutRPCService): + async with ChannelFor([service]) as channel: + client = AudioOutClient(audio_out.name, channel) + geometries = await client.get_geometries() + assert geometries == GEOMETRIES