Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/concurrency.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ In the case of properties, the HTTP response is only returned once the `.Thing`

Many of the functions that handle HTTP requests are asynchronous, running in an :mod:`anyio` event loop. This enables many HTTP connections to be handled at once with good efficiency. The `anyio documentation`_ describes the functions that link between async and threaded code. When the LabThings server is started, we create an :class:`anyio.from_thread.BlockingPortal`, which allows threaded code to run code asynchronously in the event loop.

An action can obtain the blocking portal using the `~labthings_fastapi.dependencies.blocking_portal.BlockingPortal` dependency, i.e. by declaring an argument of that type. This avoids referring to the blocking portal through a global variable, which could lead to confusion if there are multiple event loops, e.g. during testing.
An action can run async code using its server interface. See `.ThingServerInterface.start_async_task_soon` for details.

There are relatively few occasions when `.Thing` code will need to consider this explicitly: more usually the blocking portal will be obtained by a LabThings function, for example the `.MJPEGStream` class.

Expand Down
6 changes: 3 additions & 3 deletions docs/source/dependencies/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import labthings_fastapi as lt
from labthings_fastapi.example_things import MyThing

MyThingClient = lt.deps.direct_thing_client_class(MyThing, "/mything/")
MyThingClient = lt.deps.direct_thing_client_class(MyThing, "mything")
MyThingDep = Annotated[MyThingClient, Depends()]


Expand All @@ -19,8 +19,8 @@ def increment_counter(self, my_thing: MyThingDep) -> None:


server = lt.ThingServer()
server.add_thing(MyThing(), "/mything/")
server.add_thing(TestThing(), "/testthing/")
server.add_thing("mything", MyThing)
server.add_thing("testthing", TestThing)

if __name__ == "__main__":
import uvicorn
Expand Down
2 changes: 1 addition & 1 deletion docs/source/quickstart/counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def slowly_increase_counter(self) -> None:
server = lt.ThingServer()

# The line below creates a TestThing instance and adds it to the server
server.add_thing(TestThing(), "/counter/")
server.add_thing("counter", TestThing)

# We run the server using `uvicorn`:
uvicorn.run(server.app, port=5000)
3 changes: 1 addition & 2 deletions docs/source/tutorial/writing_a_thing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ Our first Thing will pretend to be a light: we can set its brightness and turn i
self.is_on = not self.is_on


light = Light()
server = lt.ThingServer()
server.add_thing("/light", light)
server.add_thing("light", Light)

if __name__ == "__main__":
import uvicorn
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ addopts = [
"--cov-report=html:htmlcov",
"--cov-report=lcov",
]
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
]

[tool.ruff]
target-version = "py310"
Expand Down
4 changes: 2 additions & 2 deletions src/labthings_fastapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""

from .thing import Thing
from .thing_server_interface import ThingServerInterface
from .properties import property, setting, DataProperty, DataSetting
from .decorators import (
thing_action,
Expand All @@ -30,7 +31,6 @@
from .outputs import blob
from .server import ThingServer, cli
from .client import ThingClient
from .utilities import get_blocking_portal

# The symbols in __all__ are part of our public API.
# They are imported when using `import labthings_fastapi as lt`.
Expand All @@ -40,6 +40,7 @@
# re-export style, we may switch in the future.
__all__ = [
"Thing",
"ThingServerInterface",
"property",
"setting",
"DataProperty",
Expand All @@ -52,5 +53,4 @@
"ThingServer",
"cli",
"ThingClient",
"get_blocking_portal",
]
10 changes: 5 additions & 5 deletions src/labthings_fastapi/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,14 @@ def run(self) -> None:
# self.action evaluates to an ActionDescriptor. This confuses mypy,
# which thinks we are calling ActionDescriptor.__get__.
action: ActionDescriptor = self.action # type: ignore[call-overload]
# Create a logger just for this invocation, keyed to the invocation id
# Logs that go to this logger will be copied into `self._log`
handler = DequeLogHandler(dest=self._log)
logger = invocation_logger(self.id)
logger.addHandler(handler)
try:
action.emit_changed_event(self.thing, self._status.value)

# Capture just this thread's log messages
handler = DequeLogHandler(dest=self._log)
logger = invocation_logger(self.id)
logger.addHandler(handler)

thing = self.thing
kwargs = model_to_dict(self.input)
if thing is None: # pragma: no cover
Expand Down
16 changes: 8 additions & 8 deletions src/labthings_fastapi/client/in_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class DirectThingClient:
__globals__ = globals() # "bake in" globals so dependency injection works
thing_class: type[Thing]
"""The class of the underlying `.Thing` we are wrapping."""
thing_path: str
"""The path to the Thing on the server. Relative to the server's base URL."""
thing_name: str
"""The name of the Thing on the server."""

def __init__(self, request: Request, **dependencies: Mapping[str, Any]) -> None:
r"""Wrap a `.Thing` so it works like a `.ThingClient`.
Expand All @@ -70,7 +70,7 @@ def __init__(self, request: Request, **dependencies: Mapping[str, Any]) -> None:
such as access to other `.Things`.
"""
server = find_thing_server(request.app)
self._wrapped_thing = server.things[self.thing_path]
self._wrapped_thing = server.things[self.thing_name]
self._request = request
self._dependencies = dependencies

Expand Down Expand Up @@ -254,15 +254,15 @@ def add_property(

def direct_thing_client_class(
thing_class: type[Thing],
thing_path: str,
thing_name: str,
actions: Optional[list[str]] = None,
) -> type[DirectThingClient]:
r"""Create a DirectThingClient from a Thing class and a path.

This is a class, not an instance: it's designed to be a FastAPI dependency.

:param thing_class: The `.Thing` subclass that will be wrapped.
:param thing_path: The path where the `.Thing` is found on the server.
:param thing_name: The name of the `.Thing` on the server.
:param actions: An optional list giving a subset of actions that will be
accessed. If this is specified, it may reduce the number of FastAPI
dependencies we need.
Expand Down Expand Up @@ -291,15 +291,15 @@ def init_proxy(
# of `DirectThingClient` with bad results.
DirectThingClient.__init__(self, request, **dependencies)

init_proxy.__doc__ = f"""Initialise a client for {thing_class} at {thing_path}"""
init_proxy.__doc__ = f"""Initialise a client for {thing_class}"""

# Using a class definition gets confused by the scope of the function
# arguments - this is equivalent to a class definition but all the
# arguments are evaluated in the right scope.
client_attrs = {
"thing_class": thing_class,
"thing_path": thing_path,
"__doc__": f"A client for {thing_class} at {thing_path}",
"thing_name": thing_name,
"__doc__": f"A client for {thing_class} named {thing_name}",
"__init__": init_proxy,
}
dependencies: list[inspect.Parameter] = []
Expand Down
12 changes: 11 additions & 1 deletion src/labthings_fastapi/dependencies/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
def thing_states_getter(request: Request) -> Callable[[], Mapping[str, Any]]:
"""Generate a function to retrieve metadata from all Things in this server.

.. warning::

This function is deprecated in favour of the `.ThingServerInterface`, which
is available as a property of every Thing.
See `.ThingServerInterface.get_thing_states` for more information.

This is intended to make it easy for a `.Thing` to summarise the other
`.Things` in the same server, as is often appropriate when embedding metadata
in data files. For example, it's used to populate the ``UserComment``
Expand Down Expand Up @@ -64,7 +70,11 @@ def get_metadata() -> dict[str, Any]:
GetThingStates = Annotated[
Callable[[], Mapping[str, Any]], Depends(thing_states_getter)
]
"""A ready-made FastAPI dependency, returning a function to collect metadata.
r"""A ready-made FastAPI dependency, returning a function to collect metadata.

.. warning::

This dependency is deprecated in favour of the `.ThingServerInterface`\ .

This calls `.thing_states_getter` to provide a function that supplies a
dictionary of metadata. It describes the state of all `.Thing` instances on
Expand Down
15 changes: 2 additions & 13 deletions src/labthings_fastapi/descriptors/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from ..outputs.blob import BlobIOContextDep
from ..thing_description import type_to_dataschema
from ..thing_description._model import ActionAffordance, ActionOp, Form
from ..utilities import labthings_data, get_blocking_portal
from ..utilities import labthings_data
from ..exceptions import NotConnectedToServerError

if TYPE_CHECKING:
Expand Down Expand Up @@ -200,19 +200,8 @@ def emit_changed_event(self, obj: Thing, status: str) -> None:

:param obj: The `.Thing` on which the action is being observed.
:param status: The status of the action, to be sent to observers.

:raise NotConnectedToServerError: if the Thing calling the action is not
connected to a server with a running event loop.
"""
runner = get_blocking_portal(obj)
if not runner:
thing_name = obj.__class__.__name__
msg = (
f"Cannot emit action changed event. Is {thing_name} connected to "
"a running server?"
)
raise NotConnectedToServerError(msg)
runner.start_task_soon(
obj._thing_server_interface.start_async_task_soon(
self.emit_changed_event_async,
obj,
status,
Expand Down
7 changes: 5 additions & 2 deletions src/labthings_fastapi/example_things/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,14 @@ def broken_property(self) -> None:
class ThingThatCantInstantiate(Thing):
"""A Thing that raises an exception in __init__."""

def __init__(self) -> None:
"""Fail to initialise.
def __init__(self, **kwargs: Any) -> None:
r"""Fail to initialise.

:param \**kwargs: keyword arguments passed to Thing.__init__

:raise RuntimeError: every time.
"""
super().__init__(**kwargs)
raise RuntimeError("This thing can't be instantiated")


Expand Down
37 changes: 21 additions & 16 deletions src/labthings_fastapi/outputs/mjpeg_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
from contextlib import asynccontextmanager
import threading
import anyio
from anyio.from_thread import BlockingPortal
import logging

if TYPE_CHECKING:
from ..thing import Thing
from ..thing_server_interface import ThingServerInterface


@dataclass
Expand Down Expand Up @@ -126,19 +126,25 @@ class MJPEGStream:
of new frames, and then retrieving the frame (shortly) afterwards.
"""

def __init__(self, ringbuffer_size: int = 10) -> None:
def __init__(
self, thing_server_interface: ThingServerInterface, ringbuffer_size: int = 10
) -> None:
"""Initialise an MJPEG stream.

See the class docstring for `.MJPEGStream`. Note that it will
often be initialised by `.MJPEGStreamDescriptor`.

:param thing_server_interface: the `.ThingServerInterface` of the
`.Thing` associated with this stream. It's used to run the async
code that relays frames to open connections.
:param ringbuffer_size: The number of frames to retain in
memory, to allow retrieval after the frame has been sent.
"""
self._lock = threading.Lock()
self.condition = anyio.Condition()
self._streaming = False
self._ringbuffer: list[RingbufferEntry] = []
self._thing_server_interface = thing_server_interface
self.reset(ringbuffer_size=ringbuffer_size)

def reset(self, ringbuffer_size: Optional[int] = None) -> None:
Expand All @@ -161,18 +167,16 @@ def reset(self, ringbuffer_size: Optional[int] = None) -> None:
]
self.last_frame_i = -1

def stop(self, portal: BlockingPortal) -> None:
def stop(self) -> None:
"""Stop the stream.

Stop the stream and cause all clients to disconnect.

:param portal: an `anyio.from_thread.BlockingPortal` that allows
this function to use the event loop to notify that the stream
should stop.
"""
with self._lock:
self._streaming = False
portal.start_task_soon(self.notify_stream_stopped)
self._thing_server_interface.start_async_task_soon(
self.notify_stream_stopped
)

async def ringbuffer_entry(self, i: int) -> RingbufferEntry:
"""Return the ith frame acquired by the camera.
Expand Down Expand Up @@ -308,7 +312,7 @@ async def mjpeg_stream_response(self) -> MJPEGStreamResponse:
"""
return MJPEGStreamResponse(self.frame_async_generator())

def add_frame(self, frame: bytes, portal: BlockingPortal) -> None:
def add_frame(self, frame: bytes) -> None:
"""Add a JPEG to the MJPEG stream.

This function adds a frame to the stream. It may be called from
Expand All @@ -317,10 +321,6 @@ def add_frame(self, frame: bytes, portal: BlockingPortal) -> None:
are handled.

:param frame: The frame to add
:param portal: The blocking portal to use for scheduling tasks.
This is necessary because tasks are handled asynchronously.
The blocking portal may be obtained with a dependency, in
`labthings_fastapi.dependencies.blocking_portal.BlockingPortal`.

:raise ValueError: if the supplied frame does not start with the JPEG
start bytes and end with the end bytes.
Expand All @@ -337,7 +337,9 @@ def add_frame(self, frame: bytes, portal: BlockingPortal) -> None:
entry.timestamp = datetime.now()
entry.frame = frame
entry.index = self.last_frame_i + 1
portal.start_task_soon(self.notify_new_frame, entry.index)
self._thing_server_interface.start_async_task_soon(
self.notify_new_frame, entry.index
)

async def notify_new_frame(self, i: int) -> None:
"""Notify any waiting tasks that a new frame is available.
Expand Down Expand Up @@ -420,7 +422,10 @@ def __get__(
try:
return obj.__dict__[self.name]
except KeyError:
obj.__dict__[self.name] = MJPEGStream(**self._kwargs)
obj.__dict__[self.name] = MJPEGStream(
**self._kwargs,
thing_server_interface=obj._thing_server_interface,
)
return obj.__dict__[self.name]

async def viewer_page(self, url: str) -> HTMLResponse:
Expand Down Expand Up @@ -452,7 +457,7 @@ class Camera(lt.Thing):


server = lt.ThingServer()
server.add_thing(Camera(), "/camera")
server.add_thing("camera", Camera)

:param app: the `fastapi.FastAPI` application to which we are being added.
:param thing: the host `.Thing` instance.
Expand Down
Loading
Loading