Skip to content

Commit

Permalink
feat(client-presence): System Workspace (microsoft#22670)
Browse files Browse the repository at this point in the history
1. Add infrastructure for custom System Workspace to handle internal
states including ClientConnectionId to ClientSessionId.
2. Move audience support to System Workspace
3. Add `attendeeJoined` implementation
4. Add test coverage for new attendee's joining and consistency of
lookup. New test cases:
```
    PresenceManager
      ✔ throws when unknown attendee is requested via `getAttendee`
      when connected
        attendee
          ✔ is announced via `attendeeJoined` when new
          already known
            ✔ is available from `getAttendee` by connection id
            ✔ is available from `getAttendee` by session id
            ✔ is available from `getAttendees`
            ✔ is NOT announced when rejoined with same connection (duplicate signal)
            ✔ is NOT announced when rejoined with different connection and current information is updated
```
5. Update general update protocol to always include client connection id
-> current session id entry to ensure all updates are always working
with known (already registered) session id even if a prior join or
broadcast were missed.
  • Loading branch information
jason-ha authored Oct 7, 2024
1 parent efa8231 commit f57799c
Show file tree
Hide file tree
Showing 6 changed files with 546 additions and 123 deletions.
12 changes: 1 addition & 11 deletions packages/framework/presence/src/internalTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

import type { IContainerRuntime } from "@fluidframework/container-runtime-definitions/internal";
import type { IFluidDataStoreRuntime } from "@fluidframework/datastore-definitions/internal";
import type { MonitoringContext } from "@fluidframework/telemetry-utils/internal";

import type { InternalTypes } from "./exposedInternalTypes.js";
import type { ClientSessionId, IPresence, ISessionClient } from "./presence.js";
import type { ClientSessionId, ISessionClient } from "./presence.js";

import type { IRuntimeInternal } from "@fluid-experimental/presence/internal/container-definitions/internal";

Expand Down Expand Up @@ -48,15 +47,6 @@ export type IEphemeralRuntime = Pick<
> &
Partial<Pick<IFluidDataStoreRuntime, "logger">>;

/**
* Collection of utilities provided by PresenceManager that are used by presence sub-components.
*
* @internal
*/
export type PresenceManagerInternal = Pick<IPresence, "getAttendee"> & {
readonly mc: MonitoringContext | undefined;
};

/**
* @internal
*/
Expand Down
95 changes: 47 additions & 48 deletions packages/framework/presence/src/presenceDatastoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@

import { assert } from "@fluidframework/core-utils/internal";
import type { IInboundSignalMessage } from "@fluidframework/runtime-definitions/internal";
import type { ITelemetryLoggerExt } from "@fluidframework/telemetry-utils/internal";

import type { ClientConnectionId } from "./baseTypes.js";
import type { InternalTypes } from "./exposedInternalTypes.js";
import type { IEphemeralRuntime, PresenceManagerInternal } from "./internalTypes.js";
import type { ClientSessionId } from "./presence.js";
import type { IEphemeralRuntime } from "./internalTypes.js";
import type { ClientSessionId, ISessionClient } from "./presence.js";
import type {
ClientUpdateEntry,
PresenceStatesInternal,
ValueElementMap,
} from "./presenceStates.js";
import { createPresenceStates, mergeUntrackedDatastore } from "./presenceStates.js";
import type { PresenceStates, PresenceStatesSchema } from "./types.js";
import type { SystemWorkspaceDatastore } from "./systemWorkspace.js";
import type {
PresenceStates,
PresenceStatesSchema,
PresenceWorkspaceAddress,
} from "./types.js";

import type { IExtensionMessage } from "@fluid-experimental/presence/internal/container-definitions/internal";

Expand All @@ -26,18 +31,14 @@ interface PresenceStatesEntry<TSchema extends PresenceStatesSchema> {
}

interface SystemDatastore {
"system:presence": {
clientToSessionId: {
[
ClientConnectionId: ClientConnectionId
]: InternalTypes.ValueRequiredState<ClientSessionId>;
};
};
"system:presence": SystemWorkspaceDatastore;
}

type PresenceDatastore = {
type InternalWorkspaceAddress = `${"s" | "n"}:${PresenceWorkspaceAddress}`;

type PresenceDatastore = SystemDatastore & {
[WorkspaceAddress: string]: ValueElementMap<PresenceStatesSchema>;
} & SystemDatastore;
};

interface GeneralDatastoreMessageContent {
[WorkspaceAddress: string]: {
Expand All @@ -47,7 +48,7 @@ interface GeneralDatastoreMessageContent {
};
}

type DatastoreMessageContent = GeneralDatastoreMessageContent & SystemDatastore;
type DatastoreMessageContent = SystemDatastore & GeneralDatastoreMessageContent;

const datastoreUpdateMessageType = "Pres:DatastoreUpdate";
interface DatastoreUpdateMessage extends IInboundSignalMessage {
Expand All @@ -56,7 +57,7 @@ interface DatastoreUpdateMessage extends IInboundSignalMessage {
sendTimestamp: number;
avgLatency: number;
isComplete?: true;
data: GeneralDatastoreMessageContent & Partial<SystemDatastore>;
data: DatastoreMessageContent;
};
}

Expand All @@ -81,8 +82,9 @@ function isPresenceMessage(
* @internal
*/
export interface PresenceDatastoreManager {
joinSession(clientId: ClientConnectionId): void;
getWorkspace<TSchema extends PresenceStatesSchema>(
internalWorkspaceAddress: string,
internalWorkspaceAddress: InternalWorkspaceAddress,
requestedContent: TSchema,
): PresenceStates<TSchema>;
processSignal(message: IExtensionMessage, local: boolean): void;
Expand All @@ -92,9 +94,7 @@ export interface PresenceDatastoreManager {
* Manages singleton datastore for all Presence.
*/
export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
private readonly datastore: PresenceDatastore = {
"system:presence": { clientToSessionId: {} },
};
private readonly datastore: PresenceDatastore;
private averageLatency = 0;
private returnedMessages = 0;
private refreshBroadcastRequested = false;
Expand All @@ -104,29 +104,17 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
public constructor(
private readonly clientSessionId: ClientSessionId,
private readonly runtime: IEphemeralRuntime,
private readonly presence: PresenceManagerInternal,
private readonly lookupClient: (clientId: ClientSessionId) => ISessionClient,
private readonly logger: ITelemetryLoggerExt | undefined,
systemWorkspaceDatastore: SystemWorkspaceDatastore,
systemWorkspace: PresenceStatesEntry<PresenceStatesSchema>,
) {
runtime.on("connected", this.onConnect.bind(this));

// Check if already connected at the time of construction.
// If constructed during data store load, the runtime may already be connected
// and the "connected" event will be raised during completion. With construction
// delayed we expect that "connected" event has passed.
// Note: In some manual testing, this does not appear to be enough to
// always trigger an initial connect.
const clientId = runtime.clientId;
if (clientId !== undefined && runtime.connected) {
this.onConnect(clientId);
}
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
this.datastore = { "system:presence": systemWorkspaceDatastore } as PresenceDatastore;
this.workspaces.set("system:presence", systemWorkspace);
}

private onConnect(clientId: ClientConnectionId): void {
this.datastore["system:presence"].clientToSessionId[clientId] = {
rev: 0,
timestamp: Date.now(),
value: this.clientSessionId,
};

public joinSession(clientId: ClientConnectionId): void {
// Broadcast join message to all clients
const updateProviders = [...this.runtime.getQuorum().getMembers().keys()].filter(
(quorumClientId) => quorumClientId !== clientId,
Expand All @@ -145,7 +133,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
}

public getWorkspace<TSchema extends PresenceStatesSchema>(
internalWorkspaceAddress: string,
internalWorkspaceAddress: InternalWorkspaceAddress,
requestedContent: TSchema,
): PresenceStates<TSchema> {
const existing = this.workspaces.get(internalWorkspaceAddress);
Expand All @@ -167,12 +155,26 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
return;
}

const updates: GeneralDatastoreMessageContent[string] = {};
const clientConnectionId = this.runtime.clientId;
assert(clientConnectionId !== undefined, "Client connected without clientId");
const currentClientToSessionValueState =
this.datastore["system:presence"].clientToSessionId[clientConnectionId];

const updates: GeneralDatastoreMessageContent[InternalWorkspaceAddress] = {};
for (const [key, value] of Object.entries(states)) {
updates[key] = { [this.clientSessionId]: value };
}
this.localUpdate(
{
// Always send current connection mapping for some resiliency against
// lost signals. This ensures that client session id found in `updates`
// (which is this client's client session id) is always represented in
// system workspace of recipient clients.
"system:presence": {
clientToSessionId: {
[clientConnectionId]: { ...currentClientToSessionValueState },
},
},
[internalWorkspaceAddress]: updates,
},
forceBroadcast,
Expand All @@ -182,7 +184,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
const entry = createPresenceStates(
{
clientSessionId: this.clientSessionId,
lookupClient: this.presence.getAttendee.bind(this.presence),
lookupClient: this.lookupClient,
localUpdate,
},
workspaceDatastore,
Expand All @@ -193,10 +195,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
return entry.public;
}

private localUpdate(
data: GeneralDatastoreMessageContent & Partial<SystemDatastore>,
_forceBroadcast: boolean,
): void {
private localUpdate(data: DatastoreMessageContent, _forceBroadcast: boolean): void {
const content = {
sendTimestamp: Date.now(),
avgLatency: this.averageLatency,
Expand Down Expand Up @@ -304,7 +303,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
if (updateProviders.includes(clientId)) {
// Send all current state to the new client
this.broadcastAllKnownState();
this.presence.mc?.logger.sendTelemetryEvent({
this.logger?.sendTelemetryEvent({
eventName: "JoinResponse",
details: {
type: "broadcastAll",
Expand Down Expand Up @@ -342,7 +341,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
// If not connected, nothing we can do.
if (this.refreshBroadcastRequested && this.runtime.connected) {
this.broadcastAllKnownState();
this.presence.mc?.logger.sendTelemetryEvent({
this.logger?.sendTelemetryEvent({
eventName: "JoinResponse",
details: {
type: "broadcastAll",
Expand Down
Loading

0 comments on commit f57799c

Please sign in to comment.