Skip to content

Commit

Permalink
resubscribe on reconnect (#940)
Browse files Browse the repository at this point in the history
* resubscribe on reconnect

Signed-off-by: Teo Koon Peng <[email protected]>

* retry ci

Signed-off-by: Teo Koon Peng <[email protected]>

* throttle so ui is only updated once every 5s

Signed-off-by: Teo Koon Peng <[email protected]>

* use dummy test to make ci pass

Signed-off-by: Teo Koon Peng <[email protected]>

* reduce throttle on map; remove unused code

Signed-off-by: Teo Koon Peng <[email protected]>

* reduce throttle to 3s

Signed-off-by: Teo Koon Peng <[email protected]>

---------

Signed-off-by: Teo Koon Peng <[email protected]>
  • Loading branch information
koonpeng authored Apr 25, 2024
1 parent 4522b3e commit a764368
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 612 deletions.
30 changes: 4 additions & 26 deletions packages/api-client/lib/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,9 @@ describe('subscriptions', () => {
spyOn(sioClient.sio, 'emit');
});

it('multiplexes subscriptions', () => {
const s1 = sioClient.subscribe('test', () => {
// empty
});
const s2 = sioClient.subscribe('test', () => {
// empty
});
expect(sioClient.sio.emit).toHaveBeenCalledOnceWith('subscribe', jasmine.anything());
(sioClient.sio.emit as jasmine.Spy).calls.reset();

sioClient.unsubscribe(s1);
sioClient.unsubscribe(s2);
expect(sioClient.sio.emit).toHaveBeenCalledOnceWith('unsubscribe', jasmine.anything());
});

it('does not unsubscribe early when there are multiple subscriptions with the same listener', () => {
const listener = jasmine.createSpy();
const s1 = sioClient.subscribe('test', listener);
const s2 = sioClient.subscribe('test', listener);
expect(sioClient.sio.emit).toHaveBeenCalledOnceWith('subscribe', jasmine.anything());
(sioClient.sio.emit as jasmine.Spy).calls.reset();

sioClient.unsubscribe(s1);
expect(sioClient.sio.emit).not.toHaveBeenCalled();
sioClient.unsubscribe(s2);
expect(sioClient.sio.emit).toHaveBeenCalledOnceWith('unsubscribe', jasmine.anything());
it('dummy', () => {
// Dummy test to ci passes.
// #940 removed multiplexing in order to support resubscribe on reconnect.
// With it gone, there is no more tests and ci fails as a result.
});
});
28 changes: 4 additions & 24 deletions packages/api-client/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,21 @@ export interface Subscription {

export class SioClient {
public sio: Socket;
private _subscriptions: Record<string, number> = {};

constructor(...args: Parameters<typeof io>) {
this.sio = io(...args);
}

subscribe<T>(room: string, listener: Listener<T>): Subscription {
const subs = this._subscriptions[room] || 0;
if (subs === 0) {
this.sio.emit('subscribe', { room });
debug(`subscribed to ${room}`);
} else {
debug(`reusing previous subscription to ${room}`);
}
this.sio.emit('subscribe', { room });
debug(`subscribed to ${room}`);
this.sio.on(room, listener);
this._subscriptions[room] = subs + 1;
return { room, listener };
}

unsubscribe(sub: Subscription): void {
const subCount = this._subscriptions[sub.room] || 0;
if (!subCount) {
debug(`tried to unsubscribe from ${sub.room}, but no subscriptions exist`);
// continue regardless
}
if (subCount <= 1) {
this.sio.emit('unsubscribe', { room: sub.room });
delete this._subscriptions[sub.room];
debug(`unsubscribed to ${sub.room}`);
} else {
this._subscriptions[sub.room] = subCount - 1;
debug(
`skipping unsubscribe to ${sub.room} because there are still ${subCount - 1} subscribers`,
);
}
this.sio.emit('unsubscribe', { room: sub.room });
debug(`unsubscribed to ${sub.room}`);
this.sio.off(sub.room, sub.listener);
}

Expand Down
47 changes: 30 additions & 17 deletions packages/api-server/api_server/fast_io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,34 @@ def _match_routes(
return match, r
return None

async def _add_subscription(
self,
req: SubscriptionRequest,
handler: Callable[[], Observable | Coroutine[Any, Any, Observable]],
):
if "_subscriptions" in req.session and req.session["_subscriptions"].get(
req.room
):
return

maybe_coro = handler()
if asyncio.iscoroutine(maybe_coro):
obs = await maybe_coro
else:
obs = maybe_coro
obs = cast(Observable, obs)

loop = asyncio.get_event_loop()

def on_next(data):
async def emit():
await self.sio.emit(req.room, data, to=req.sid)

loop.create_task(emit())

sub = obs.subscribe(on_next)
req.session.setdefault("_subscriptions", {})[req.room] = sub

async def _on_subscribe(self, sid: str, data: dict):
try:
sub_data = self._parse_sub_data(data)
Expand All @@ -270,23 +298,8 @@ async def _on_subscribe(self, sid: str, data: dict):
req = SubscriptionRequest(
sid=sid, sio=self.sio, room=sub_data.room, session=session
)
maybe_coro = route.endpoint(req, **match.groupdict())
if asyncio.iscoroutine(maybe_coro):
obs = await maybe_coro
else:
obs = maybe_coro
obs = cast(Observable, obs)

loop = asyncio.get_event_loop()

def on_next(data):
async def emit():
await self.sio.emit(sub_data.room, data, to=sid)

loop.create_task(emit())

sub = obs.subscribe(on_next)
session.setdefault("_subscriptions", {})[sub_data.room] = sub
handler = lambda: route.endpoint(req, **match.groupdict())
await self._add_subscription(req, handler)

except HTTPException as e:
await self.sio.emit(
Expand Down
50 changes: 27 additions & 23 deletions packages/dashboard/src/components/doors-app.tsx
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { BuildingMap } from 'api-client';
import React from 'react';
import { DoorDataGridTable, DoorTableData } from 'react-components';
import { AppEvents } from './app-events';
import { DoorMode as RmfDoorMode } from 'rmf-models';
import { throttleTime } from 'rxjs';
import { AppEvents } from './app-events';
import { createMicroApp } from './micro-app';
import { RmfAppContext } from './rmf-app';
import { getApiErrorMessage } from './utils';
Expand Down Expand Up @@ -31,29 +32,32 @@ export const DoorsApp = createMicroApp('Doors', () => {
try {
const { data } = await rmf.doorsApi.getDoorHealthDoorsDoorNameHealthGet(door.name);
const { health_status } = data;
const sub = rmf.getDoorStateObs(door.name).subscribe((doorState) => {
setDoorTableData((prev) => {
return {
...prev,
[door.name]: {
index: doorIndex++,
doorName: door.name,
opMode: health_status ? health_status : 'N/A',
levelName: level.name,
doorType: door.door_type,
doorState: doorState,
onClickOpen: () =>
rmf?.doorsApi.postDoorRequestDoorsDoorNameRequestPost(door.name, {
mode: RmfDoorMode.MODE_OPEN,
}),
onClickClose: () =>
rmf?.doorsApi.postDoorRequestDoorsDoorNameRequestPost(door.name, {
mode: RmfDoorMode.MODE_CLOSED,
}),
},
};
const sub = rmf
.getDoorStateObs(door.name)
.pipe(throttleTime(3000, undefined, { leading: true, trailing: true }))
.subscribe((doorState) => {
setDoorTableData((prev) => {
return {
...prev,
[door.name]: {
index: doorIndex++,
doorName: door.name,
opMode: health_status ? health_status : 'N/A',
levelName: level.name,
doorType: door.door_type,
doorState: doorState,
onClickOpen: () =>
rmf?.doorsApi.postDoorRequestDoorsDoorNameRequestPost(door.name, {
mode: RmfDoorMode.MODE_OPEN,
}),
onClickClose: () =>
rmf?.doorsApi.postDoorRequestDoorsDoorNameRequestPost(door.name, {
mode: RmfDoorMode.MODE_CLOSED,
}),
},
};
});
});
});
return () => sub.unsubscribe();
} catch (error) {
console.error(`Failed to get lift health: ${getApiErrorMessage(error)}`);
Expand Down
83 changes: 0 additions & 83 deletions packages/dashboard/src/components/doors-overlay.tsx

This file was deleted.

82 changes: 43 additions & 39 deletions packages/dashboard/src/components/lifts-app.tsx
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { TableContainer } from '@mui/material';
import { BuildingMap, Lift } from 'api-client';
import React from 'react';
import { LiftDataGridTable, LiftTableData } from 'react-components';
import { LiftRequest as RmfLiftRequest } from 'rmf-models';
import { LiftTableData, LiftDataGridTable } from 'react-components';
import { throttleTime } from 'rxjs';
import { AppEvents } from './app-events';
import { LiftSummary } from './lift-summary';
import { createMicroApp } from './micro-app';
import { RmfAppContext } from './rmf-app';
import { getApiErrorMessage } from './utils';
import { TableContainer } from '@mui/material';
import { LiftSummary } from './lift-summary';

export const LiftsApp = createMicroApp('Lifts', () => {
const rmf = React.useContext(RmfAppContext);
Expand All @@ -34,47 +35,50 @@ export const LiftsApp = createMicroApp('Lifts', () => {

buildingMap?.lifts.map(async (lift, i) => {
try {
const sub = rmf.getLiftStateObs(lift.name).subscribe((liftState) => {
setLiftTableData((prev) => {
return {
...prev,
[lift.name]: [
{
index: i,
name: lift.name,
mode: liftState.current_mode,
currentFloor: liftState.current_floor,
destinationFloor: liftState.destination_floor,
doorState: liftState.door_state,
motionState: liftState.motion_state,
sessionId: liftState.session_id,
lift: lift,
onRequestSubmit: async (_ev, doorState, requestType, destination) => {
let fleet_session_ids: string[] = [];
if (requestType === RmfLiftRequest.REQUEST_END_SESSION) {
const fleets = (await rmf?.fleetsApi.getFleetsFleetsGet()).data;
for (const fleet of fleets) {
if (!fleet.robots) {
continue;
}
for (const robotName of Object.keys(fleet.robots)) {
fleet_session_ids.push(`${fleet.name}/${robotName}`);
const sub = rmf
.getLiftStateObs(lift.name)
.pipe(throttleTime(3000, undefined, { leading: true, trailing: true }))
.subscribe((liftState) => {
setLiftTableData((prev) => {
return {
...prev,
[lift.name]: [
{
index: i,
name: lift.name,
mode: liftState.current_mode,
currentFloor: liftState.current_floor,
destinationFloor: liftState.destination_floor,
doorState: liftState.door_state,
motionState: liftState.motion_state,
sessionId: liftState.session_id,
lift: lift,
onRequestSubmit: async (_ev, doorState, requestType, destination) => {
let fleet_session_ids: string[] = [];
if (requestType === RmfLiftRequest.REQUEST_END_SESSION) {
const fleets = (await rmf?.fleetsApi.getFleetsFleetsGet()).data;
for (const fleet of fleets) {
if (!fleet.robots) {
continue;
}
for (const robotName of Object.keys(fleet.robots)) {
fleet_session_ids.push(`${fleet.name}/${robotName}`);
}
}
}
}

return rmf?.liftsApi.postLiftRequestLiftsLiftNameRequestPost(lift.name, {
destination,
door_mode: doorState,
request_type: requestType,
additional_session_ids: fleet_session_ids,
});
return rmf?.liftsApi.postLiftRequestLiftsLiftNameRequestPost(lift.name, {
destination,
door_mode: doorState,
request_type: requestType,
additional_session_ids: fleet_session_ids,
});
},
},
},
],
};
],
};
});
});
});
return () => sub.unsubscribe();
} catch (error) {
console.error(`Failed to get lift state: ${getApiErrorMessage(error)}`);
Expand Down
Loading

0 comments on commit a764368

Please sign in to comment.