Skip to content

Commit 691501d

Browse files
committed
chore(rivetkit): add runner sse ping interval
1 parent ed48aff commit 691501d

File tree

1 file changed

+27
-0
lines changed

1 file changed

+27
-0
lines changed

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ import {
5454
import { KEYS } from "./kv";
5555
import { logger } from "./log";
5656

57+
const RUNNER_SSE_PING_INTERVAL = 1000;
58+
5759
interface ActorHandler {
5860
actor?: AnyActorInstance;
5961
actorStartPromise?: ReturnType<typeof promiseWithResolvers<void>>;
@@ -75,6 +77,7 @@ export class EngineActorDriver implements ActorDriver {
7577

7678
#runnerStarted: PromiseWithResolvers<undefined> = promiseWithResolvers();
7779
#runnerStopped: PromiseWithResolvers<undefined> = promiseWithResolvers();
80+
#isRunnerStopped: boolean = false;
7881

7982
constructor(
8083
registryConfig: RegistryConfig,
@@ -143,6 +146,7 @@ export class EngineActorDriver implements ActorDriver {
143146
},
144147
onShutdown: () => {
145148
this.#runnerStopped.resolve(undefined);
149+
this.#isRunnerStopped = true;
146150
},
147151
fetch: this.#runnerFetch.bind(this),
148152
websocket: this.#runnerWebSocket.bind(this),
@@ -594,6 +598,29 @@ export class EngineActorDriver implements ActorDriver {
594598
invariant(payload, "runnerId not set");
595599
await stream.writeSSE({ data: payload });
596600

601+
// Send ping every second to keep the connection alive
602+
while (true) {
603+
if (this.#isRunnerStopped) {
604+
logger().debug({
605+
msg: "runner is stopped",
606+
});
607+
break;
608+
}
609+
610+
if (stream.closed || stream.aborted) {
611+
logger().debug({
612+
msg: "runner sse stream closed",
613+
closed: stream.closed,
614+
aborted: stream.aborted,
615+
});
616+
break;
617+
}
618+
619+
await stream.writeSSE({ event: "ping", data: "" });
620+
await stream.sleep(RUNNER_SSE_PING_INTERVAL);
621+
}
622+
623+
// Wait for the runner to stop if the SSE stream aborted early for any reason
597624
await this.#runnerStopped.promise;
598625
});
599626
}

0 commit comments

Comments
 (0)