Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/frequenz/client/dispatch/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,9 @@ def _get_stream(
request = StreamMicrogridDispatchesRequest(microgrid_id=int(microgrid_id))
broadcaster = GrpcStreamBroadcaster(
stream_name="StreamMicrogridDispatches",
stream_method=lambda: cast(
AsyncIterator[StreamMicrogridDispatchesResponse],
self.stub.StreamMicrogridDispatches(
request,
timeout=self._stream_timeout_seconds,
),
stream_method=lambda: self.stub.StreamMicrogridDispatches(
request,
timeout=self._stream_timeout_seconds,
),
Comment on lines -273 to 276
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, interesting. Maybe other projects could be affected about this, if they didn't remove the casts after moving to use the proper async stub. 🤔

Once more life shows that there are no small or improbable breaking changes 😢

transform=DispatchEvent.from_protobuf,
retry_strategy=LinearBackoff(interval=1, limit=None),
Expand Down
46 changes: 32 additions & 14 deletions src/frequenz/client/dispatch/test/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import AsyncIterator
from unittest.mock import AsyncMock, MagicMock

import grpc
import grpc.aio
Expand Down Expand Up @@ -109,7 +110,7 @@ async def ListMicrogridDispatches(
),
)

async def StreamMicrogridDispatches(
def StreamMicrogridDispatches(
self,
request: StreamMicrogridDispatchesRequest,
timeout: int = 5, # pylint: disable=unused-argument
Expand All @@ -122,20 +123,37 @@ async def StreamMicrogridDispatches(

Returns:
An async generator for dispatch changes.

Yields:
An event for each dispatch change.
"""
receiver = self._stream_channel.new_receiver()

async for message in receiver:
_logger.debug("Received message: %s", message)
if message.microgrid_id == MicrogridId(request.microgrid_id):
response = StreamMicrogridDispatchesResponse(
event=message.event.event.value,
dispatch=message.event.dispatch.to_protobuf(),
)
yield response

async def stream() -> AsyncIterator[StreamMicrogridDispatchesResponse]:
"""Stream microgrid dispatches changes."""
_logger.debug("Starting stream for microgrid %s", request.microgrid_id)
Comment on lines +128 to +130
Copy link

Copilot AI Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The nested stream() function should be extracted to a separate method or moved outside the method to improve readability and maintainability.

Copilot uses AI. Check for mistakes.

receiver = self._stream_channel.new_receiver()

async for message in receiver:
_logger.debug("Received message: %s", message)
if message.microgrid_id == MicrogridId(request.microgrid_id):
response = StreamMicrogridDispatchesResponse(
event=message.event.event.value,
dispatch=message.event.dispatch.to_protobuf(),
)
yield response
else:
_logger.debug(
"Skipping message for microgrid %s",
message.microgrid_id,
)

_logger.debug("Creating mock stream for microgrid %s", request.microgrid_id)

mock_stream = MagicMock(name="StreamMicrogridDispatches")
mock_stream.__aiter__.side_effect = stream
mock_stream.initial_metadata = AsyncMock(
side_effect=lambda: _logger.debug(
"Initial metadata requested for microgrid %s", request.microgrid_id
)
)
return mock_stream

# pylint: disable=too-many-branches
@staticmethod
Expand Down