Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit d43e771

Browse files
committed
feat(core): disconnect stale connections (#1154)
Closes FRONT-742 ### TL;DR Added connection liveness checking to detect and clean up stale WebSocket connections. ### What changed? - Implemented a connection liveness system that periodically checks if WebSocket and SSE connections are still active - Added `connectionLivenessInterval` and `connectionLivenessTimeout` configuration options to the actor lifecycle settings - Enhanced the `Conn` class with liveness tracking and checking capabilities - Added `getConnectionReadyState` method to connection drivers to report connection status - Created a test actor (`connLivenessActor`) and test suite to verify liveness functionality - Implemented internal event scheduling for periodic liveness checks - Added timestamp tracking for last seen activity on connections ### How to test? The PR includes a comprehensive test suite in `actor-conn.ts` that verifies: 1. Connections report correct liveness status 2. Dead connections are properly detected 3. Stale connections are automatically cleaned up after the timeout period 4. Active connections continue to function normally Run the tests with: ``` pnpm test ``` ### Why make this change? WebSocket connections can become stale or disconnected without proper notification, leading to resource leaks and potential issues with connection management. This change implements a robust mechanism to detect and clean up these stale connections automatically, improving system reliability and resource utilization. The liveness check system ensures that the actor framework maintains an accurate view of active connections and can properly manage resources by removing connections that are no longer active.
1 parent 0433897 commit d43e771

File tree

16 files changed

+493
-105
lines changed

16 files changed

+493
-105
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { actor, CONNECTION_DRIVER_WEBSOCKET } from "@rivetkit/core";
2+
3+
export const connLivenessActor = actor({
4+
onAuth: () => {},
5+
state: {
6+
counter: 0,
7+
acceptingConnections: true,
8+
},
9+
options: {
10+
lifecycle: {
11+
connectionLivenessInterval: 5_000,
12+
connectionLivenessTimeout: 2_500,
13+
},
14+
},
15+
onConnect: (c, conn) => {
16+
if (!c.state.acceptingConnections) {
17+
conn.disconnect();
18+
throw new Error("Actor is not accepting connections");
19+
}
20+
},
21+
actions: {
22+
getWsConnectionsLiveness: (c) => {
23+
return Array.from(c.conns.values())
24+
.filter((conn) => conn.driver === CONNECTION_DRIVER_WEBSOCKET)
25+
.map((conn) => ({
26+
id: conn.id,
27+
status: conn.status,
28+
lastSeen: conn.lastSeen,
29+
}));
30+
},
31+
getConnectionId: (c) => {
32+
return c.conn.id;
33+
},
34+
kill: (c, connId: string) => {
35+
c.state.acceptingConnections = false;
36+
// Disconnect the connection with the given ID
37+
// This simulates a network failure or a manual disconnection
38+
// The connection will be cleaned up by the actor manager after the timeout
39+
const conn = c.conns.get(connId);
40+
if (conn) {
41+
conn.disconnect();
42+
}
43+
},
44+
getCounter: (c) => {
45+
return c.state.counter;
46+
},
47+
increment: (c, amount: number) => {
48+
c.state.counter += amount;
49+
return c.state.counter;
50+
},
51+
},
52+
});

packages/core/fixtures/driver-test-suite/registry.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
noAuthActor,
2020
publicActor,
2121
} from "./auth";
22+
import { connLivenessActor } from "./conn-liveness";
2223
import { counterWithParams } from "./conn-params";
2324
import { connStateActor } from "./conn-state";
2425
// Import actors from individual files
@@ -87,6 +88,8 @@ export const registry = setup({
8788
counterWithParams,
8889
// From conn-state.ts
8990
connStateActor,
91+
// From actor-conn.ts
92+
connLivenessActor,
9093
// From metadata.ts
9194
metadataActor,
9295
// From vars.ts

packages/core/scripts/dump-openapi.ts

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,10 @@
11
import * as fs from "node:fs/promises";
22
import { resolve } from "node:path";
3-
import { Context } from "hono";
4-
import WebSocket from "ws";
53
import type { ClientDriver } from "@/client/client";
64
import { createFileSystemOrMemoryDriver } from "@/drivers/file-system/mod";
7-
import type {
8-
ActorOutput,
9-
CreateInput,
10-
GetForIdInput,
11-
GetOrCreateWithKeyInput,
12-
GetWithKeyInput,
13-
ManagerDriver,
14-
} from "@/manager/driver";
15-
import { ActorQuery } from "@/manager/protocol/query";
5+
import type { ManagerDriver } from "@/manager/driver";
166
import { createManagerRouter } from "@/manager/router";
17-
import {
18-
Encoding,
19-
type RegistryConfig,
20-
RegistryConfigSchema,
21-
setup,
22-
} from "@/mod";
7+
import { type RegistryConfig, RegistryConfigSchema, setup } from "@/mod";
238
import { type RunConfig, RunConfigSchema } from "@/registry/run-config";
249
import { VERSION } from "@/utils";
2510

packages/core/src/actor/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ export const ActorConfigSchema = z
6565
createVarsTimeout: z.number().positive().default(5000),
6666
createConnStateTimeout: z.number().positive().default(5000),
6767
onConnectTimeout: z.number().positive().default(5000),
68+
connectionLivenessTimeout: z.number().positive().default(2500),
69+
connectionLivenessInterval: z.number().positive().default(5000),
6870
})
6971
.strict()
7072
.default({}),

packages/core/src/actor/connection.ts

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import type * as messageToClient from "@/actor/protocol/message/to-client";
22
import type * as wsToClient from "@/actor/protocol/message/to-client";
33
import type { AnyDatabaseProvider } from "./database";
4-
import type { ConnDriver } from "./driver";
4+
import { type ConnDriver, ConnectionReadyState } from "./driver";
55
import * as errors from "./errors";
66
import type { ActorInstance } from "./instance";
7+
import { logger } from "./log";
78
import type { PersistedConn } from "./persisted";
89
import { CachedSerializer } from "./protocol/serde";
910
import { generateSecureToken } from "./utils";
@@ -20,6 +21,19 @@ export type ConnId = string;
2021

2122
export type AnyConn = Conn<any, any, any, any, any, any, any>;
2223

24+
export const CONNECTION_DRIVER_WEBSOCKET = "webSocket";
25+
export const CONNECTION_DRIVER_SSE = "sse";
26+
export const CONNECTION_DRIVER_HTTP = "http";
27+
28+
export type ConnectionDriver =
29+
| typeof CONNECTION_DRIVER_WEBSOCKET
30+
| typeof CONNECTION_DRIVER_SSE
31+
| typeof CONNECTION_DRIVER_HTTP;
32+
33+
export type ConnectionStatus = "connected" | "reconnecting";
34+
35+
export const CONNECTION_CHECK_LIVENESS_SYMBOL = Symbol("checkLiveness");
36+
2337
/**
2438
* Represents a client connection to a actor.
2539
*
@@ -35,6 +49,8 @@ export class Conn<S, CP, CS, V, I, AD, DB extends AnyDatabaseProvider> {
3549
// TODO: Remove this cyclical reference
3650
#actor: ActorInstance<S, CP, CS, V, I, AD, DB>;
3751

52+
#status: ConnectionStatus = "connected";
53+
3854
/**
3955
* The proxied state that notifies of changes automatically.
4056
*
@@ -57,6 +73,10 @@ export class Conn<S, CP, CS, V, I, AD, DB extends AnyDatabaseProvider> {
5773
return this.__persist.a as AD;
5874
}
5975

76+
public get driver(): ConnectionDriver {
77+
return this.__persist.d as ConnectionDriver;
78+
}
79+
6080
public get _stateEnabled() {
6181
return this.#stateEnabled;
6282
}
@@ -96,6 +116,20 @@ export class Conn<S, CP, CS, V, I, AD, DB extends AnyDatabaseProvider> {
96116
return this.__persist.t;
97117
}
98118

119+
/**
120+
* Status of the connection.
121+
*/
122+
public get status(): ConnectionStatus {
123+
return this.#status;
124+
}
125+
126+
/**
127+
* Timestamp of the last time the connection was seen, i.e. the last time the connection was active and checked for liveness.
128+
*/
129+
public get lastSeen(): number {
130+
return this.__persist.l;
131+
}
132+
99133
/**
100134
* Initializes a new instance of the Connection class.
101135
*
@@ -164,6 +198,50 @@ export class Conn<S, CP, CS, V, I, AD, DB extends AnyDatabaseProvider> {
164198
* @param reason - The reason for disconnection.
165199
*/
166200
public async disconnect(reason?: string) {
201+
this.#status = "reconnecting";
167202
await this.#driver.disconnect(this.#actor, this, this.__persist.ds, reason);
168203
}
204+
205+
/**
206+
* This method checks the connection's liveness by querying the driver for its ready state.
207+
* If the connection is not closed, it updates the last liveness timestamp and returns `true`.
208+
* Otherwise, it returns `false`.
209+
* @internal
210+
*/
211+
[CONNECTION_CHECK_LIVENESS_SYMBOL]() {
212+
const readyState = this.#driver.getConnectionReadyState?.(
213+
this.#actor,
214+
this,
215+
);
216+
217+
const isConnectionClosed =
218+
readyState === ConnectionReadyState.CLOSED ||
219+
readyState === ConnectionReadyState.CLOSING ||
220+
readyState === undefined;
221+
222+
const newLastSeen = Date.now();
223+
const newStatus = isConnectionClosed ? "reconnecting" : "connected";
224+
225+
logger().debug("liveness probe for connection", {
226+
connId: this.id,
227+
actorId: this.#actor.id,
228+
readyState,
229+
230+
status: this.#status,
231+
newStatus,
232+
233+
lastSeen: this.__persist.l,
234+
currentTs: newLastSeen,
235+
});
236+
237+
if (!isConnectionClosed) {
238+
this.__persist.l = newLastSeen;
239+
}
240+
241+
this.#status = newStatus;
242+
return {
243+
status: this.#status,
244+
lastSeen: this.__persist.l,
245+
};
246+
}
169247
}

packages/core/src/actor/driver.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import type * as messageToClient from "@/actor/protocol/message/to-client";
22
import type { CachedSerializer } from "@/actor/protocol/serde";
33
import type { AnyClient } from "@/client/client";
4-
import type { ActorInspector } from "@/inspector/actor";
54
import type { ManagerDriver } from "@/manager/driver";
65
import type { RegistryConfig } from "@/registry/config";
76
import type { RunConfig } from "@/registry/run-config";
8-
import type { AnyConn } from "./connection";
7+
import type { AnyConn, ConnectionDriver } from "./connection";
98
import type { GenericConnGlobalState } from "./generic-conn-driver";
109
import type { AnyActorInstance } from "./instance";
1110

12-
export type ConnDrivers = Record<string, ConnDriver>;
11+
export type ConnectionDriversMap = Record<ConnectionDriver, ConnDriver>;
1312

1413
export type ActorDriverBuilder = (
1514
registryConfig: RegistryConfig,
@@ -45,6 +44,14 @@ export interface ActorDriver {
4544
//readState(): void;
4645
}
4746

47+
export enum ConnectionReadyState {
48+
UNKNOWN = -1,
49+
CONNECTING = 0,
50+
OPEN = 1,
51+
CLOSING = 2,
52+
CLOSED = 3,
53+
}
54+
4855
export interface ConnDriver<ConnDriverState = unknown> {
4956
sendMessage?(
5057
actor: AnyActorInstance,
@@ -62,4 +69,13 @@ export interface ConnDriver<ConnDriverState = unknown> {
6269
state: ConnDriverState,
6370
reason?: string,
6471
): Promise<void>;
72+
73+
/**
74+
* Returns the ready state of the connection.
75+
* This is used to determine if the connection is ready to send messages, or if the connection is stale.
76+
*/
77+
getConnectionReadyState?(
78+
actor: AnyActorInstance,
79+
conn: AnyConn,
80+
): ConnectionReadyState | undefined;
6581
}

packages/core/src/actor/generic-conn-driver.ts

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
import type { SSEStreamingApi } from "hono/streaming";
22
import type { WSContext } from "hono/ws";
33
import type { WebSocket } from "ws";
4-
import type { AnyConn } from "@/actor/connection";
5-
import type { ConnDriver } from "@/actor/driver";
4+
import {
5+
type AnyConn,
6+
CONNECTION_DRIVER_HTTP,
7+
CONNECTION_DRIVER_SSE,
8+
CONNECTION_DRIVER_WEBSOCKET,
9+
} from "@/actor/connection";
10+
import {
11+
type ConnDriver,
12+
type ConnectionDriversMap,
13+
ConnectionReadyState,
14+
} from "@/actor/driver";
615
import type { AnyActorInstance } from "@/actor/instance";
716
import type * as messageToClient from "@/actor/protocol/message/to-client";
817
import type { CachedSerializer, Encoding } from "@/actor/protocol/serde";
918
import { encodeDataToString } from "@/actor/protocol/serde";
10-
import { dbg } from "@/utils";
1119
import { logger } from "./log";
1220

1321
// This state is different than `PersistedConn` state since the connection-specific state is persisted & must be serializable. This is also part of the connection driver, not part of the core actor.
@@ -25,17 +33,15 @@ export class GenericConnGlobalState {
2533
*/
2634
export function createGenericConnDrivers(
2735
globalState: GenericConnGlobalState,
28-
): Record<string, ConnDriver> {
36+
): ConnectionDriversMap {
2937
return {
30-
[CONN_DRIVER_GENERIC_WEBSOCKET]: createGenericWebSocketDriver(globalState),
31-
[CONN_DRIVER_GENERIC_SSE]: createGenericSseDriver(globalState),
32-
[CONN_DRIVER_GENERIC_HTTP]: createGeneircHttpDriver(),
38+
[CONNECTION_DRIVER_WEBSOCKET]: createGenericWebSocketDriver(globalState),
39+
[CONNECTION_DRIVER_SSE]: createGenericSseDriver(globalState),
40+
[CONNECTION_DRIVER_HTTP]: createGenericHttpDriver(),
3341
};
3442
}
3543

3644
// MARK: WebSocket
37-
export const CONN_DRIVER_GENERIC_WEBSOCKET = "genericWebSocket";
38-
3945
export interface GenericWebSocketDriverState {
4046
encoding: Encoding;
4147
}
@@ -114,7 +120,6 @@ export function createGenericWebSocketDriver(
114120
});
115121
return;
116122
}
117-
118123
const raw = ws.raw as WebSocket;
119124
if (!raw) {
120125
logger().warn("ws.raw does not exist");
@@ -130,12 +135,27 @@ export function createGenericWebSocketDriver(
130135

131136
await promise;
132137
},
138+
139+
getConnectionReadyState: (
140+
_actor: AnyActorInstance,
141+
conn: AnyConn,
142+
): ConnectionReadyState | undefined => {
143+
const ws = globalState.websockets.get(conn.id);
144+
if (!ws) {
145+
logger().warn("missing ws for getConnectionReadyState", {
146+
connId: conn.id,
147+
});
148+
return undefined;
149+
}
150+
151+
const raw = ws.raw as WebSocket;
152+
153+
return raw.readyState as ConnectionReadyState;
154+
},
133155
};
134156
}
135157

136158
// MARK: SSE
137-
export const CONN_DRIVER_GENERIC_SSE = "genericSse";
138-
139159
export interface GenericSseDriverState {
140160
encoding: Encoding;
141161
}
@@ -174,15 +194,32 @@ export function createGenericSseDriver(globalState: GenericConnGlobalState) {
174194

175195
stream.close();
176196
},
197+
198+
getConnectionReadyState: (
199+
_actor: AnyActorInstance,
200+
conn: AnyConn,
201+
): ConnectionReadyState | undefined => {
202+
const stream = globalState.sseStreams.get(conn.id);
203+
if (!stream) {
204+
logger().warn("missing sse stream for getConnectionReadyState", {
205+
connId: conn.id,
206+
});
207+
return undefined;
208+
}
209+
210+
if (stream.aborted || stream.closed) {
211+
return ConnectionReadyState.CLOSED;
212+
}
213+
214+
return ConnectionReadyState.OPEN;
215+
},
177216
};
178217
}
179218

180219
// MARK: HTTP
181-
export const CONN_DRIVER_GENERIC_HTTP = "genericHttp";
182-
183220
export type GenericHttpDriverState = Record<never, never>;
184221

185-
export function createGeneircHttpDriver() {
222+
export function createGenericHttpDriver() {
186223
return {
187224
disconnect: async () => {
188225
// Noop

0 commit comments

Comments
 (0)