Skip to content

Commit

Permalink
remote: implement acquire and release from coordinator to exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Luebbe <[email protected]>
  • Loading branch information
jluebbe committed Jun 13, 2024
1 parent af1bd5c commit 612b1eb
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 139 deletions.
68 changes: 50 additions & 18 deletions labgrid/remote/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,23 @@ class ResourceImport(ResourceEntry):
path = attr.ib(kw_only=True, validator=attr.validators.instance_of(tuple))


class ExporterCommand:
def __init__(self, request) -> None:
self.request = request
self.response = None
self.completed = asyncio.Event()

def complete(self, response) -> None:
self.response = response
self.completed.set()

async def wait(self):
await asyncio.wait_for(self.completed.wait(), 10)

class ExporterError(Exception):
pass


class Coordinator(labgrid_coordinator_pb2_grpc.LabgridServicer):
def __init__(self) -> None:
self.places = {}
Expand Down Expand Up @@ -462,9 +479,12 @@ def schedule_reservations(self):
if old_map.get(name) != new_map.get(name):
self._publish_place(place)

def get_exporter_by_name(self, name):
for exporter in self.exporters.values():
if exporter.name == name:
return exporter

async def _acquire_resources(self, place, resources):
#FIXME fix implementation
return True
resources = resources.copy() # we may modify the list
# all resources need to be free
for resource in resources:
Expand All @@ -475,13 +495,21 @@ async def _acquire_resources(self, place, resources):
acquired = []
try:
for resource in resources:
print(f"foo {resource}")
# this triggers an update from the exporter which is published
# to the clients
await self.call(f'org.labgrid.exporter.{resource.path[0]}.acquire',
resource.path[1], resource.path[3], place.name)
request = labgrid_coordinator_pb2.ExporterSetAcquiredRequest()
request.group_name = resource.path[1]
request.resource_name = resource.path[3]
request.place_name = place.name
cmd = ExporterCommand(request)
self.get_exporter_by_name(resource.path[0]).queue.put_nowait(cmd)
await cmd.wait()
if not cmd.response.success:
raise ExporterError("failed to acquire {resource}")
acquired.append(resource)
except:
print(f"failed to acquire {resource}", file=sys.stderr)
except Exception as e:
logging.exception("failed to acquire %s", resource)
# cleanup
await self._release_resources(place, acquired)
return False
Expand Down Expand Up @@ -525,9 +553,6 @@ def _publish_place(self, place):
client.queue.put_nowait(msg)

async def _release_resources(self, place, resources, callback=True):
# FIXME: use the internal queue to push this to the correct resources
return

resources = resources.copy() # we may modify the list

for resource in resources:
Expand All @@ -541,10 +566,17 @@ async def _release_resources(self, place, resources, callback=True):
# this triggers an update from the exporter which is published
# to the clients
if callback:
await self.call(f'org.labgrid.exporter.{resource.path[0]}.release',
resource.path[1], resource.path[3])
except:
print(f"failed to release {resource}", file=sys.stderr)
request = labgrid_coordinator_pb2.ExporterSetAcquiredRequest()
request.group_name = resource.path[1]
request.resource_name = resource.path[3]
# request.place_name is left unset to indicate release
cmd = ExporterCommand(request)
self.get_exporter_by_name(resource.path[0]).queue.put_nowait(cmd)
await cmd.wait()
if not cmd.response.success:
raise ExporterError(f"failed to release {resource}")
except (ExporterError, TimeoutError):
logging.exception("failed to release %s", resource)
# at leaset try to notify the clients
try:
self._publish_resource(resource)
Expand All @@ -570,9 +602,9 @@ async def request_task():
in_msg: labgrid_coordinator_pb2.ExporterInMessage
logging.debug(f"exporter in_msg %s", in_msg)
kind = in_msg.WhichOneof("kind")
if kind == "response":
if kind in "response":
cmd = pending_commands.pop(0)
cmd.set() # set event flag
cmd.complete(in_msg.response)
logging.debug(f"Command %s is done", cmd)
elif kind == "startup":
version = in_msg.startup.version
Expand Down Expand Up @@ -607,13 +639,13 @@ async def request_task():
async for cmd in queue_as_aiter(command_queue):
logging.debug(f"exporter cmd {cmd}")
out_msg = labgrid_coordinator_pb2.ExporterOutMessage()
out_msg.request.group_name = "foo-group"
out_msg.request.resource_name = "foo-resource"
out_msg.request.place_name = "foo-place"
out_msg.set_acquired_request.CopyFrom(cmd.request)
pending_commands.append(cmd)
yield out_msg
except asyncio.exceptions.CancelledError:
logging.info(f"exporter disconnected {context.peer()}")
except Exception:
logging.exception("error in exporter command handler")
finally:
runnning_request_task.cancel()
await runnning_request_task
Expand Down
43 changes: 22 additions & 21 deletions labgrid/remote/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,27 +805,28 @@ async def message_pump(self):
kind = out_message.WhichOneof("kind")
if kind == "hello":
logging.info("connected to exporter version %s", out_message.hello.version)
elif kind == "request":
logging.debug(f"aquire request")
if out_message.request.place_name:
await self.acquire(
out_message.request.group_name,
out_message.request.resource_name,
out_message.request.place_name
)
else:
await self.release(
out_message.request.group_name,
out_message.request.resource_name
)
in_message = labgrid_coordinator_pb2.ExporterInMessage()
in_message.response.status = 1
# FIXME update acquired status
# FIXME flush resource updates
logging.debug(f"queing {in_message}")
#await self.out_queue.join()
self.out_queue.put_nowait(in_message)
logging.debug(f"queued {in_message}")
elif kind == "set_acquired_request":
logging.debug(f"acquire request")
try:
success = False
if out_message.set_acquired_request.place_name:
await self.acquire(
out_message.set_acquired_request.group_name,
out_message.set_acquired_request.resource_name,
out_message.set_acquired_request.place_name
)
else:
await self.release(
out_message.set_acquired_request.group_name,
out_message.set_acquired_request.resource_name
)
success = True
finally:
in_message = labgrid_coordinator_pb2.ExporterInMessage()
in_message.response.success = success
logging.debug(f"queing {in_message}")
self.out_queue.put_nowait(in_message)
logging.debug(f"queued {in_message}")
else:
logging.debug(f"unknown request: {kind}")
except grpc.aio.AioRpcError as e:
Expand Down
Loading

0 comments on commit 612b1eb

Please sign in to comment.