Skip to content

Commit

Permalink
clean up ctrlshell socket service
Browse files Browse the repository at this point in the history
  • Loading branch information
jsbroks committed Nov 6, 2024
1 parent dc01112 commit b4a08c4
Show file tree
Hide file tree
Showing 26 changed files with 457 additions and 274 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "@ctrlplane/webshell-router",
"name": "@ctrlplane/ctrlshell",
"private": true,
"type": "module",
"scripts": {
"clean": "rm -rf .turbo node_modules",
"dev:new": "tsx watch --clear-screen=false src/index.ts",
"dev": "tsx watch --clear-screen=false src/index.ts",
"lint": "eslint",
"format": "prettier --check . --ignore-path ../../.gitignore"
},
Expand All @@ -19,6 +19,7 @@
"helmet": "^7.1.0",
"ms": "^2.1.3",
"next-auth": "catalog:",
"uuid": "^10.0.0",
"ws": "^8.17.0",
"zod": "catalog:"
},
Expand All @@ -32,6 +33,7 @@
"@types/express": "^4.17.21",
"@types/ms": "^0.7.34",
"@types/node": "catalog:node20",
"@types/uuid": "^10.0.0",
"@types/ws": "^8.5.10",
"eslint": "catalog:",
"prettier": "catalog:",
Expand Down
118 changes: 118 additions & 0 deletions apps/ctrlshell/src/agent-socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import type { IncomingMessage } from "http";
import type WebSocket from "ws";
import type { MessageEvent } from "ws";
import { v4 as uuidv4 } from "uuid";

import type {
AgentHeartbeat,
SessionCreate,
SessionDelete,
SessionInput,
SessionOutput,
} from "./payloads";
import { agentHeartbeat, sessionOutput } from "./payloads";
import { ifMessage } from "./utils";

export class AgentSocket {
static from(socket: WebSocket, request: IncomingMessage) {
if (request.headers["x-api-key"] == null) return null;
return new AgentSocket(socket, request);
}

private stdoutListeners = new Set<(data: SessionOutput) => void>();
readonly id: string;

private constructor(
private readonly socket: WebSocket,
private readonly request: IncomingMessage,
) {
this.id = uuidv4();
this.socket.addEventListener(
"message",
ifMessage()
.is(sessionOutput, (data) => this.notifySubscribers(data))
.is(agentHeartbeat, (data) => this.updateStatus(data))
.handle(),
);
}

onSessionStdout(callback: (data: SessionOutput) => void) {
this.stdoutListeners.add(callback);
}

private notifySubscribers(data: SessionOutput) {
for (const subscriber of this.stdoutListeners) {
subscriber(data);
}
}

private updateStatus(data: AgentHeartbeat) {
console.log("status", data.timestamp);
}

createSession(username = "", shell = "") {
const createSession: SessionCreate = {
type: "session.create",
username,
shell,
};

this.send(createSession);

return this.waitForResponse(
(response) => response.type === "session.created",
);
}

async deleteSession(sessionId: string) {
const deletePayload: SessionDelete = {
type: "session.delete",
sessionId,
};
this.send(deletePayload);

return this.waitForResponse(
(response) => response.type === "session.delete.success",
);
}

waitForResponse<T>(predicate: (response: any) => boolean, timeoutMs = 5000) {
return waitForResponse<T>(this.socket, predicate, timeoutMs);
}

send(data: SessionCreate | SessionDelete | SessionInput) {
return this.socket.send(JSON.stringify(data));
}
}

async function waitForResponse<T>(
socket: WebSocket,
predicate: (response: any) => boolean,
timeoutMs = 5000,
): Promise<T> {
return new Promise<T>((resolve, reject) => {
const timeout = setTimeout(() => {
socket.removeEventListener("message", onMessage);
reject(new Error(`Response timeout after ${timeoutMs}ms`));
}, timeoutMs);

const onMessage = (event: MessageEvent) => {
try {
const response = JSON.parse(
typeof event.data === "string" ? event.data : "",
);
if (predicate(response)) {
clearTimeout(timeout);
socket.removeEventListener("message", onMessage);
resolve(response);
}
} catch {
clearTimeout(timeout);
socket.removeEventListener("message", onMessage);
reject(new Error("Failed to parse response"));
}
};

socket.addEventListener("message", onMessage);
});
}
File renamed without changes.
File renamed without changes.
20 changes: 20 additions & 0 deletions apps/ctrlshell/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import ms from "ms";

import { env } from "./config";
import { addSocket } from "./routing";
import { app } from "./server";

const server = addSocket(app).listen(env.PORT, () => {
console.log(`Server is running on port ${env.PORT}`);
});

const onCloseSignal = () => {
server.close(() => {
console.log("Server closed");
process.exit(0);
});
setTimeout(() => process.exit(1), ms("10s")).unref(); // Force shutdown after 10s
};

process.on("SIGINT", onCloseSignal);
process.on("SIGTERM", onCloseSignal);
17 changes: 17 additions & 0 deletions apps/ctrlshell/src/payloads/agent-connect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { z } from "zod";

export default z.object({
type: z
.literal("agent.connect")
.describe("Type of payload - must be agent.register"),
id: z.string().describe("Unique identifier for the agent"),
name: z.string().describe("Optional ID for the session"),
config: z
.record(z.any())
.describe("Optional configuration for the agent")
.optional(),
metadata: z
.record(z.string())
.describe("Optional metadata for the agent as key-value string pairs")
.optional(),
});
13 changes: 13 additions & 0 deletions apps/ctrlshell/src/payloads/agent-heartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { z } from "zod";

export default z.object({
id: z.string().describe("Unique identifier for the client"),
type: z
.literal("client.heartbeat")
.describe("Type of payload - must be client.heartbeat"),
timestamp: z
.string()
.datetime({ offset: true })
.describe("Timestamp of the heartbeat")
.optional(),
});
24 changes: 24 additions & 0 deletions apps/ctrlshell/src/payloads/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { z } from "zod";

Check warning on line 1 in apps/ctrlshell/src/payloads/index.ts

View workflow job for this annotation

GitHub Actions / Lint

All imports in the declaration are only used as types. Use `import type`

import agentConnect from "./agent-connect";
import agentHeartbeat from "./agent-heartbeat";
import sessionCreate from "./session-create";
import sessionDelete from "./session-delete";
import sessionInput from "./session-input";
import sessionOutput from "./session-output";

export type AgentHeartbeat = z.infer<typeof agentHeartbeat>;
export type AgentConnect = z.infer<typeof agentConnect>;
export type SessionCreate = z.infer<typeof sessionCreate>;
export type SessionInput = z.infer<typeof sessionInput>;
export type SessionOutput = z.infer<typeof sessionOutput>;
export type SessionDelete = z.infer<typeof sessionDelete>;

export {
agentConnect,
agentHeartbeat,
sessionCreate,
sessionDelete,
sessionInput,
sessionOutput,
};
16 changes: 16 additions & 0 deletions apps/ctrlshell/src/payloads/session-create.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { z } from "zod";

export default z.object({
type: z
.literal("session.create")
.describe("Type of payload - must be session.create"),
sessionId: z.string().describe("Optional ID for the session").optional(),
username: z
.string()
.describe("Optional username for the session")
.default(""),
shell: z
.string()
.describe("Optional shell to use for the session")
.default(""),
});
8 changes: 8 additions & 0 deletions apps/ctrlshell/src/payloads/session-delete.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { z } from "zod";

export default z.object({
type: z
.literal("session.delete")
.describe("Type of payload - must be session.create"),
sessionId: z.string().describe("ID of the session to delete"),
});
19 changes: 19 additions & 0 deletions apps/ctrlshell/src/payloads/session-input.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { z } from "zod";

export default z.object({
type: z
.literal("session.input")
.describe(
"Type of payload - must be session.input to identify this as session input data",
),
sessionId: z
.string()
.describe(
"Unique identifier of the PTY session that should receive this input data",
),
data: z
.string()
.describe(
"The input data to send to the PTY session's standard input (stdin)",
),
});
11 changes: 11 additions & 0 deletions apps/ctrlshell/src/payloads/session-output.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { z } from "zod";

export default z.object({
type: z
.literal("session.output")
.describe(
"Type of payload - must be session.output to identify this as session output data",
),
sessionId: z.string().describe("ID of the session that generated the output"),
data: z.string().describe("Output data from the PTY session"),
});
52 changes: 52 additions & 0 deletions apps/ctrlshell/src/routing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { createServer } from "node:http";
import type { Express } from "express";
import type { IncomingMessage } from "node:http";
import type WebSocket from "ws";
import { WebSocketServer } from "ws";

import { AgentSocket } from "./agent-socket";
import { agents, users } from "./sockets";
import { UserSocket } from "./user-socket";

const onConnect = async (ws: WebSocket, request: IncomingMessage) => {
const agent = AgentSocket.from(ws, request);
if (agent != null) {
agents.set(agent.id, agent);
return;
}

const user = await UserSocket.from(ws, request);
if (user != null) {
users.set(user.id, user);
return;
}

ws.close();
};

export const addSocket = (expressApp: Express) => {
const server = createServer(expressApp);
const wss = new WebSocketServer({ noServer: true });

server.on("upgrade", (request, socket, head) => {
if (request.url == null) {
socket.destroy();
return;
}

const { pathname } = new URL(request.url, "ws://base.ws");
if (pathname !== "/api/shell/ws") {
socket.destroy();
return;
}

wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit("connection", ws, request);
});
});

// eslint-disable-next-line @typescript-eslint/no-misused-promises
wss.on("connection", onConnect);

return server;
};
File renamed without changes.
5 changes: 5 additions & 0 deletions apps/ctrlshell/src/sockets.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import type { AgentSocket } from "./agent-socket";
import type { UserSocket } from "./user-socket";

export const agents = new Map<string, AgentSocket>();
export const users = new Map<string, UserSocket>();
27 changes: 27 additions & 0 deletions apps/ctrlshell/src/user-socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import type { IncomingMessage } from "node:http";
import type WebSocket from "ws";
import { v4 as uuidv4 } from "uuid";

import { getSession } from "./auth";

export class UserSocket {
static async from(socket: WebSocket, request: IncomingMessage) {
const session = await getSession(request);
if (session == null) return null;

const { user } = session;
if (user == null) return null;

console.log(`${user.name ?? user.email} (${user.id}) connected`);
return new UserSocket(socket, request);
}

readonly id: string;

private constructor(
private readonly socket: WebSocket,
private readonly request: IncomingMessage,
) {
this.id = uuidv4();
}
}
22 changes: 22 additions & 0 deletions apps/ctrlshell/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import type { MessageEvent } from "ws";
import type { z } from "zod";

export const ifMessage = () => {
const checks: ((event: MessageEvent) => void)[] = [];
return {
is<T>(schema: z.ZodSchema<T>, callback: (data: T) => void) {
checks.push((e: MessageEvent) => {
const result = schema.safeParse(e.data);
if (result.success) {
callback(result.data);
}
});
return this;
},
handle() {
return (event: MessageEvent) => {
for (const check of checks) check(event);
};
},
};
};
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit b4a08c4

Please sign in to comment.