Skip to content

Commit

Permalink
remote: add session feature in labgrid-client
Browse files Browse the repository at this point in the history
Adds a --session option to the labgrid-client monitor, reserve and acquire commands.
Enables the user to set up a monitor session. Reservations and place acquisitions that are registered to the same monitor session will be released when that monitor session ends.

Signed-off-by: Luke Hackwell <[email protected]>
  • Loading branch information
luke-hackwell committed Feb 7, 2025
1 parent 9636068 commit 1c1c3a6
Show file tree
Hide file tree
Showing 9 changed files with 589 additions and 136 deletions.
17 changes: 17 additions & 0 deletions doc/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -872,3 +872,20 @@ like this:
$ labgrid-client -p example allow sirius/john
To remove the allow it is currently necessary to unlock and lock the place.

Using places within a session
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This workflow enables places and reservations to be released when a "session" is killed.
This is useful when running a session within a CI job, so that if the CI job is killed,
its places and reservations are freed.

Use the ``labgrid-client monitor --session <session-name>`` command to start a session.
Pass the same ``--session <session-name>`` option to the ``reserve`` and ``acquire`` commands to tie reservations and place acquisitions to that session.
If the ``monitor`` process is killed, all reservations and place acquisitions tied to its session are freed.

Example:

.. code-block:: bash

   $ labgrid-client monitor --session=my-session &
   $ labgrid-client reserve --session=my-session tag=my-tag
   $ labgrid-client -p +<token> acquire --session=my-session
11 changes: 9 additions & 2 deletions labgrid/remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ async def start(self):
msg = labgrid_coordinator_pb2.ClientInMessage()
msg.startup.version = labgrid_version()
msg.startup.name = f"{self.gethostname()}/{self.getuser()}"
msg.startup.session = self.args.monitor_session if hasattr(self.args, "monitor_session") else ""
self.out_queue.put_nowait(msg)
msg = labgrid_coordinator_pb2.ClientInMessage()
msg.subscribe.all_places = True
Expand Down Expand Up @@ -676,7 +677,7 @@ async def acquire(self):
if not self.args.allow_unmatched:
self.check_matches(place)

request = labgrid_coordinator_pb2.AcquirePlaceRequest(placename=place.name)
request = labgrid_coordinator_pb2.AcquirePlaceRequest(placename=place.name, session=self.args.session)

try:
await self.stub.AcquirePlace(request)
Expand Down Expand Up @@ -1423,6 +1424,7 @@ def write_image(self):

async def create_reservation(self):
prio = self.args.prio
session = self.args.session

fltr = {}
for pair in self.args.filters:
Expand All @@ -1440,7 +1442,7 @@ async def create_reservation(self):
"main": labgrid_coordinator_pb2.Reservation.Filter(filter=fltr),
}

request = labgrid_coordinator_pb2.CreateReservationRequest(filters=fltrs, prio=prio)
request = labgrid_coordinator_pb2.CreateReservationRequest(filters=fltrs, prio=prio, session=session)

try:
response: labgrid_coordinator_pb2.CreateReservationResponse = await self.stub.CreateReservation(request)
Expand Down Expand Up @@ -1719,6 +1721,9 @@ def main():
subparser.set_defaults(func=ClientSession.complete)

subparser = subparsers.add_parser("monitor", help="monitor events from the coordinator")
subparser.add_argument(
"--session", default="", dest="monitor_session", help="Create a session to book places and reservations under"
)
subparser.set_defaults(func=ClientSession.do_monitor)

subparser = subparsers.add_parser("resources", aliases=("r",), help="list available resources")
Expand Down Expand Up @@ -1785,6 +1790,7 @@ def main():
subparser.add_argument(
"--allow-unmatched", action="store_true", help="allow missing resources for matches when locking the place"
)
subparser.add_argument("--session", default="", help="Acquire a place within a given session")
subparser.set_defaults(func=ClientSession.acquire)

subparser = subparsers.add_parser("release", aliases=("unlock",), help="release a place")
Expand Down Expand Up @@ -2008,6 +2014,7 @@ def main():
"--prio", type=float, default=0.0, help="priority relative to other reservations (default 0)"
)
subparser.add_argument("filters", metavar="KEY=VALUE", nargs="+", help="required tags")
subparser.add_argument("--session", default="", help="Make a reservation within a given session")
subparser.set_defaults(func=ClientSession.create_reservation)

subparser = subparsers.add_parser("cancel-reservation", help="cancel a reservation")
Expand Down
12 changes: 12 additions & 0 deletions labgrid/remote/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class Place:
created = attr.ib(default=attr.Factory(time.time))
changed = attr.ib(default=attr.Factory(time.time))
reservation = attr.ib(default=None)
session = attr.ib(default="")

def asdict(self):
# in the coordinator, we have resource objects, otherwise just a path
Expand All @@ -251,6 +252,7 @@ def asdict(self):
"created": self.created,
"changed": self.changed,
"reservation": self.reservation,
"session": self.session,
}

def update_from_pb2(self, place_pb2):
Expand Down Expand Up @@ -298,6 +300,8 @@ def show(self, level=0):
print(indent + f"changed: {datetime.fromtimestamp(self.changed)}")
if self.reservation:
print(indent + f"reservation: {self.reservation}")
if self.session:
print(indent + f"session: {self.session}")

def getmatch(self, resource_path):
"""Return the ResourceMatch object for the given resource path or None if not found.
Expand Down Expand Up @@ -350,6 +354,7 @@ def as_pb2(self):
place.created = self.created
if self.reservation:
place.reservation = self.reservation
place.session = self.session
for key, value in self.tags.items():
place.tags[key] = value
return place
Expand Down Expand Up @@ -377,6 +382,7 @@ def from_pb2(cls, pb2):
created=pb2.created,
changed=pb2.changed,
reservation=pb2.reservation if pb2.HasField("reservation") else None,
session=pb2.session,
)


Expand Down Expand Up @@ -406,6 +412,7 @@ class Reservation:
allocations = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict))
created = attr.ib(default=attr.Factory(time.time))
timeout = attr.ib(default=attr.Factory(lambda: time.time() + 60))
session = attr.ib(default="")

def asdict(self):
return {
Expand All @@ -416,6 +423,7 @@ def asdict(self):
"allocations": self.allocations,
"created": self.created,
"timeout": self.timeout,
"session": self.session,
}

def refresh(self, delta=60):
Expand All @@ -441,6 +449,8 @@ def show(self, level=0):
print(indent + f" {name}: {', '.join(allocation)}")
print(indent + f"created: {datetime.fromtimestamp(self.created)}")
print(indent + f"timeout: {datetime.fromtimestamp(self.timeout)}")
if self.session:
print(indent + f"session: {self.session}")

def as_pb2(self):
res = labgrid_coordinator_pb2.Reservation()
Expand All @@ -459,6 +469,7 @@ def as_pb2(self):
res.allocations.update({"main": allocation[0]})
res.created = self.created
res.timeout = self.timeout
res.session = self.session
return res

@classmethod
Expand All @@ -478,6 +489,7 @@ def from_pb2(cls, pb2: labgrid_coordinator_pb2.Reservation):
allocations=allocations,
created=pb2.created,
timeout=pb2.timeout,
session=pb2.session,
)


Expand Down
104 changes: 100 additions & 4 deletions labgrid/remote/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ def get_resources(self):

@attr.s(eq=False)
class ClientSession(RemoteSession):
id = attr.ib()

def subscribe_places(self):
# send initial places
out_msg = labgrid_coordinator_pb2.ClientOutMessage()
Expand Down Expand Up @@ -208,13 +210,62 @@ class ExporterError(Exception):
pass


class _SessionManager:
    """Track which places and reservations are registered to each session.

    Sessions are keyed by a session identifier. Every tracked session owns a
    set of place objects and a set of reservation objects. Tracked objects
    must be hashable and expose a ``session`` attribute naming the session
    they belong to (places additionally ``name``, reservations ``token``,
    which are only used in error messages).

    Invariant violations raise ``AssertionError``. The checks are raised
    explicitly rather than written as ``assert`` statements so that they are
    not stripped when Python runs with ``-O``.
    """

    def __init__(self):
        # session id -> {"places": set(), "reservations": set()}
        self._sessions = {}

    def add_session(self, session):
        """Start tracking ``session``.

        Raises:
            AssertionError: if ``session`` is already tracked.
        """
        if session in self._sessions:
            raise AssertionError(f"Session {session} already exists")
        self._sessions[session] = {
            "places": set(),
            "reservations": set(),
        }

    def remove_session(self, session):
        """Stop tracking ``session`` and drop its membership sets.

        Raises:
            AssertionError: if ``session`` is not tracked.
        """
        self._assert_session(session)
        del self._sessions[session]

    def get_session(self, session):
        """Return ``(places, reservations)`` registered to ``session``.

        Both sets are shallow copies, so callers may release members while
        iterating without mutating the sets they iterate over.

        Raises:
            AssertionError: if ``session`` is not tracked.
        """
        self._assert_session(session)
        entry = self._sessions[session]
        return set(entry["places"]), set(entry["reservations"])

    def add_place(self, place):
        """Register ``place`` under the session named by ``place.session``.

        Raises:
            AssertionError: if that session is not tracked.
        """
        self._assert_session(place.session)
        self._sessions[place.session]["places"].add(place)

    def add_reservation(self, reservation):
        """Register ``reservation`` under the session named by ``reservation.session``.

        Raises:
            AssertionError: if that session is not tracked.
        """
        self._assert_session(reservation.session)
        self._sessions[reservation.session]["reservations"].add(reservation)

    def remove_place(self, place):
        """Unregister ``place`` from the session named by ``place.session``.

        Raises:
            AssertionError: if the session is not tracked or does not
                contain ``place``.
        """
        self._assert_session(place.session)
        places = self._sessions[place.session]["places"]
        if place not in places:
            raise AssertionError(f"Session {place.session} does not contain place {place.name}")
        # Actually drop the place; otherwise it would stay registered and be
        # released a second time when the session ends.
        places.remove(place)

    def remove_reservation(self, reservation):
        """Unregister ``reservation`` from the session named by ``reservation.session``.

        Raises:
            AssertionError: if the session is not tracked or does not
                contain ``reservation``.
        """
        self._assert_session(reservation.session)
        reservations = self._sessions[reservation.session]["reservations"]
        if reservation not in reservations:
            raise AssertionError(
                f"Session {reservation.session} does not contain reservation {reservation.token}"
            )
        reservations.remove(reservation)

    def _assert_session(self, session):
        """Raise ``AssertionError`` unless ``session`` is tracked."""
        if session not in self._sessions:
            raise AssertionError(f"Session {session} does not exist")


class Coordinator(labgrid_coordinator_pb2_grpc.CoordinatorServicer):
def __init__(self) -> None:
self.places: dict[str, Place] = {}
self.reservations = {}
self.poll_tasks = []
self.save_scheduled = False

self.session_manager = _SessionManager()

self.lock = asyncio.Lock()
self.exporters: dict[str, ExporterSession] = {}
self.clients: dict[str, ClientSession] = {}
Expand Down Expand Up @@ -302,6 +353,8 @@ def load(self):
del config["allowed"]
if "reservation" in config:
del config["reservation"]
if "session" in config:
del config["session"]
config["matches"] = [ResourceMatch(**match) for match in config["matches"]]
place = Place(**config)
self.places[placename] = place
Expand Down Expand Up @@ -330,7 +383,10 @@ async def request_task():
elif kind == "startup":
version = in_msg.startup.version
name = in_msg.startup.name
session = self.clients[peer] = ClientSession(self, peer, name, out_msg_queue, version)
id_ = self._session_id(peer, in_msg.startup.session)
session = self.clients[peer] = ClientSession(self, peer, name, out_msg_queue, version, id_)
if id_:
self.session_manager.add_session(id_)
logging.debug("Received startup from %s with %s", name, version)
asyncio.current_task().set_name(f"client-{peer}-rx/started-{name}")
elif kind == "subscribe":
Expand Down Expand Up @@ -359,6 +415,9 @@ async def request_task():
logging.info("Never received startup from peer %s that disconnected", peer)
return

if session.id:
await self._release_session(session)

running_request_task.cancel()
await running_request_task
logging.debug("client aborted %s, cancelled: %s", session, context.cancelled())
Expand Down Expand Up @@ -844,6 +903,9 @@ async def AcquirePlace(self, request, context):
# FIXME use the session object instead? or something else which
# survives disconnecting clients?
place.acquired = username
place.session = self._session_id(peer, request.session)
if place.session:
self.session_manager.add_place(place)
resources = []
for _, session in sorted(self.exporters.items()):
for _, group in sorted(session.groups.items()):
Expand Down Expand Up @@ -878,16 +940,23 @@ async def ReleasePlace(self, request, context):
if fromuser and place.acquired != fromuser:
return labgrid_coordinator_pb2.ReleasePlaceResponse()

await self._release_place(place)
return labgrid_coordinator_pb2.ReleasePlaceResponse()

async def _release_place(self, place):
    """Release ``place`` and everything acquired through it.

    Releases the place's acquired resources, unregisters the place from its
    session (if it has one), resets its acquisition state, then publishes
    the change, schedules a save and re-runs reservation scheduling.
    """
    held_resources = place.acquired_resources
    await self._release_resources(place, held_resources)

    # Session bookkeeping has to run while place.session is still set,
    # because the session manager looks the place up by that attribute.
    if place.session:
        self.session_manager.remove_place(place)

    # Reset the acquisition state of the place.
    place.acquired = None
    place.session = ""
    place.allowed = set()
    place.touch()

    # Propagate the new state.
    self._publish_place(place)
    self.save_later()
    self.schedule_reservations()
    print(f"{place.name}: place released")
return labgrid_coordinator_pb2.ReleasePlaceResponse()

@locked
async def AllowPlace(self, request, context):
Expand Down Expand Up @@ -1049,7 +1118,11 @@ async def CreateReservation(self, request: labgrid_coordinator_pb2.CreateReserva
fltr[k] = v

owner = self.clients[peer].name
res = Reservation(owner=owner, prio=request.prio, filters=fltrs)
res = Reservation(
owner=owner, prio=request.prio, filters=fltrs, session=self._session_id(peer, request.session)
)
if request.session:
self.session_manager.add_reservation(res)
self.reservations[res.token] = res
self.schedule_reservations()
return labgrid_coordinator_pb2.CreateReservationResponse(reservation=res.as_pb2())
Expand All @@ -1061,9 +1134,15 @@ async def CancelReservation(self, request: labgrid_coordinator_pb2.CancelReserva
await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f"Invalid token {token}")
if token not in self.reservations:
await context.abort(grpc.StatusCode.FAILED_PRECONDITION, f"Reservation {token} does not exist")
self._release_reservation(token)
return labgrid_coordinator_pb2.CancelReservationResponse()

def _release_reservation(self, token):
    """Drop the reservation stored under ``token`` and reschedule.

    If the reservation belongs to a session it is unregistered from the
    session manager first. A ``KeyError`` propagates when ``token`` is not
    a known reservation.
    """
    res = self.reservations[token]
    if res.session:
        self.session_manager.remove_reservation(res)
    # Only forget the reservation once the session bookkeeping succeeded.
    self.reservations.pop(token)
    self.schedule_reservations()
return labgrid_coordinator_pb2.CancelReservationResponse()

@locked
async def PollReservation(self, request: labgrid_coordinator_pb2.PollReservationRequest, context):
Expand All @@ -1080,6 +1159,23 @@ async def GetReservations(self, request: labgrid_coordinator_pb2.GetReservations
reservations = [x.as_pb2() for x in self.reservations.values()]
return labgrid_coordinator_pb2.GetReservationsResponse(reservations=reservations)

async def _release_session(self, client_session):
    """Release every place and reservation registered to a client's session.

    Args:
        client_session: the client session whose ``id`` names the session
            to tear down.

    The coordinator lock is held through ``async with`` so it is released
    even if releasing a place or reservation raises; a bare
    ``acquire()``/``release()`` pair would leave the lock held forever in
    that case. The membership snapshot is taken only after the lock is
    held, so it cannot go stale while this coroutine waits for the lock.
    """
    async with self.lock:
        places, reservations = self.session_manager.get_session(client_session.id)
        for place in places:
            await self._release_place(place)
        for reservation in reservations:
            self._release_reservation(reservation.token)
        # Only forget the session once all of its members were released.
        self.session_manager.remove_session(client_session.id)

def _session_id(self, peer, name):
    """Build the identifier of a session from the client peer and session name.

    Args:
        peer: peer string of the client connection.
            NOTE(review): assumed to be a gRPC peer string such as
            ``ipv4:127.0.0.1:54321`` or ``ipv6:[::1]:54321`` -- confirm
            against the gRPC runtime in use.
        name: session name given by the client; may be empty.

    Returns:
        ``""`` when ``name`` is empty, otherwise ``"<address>/<name>"`` where
        ``<address>`` is ``peer`` without its port, so that separate
        connections from the same address map to the same session.
    """
    if not name:
        return ""
    scheme, _, remainder = peer.partition(":")
    if scheme == "ipv6":
        # An IPv6 host contains ':' itself, so cutting after the second
        # field would reduce every IPv6 peer to the same prefix. Strip only
        # the trailing numeric port instead.
        host, sep, port = remainder.rpartition(":")
        address = f"{scheme}:{host}" if sep and port.isdigit() else peer
    else:
        address = ":".join(peer.split(":")[:2])
    return f"{address}/{name}"


async def serve(listen, cleanup) -> None:
asyncio.current_task().set_name("coordinator-serve")
Expand Down
Loading

0 comments on commit 1c1c3a6

Please sign in to comment.