Skip to content

Commit

Permalink
refactor(server): refactor wss handler
Browse files Browse the repository at this point in the history
  • Loading branch information
neuodev committed Dec 15, 2024
1 parent 8010a5f commit 0d4d588
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 93 deletions.
Binary file added .DS_Store
Binary file not shown.
4 changes: 2 additions & 2 deletions services/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import express, { json } from "express";
import routes from "@/routes";
import { ghostConfig, jwtSecret, serverConfig } from "@/constants";
import { errorHandler } from "@/middleware/error";
import { wssHandler } from "@/wss";
import bodyParser from "body-parser";
import cors from "cors";
import logger from "morgan";
Expand All @@ -15,7 +16,6 @@ import { authMiddleware, adminOnly } from "@litespace/auth";
import { isAllowedOrigin } from "@/lib/cors";
import { cache } from "@/lib/cache";
import "colors";
import { wssConnectionHandler } from "./wss";

// Stablish connection with the redis cache.
cache.connect();
Expand All @@ -33,7 +33,7 @@ io.engine.use(
)
);
io.engine.use(onlyForHandshake(authorizeSocket));
io.on("connection", wssConnectionHandler);
io.on("connection", wssHandler);

app.use(
logger(function (tokens, req, res) {
Expand Down
2 changes: 1 addition & 1 deletion services/server/src/wss/handlers/base.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IUser, Wss } from "@litespace/types";
import { Socket } from "socket.io";

export abstract class WSSHandler {
export abstract class WssHandler {
protected readonly socket: Socket;
protected readonly user: IUser.Self | IUser.Ghost;

Expand Down
18 changes: 13 additions & 5 deletions services/server/src/wss/handlers/call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ import { canJoinCall } from "@/lib/call";
import { isGhost } from "@litespace/auth";
import { logger, safe } from "@litespace/sol";
import { ICall, Wss } from "@litespace/types";
import { WSSHandler } from "@/wss/handlers/base";
import { WssHandler } from "@/wss/handlers/base";
import { asCallRoomId } from "@/wss/utils";

import zod from "zod";
import { asCallRoomId } from "../utils";

const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[];
const stdout = logger("wss");

const onJoinCallPayload = zod.object({ callId: zod.number(), type: zod.enum(callTypes) });
const onJoinCallPayload = zod.object({
callId: zod.number(),
type: zod.enum(callTypes),
});
const onLeaveCallPayload = zod.object({ callId: zod.number() });

export class CallHandler extends WSSHandler {
export class Call extends WssHandler {
public init(): Call {
this.socket.on(Wss.ClientEvent.JoinCall, this.onJoinCall.bind(this));
this.socket.on(Wss.ClientEvent.LeaveCall, this.onLeaveCall.bind(this));
return this;
}

/*
* This event listener will be called whenever a user
* joins a call. For instance, when
Expand Down Expand Up @@ -93,4 +102,3 @@ export class CallHandler extends WSSHandler {
if (result instanceof Error) stdout.error(result.message);
}
}

15 changes: 9 additions & 6 deletions services/server/src/wss/handlers/connection.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { isAdmin, isGhost, isStudent, isTutor } from "@litespace/auth";
import { dayjs, logger, safe } from "@litespace/sol";
import { IUser, Wss } from "@litespace/types";
import { WSSHandler } from "@/wss/handlers/base";
import { WssHandler } from "@/wss/handlers/base";
import { calls, rooms, users } from "@litespace/models";
import { background } from "@/workers";
import { PartentPortMessage, PartentPortMessageType } from "@/workers/messages";
Expand All @@ -11,7 +11,13 @@ import { getGhostCall } from "@litespace/sol/ghost";

const stdout = logger("wss");

export class ConnectionHandler extends WSSHandler {
export class Connection extends WssHandler {
public init(): Connection {
this.connect();
this.socket.on(Wss.ClientEvent.Disconnect, this.disconnect.bind(this));
return this;
}

async connect() {
const error = safe(async () => {
const user = this.user;
Expand Down Expand Up @@ -79,9 +85,7 @@ export class ConnectionHandler extends WSSHandler {
before: dayjs.utc().add(1, "day").toISOString(),
});

this.socket.join(
callsList.list.map((call) => asCallRoomId(call.id))
);
this.socket.join(callsList.list.map((call) => asCallRoomId(call.id)));
});

if (error instanceof Error) stdout.error(error.message);
Expand Down Expand Up @@ -118,4 +122,3 @@ export class ConnectionHandler extends WSSHandler {
});
}
}

32 changes: 16 additions & 16 deletions services/server/src/wss/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import { Socket } from "socket.io";
import { IUser } from "@litespace/types";

import { CallHandler } from "./call";
import { ConnectionHandler } from "./connection";
import { MessageHandler } from "./message";
import { PeerHandler } from "./peer";
import { InputDevicesHandler } from "./inputDevices";
import { Call } from "@/wss/handlers/call";
import { Connection } from "@/wss/handlers/connection";
import { Messages } from "@/wss/handlers/message";
import { Peer } from "@/wss/handlers/peer";
import { InputDevices } from "./inputDevices";

export class WSSHandlers {
public readonly connection: ConnectionHandler;
public readonly call: CallHandler;
public readonly message: MessageHandler;
public readonly peer: PeerHandler;
public readonly inputDevices: InputDevicesHandler;
export class WssHandlers {
public readonly connection: Connection;
public readonly call: Call;
public readonly messages: Messages;
public readonly peer: Peer;
public readonly inputDevices: InputDevices;

constructor(socket: Socket, user: IUser.Self | IUser.Ghost) {
this.connection = new ConnectionHandler(socket, user);
this.call = new CallHandler(socket, user);
this.message = new MessageHandler(socket, user);
this.peer = new PeerHandler(socket, user);
this.inputDevices = new InputDevicesHandler(socket, user);
this.connection = new Connection(socket, user).init();
this.call = new Call(socket, user).init();
this.messages = new Messages(socket, user).init();
this.peer = new Peer(socket, user);
this.inputDevices = new InputDevices(socket, user).init();
}
}
42 changes: 13 additions & 29 deletions services/server/src/wss/handlers/inputDevices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,26 @@ import { rooms } from "@litespace/models";
import { isGhost } from "@litespace/auth";
import { Wss } from "@litespace/types";
import { id, boolean } from "@/validation/utils";
import { WSSHandler } from "./base";

import { WssHandler } from "@/wss/handlers/base";
import zod from "zod";
import { isEmpty } from "lodash";

const toggleCameraPayload = zod.object({ call: id, camera: boolean });
const toggleMicPayload = zod.object({ call: id, mic: boolean });
const userTypingPayload = zod.object({ roomId: zod.number() });

const stdout = logger("wss");

export class InputDevicesHandler extends WSSHandler {
async toggleCamera(data: unknown) {
export class InputDevices extends WssHandler {
public init(): InputDevices {
this.socket.on(
Wss.ClientEvent.ToggleCamera,
this.onToggleCamera.bind(this)
);
this.socket.on(Wss.ClientEvent.ToggleMic, this.onToggleMic.bind(this));
return this;
}

async onToggleCamera(data: unknown) {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;
Expand All @@ -29,7 +36,7 @@ export class InputDevicesHandler extends WSSHandler {
if (error instanceof Error) stdout.error(error.message);
}

async toggleMic(data: unknown) {
async onToggleMic(data: unknown) {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;
Expand All @@ -42,27 +49,4 @@ export class InputDevicesHandler extends WSSHandler {
});
if (error instanceof Error) stdout.error(error.message);
}

async userTyping(data: unknown) {
const error = safe(async () => {
const { roomId } = userTypingPayload.parse(data);

const user = this.user;
if (isGhost(user)) return;

const members = await rooms.findRoomMembers({ roomIds: [roomId] });
if (isEmpty(members)) return;

const isMember = members.find((member) => member.id === user.id);
if (!isMember)
throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`);

this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, {
roomId,
userId: user.id,
});
});

if (error instanceof Error) stdout.error(error.message);
}
}
46 changes: 42 additions & 4 deletions services/server/src/wss/handlers/message.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
import { isGhost } from "@litespace/auth";
import { logger, safe, sanitizeMessage } from "@litespace/sol";
import { Wss } from "@litespace/types";
import { WSSHandler } from "@/wss/handlers/base";
import { WssHandler } from "@/wss/handlers/base";
import { messages, rooms } from "@litespace/models";
import { asChatRoomId } from "@/wss/utils";
import { id, string, withNamedId } from "@/validation/utils";
import wss from "@/validation/wss";

import zod from "zod";
import { id, string, withNamedId } from "@/validation/utils";
import { isEmpty } from "lodash";

const stdout = logger("wss");

const updateMessagePayload = zod.object({ text: string, id });
const userTypingPayload = zod.object({ roomId: zod.number() });

export class Messages extends WssHandler {
public init(): Messages {
this.socket.on(Wss.ClientEvent.SendMessage, this.sendMessage.bind(this));
this.socket.on(
Wss.ClientEvent.UpdateMessage,
this.updateMessage.bind(this)
);
this.socket.on(
Wss.ClientEvent.DeleteMessage,
this.deleteMessage.bind(this)
);
this.socket.on(Wss.ClientEvent.UserTyping, this.onUserTyping.bind(this));
return this;
}

export class MessageHandler extends WSSHandler {
async sendMessage(data: unknown) {
const error = safe(async () => {
const user = this.user;
Expand Down Expand Up @@ -100,4 +115,27 @@ export class MessageHandler extends WSSHandler {
});
if (error instanceof Error) stdout.error(error.message);
}

async onUserTyping(data: unknown) {
const error = safe(async () => {
const { roomId } = userTypingPayload.parse(data);

const user = this.user;
if (isGhost(user)) return;

const members = await rooms.findRoomMembers({ roomIds: [roomId] });
if (isEmpty(members)) return;

const isMember = members.find((member) => member.id === user.id);
if (!isMember)
throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`);

this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, {
roomId,
userId: user.id,
});
});

if (error instanceof Error) stdout.error(error.message);
}
}
11 changes: 6 additions & 5 deletions services/server/src/wss/handlers/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { Wss } from "@litespace/types";
import { getGhostCall } from "@litespace/sol/ghost";
import { cache } from "@/lib/cache";
import { id, string } from "@/validation/utils";
import { WSSHandler } from "./base";

import { WssHandler } from "@/wss/handlers/base";
import zod from "zod";
import { isEmpty } from "lodash";

Expand All @@ -15,7 +14,10 @@ const registerPeerPayload = zod.object({ peer: zod.string() });

const stdout = logger("wss");

export class PeerHandler extends WSSHandler {
/**
* @deprecated should be removed in favor of the new call-cache-based design.
*/
export class Peer extends WssHandler {
async peerOpened(data: unknown) {
const error = await safe(async () => {
const { callId, peerId } = peerPayload.parse(data);
Expand Down Expand Up @@ -46,8 +48,7 @@ export class PeerHandler extends WSSHandler {

if (isGhost(user))
await cache.peer.setGhostPeerId(getGhostCall(user), peer);
if (isTutor(user))
await cache.peer.setUserPeerId(user.id, peer);
if (isTutor(user)) await cache.peer.setUserPeerId(user.id, peer);

// notify peers to refetch the peer id if needed
});
Expand Down
32 changes: 7 additions & 25 deletions services/server/src/wss/index.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
import { Socket } from "socket.io";
import { WSSHandlers } from "./handlers";
import { WssHandlers } from "@/wss/handlers";
import { logger } from "@litespace/sol";
import { Wss } from "@litespace/types";

const stdout = logger("wss");

export function wssConnectionHandler(socket: Socket) {
export function wssHandler(socket: Socket) {
const user = socket.request.user;
if (!user) {
stdout.warning("(function) wssHandler: No user has been found in the request obj!");
return;
}
const handlers = new WSSHandlers(socket, user);

handlers.connection.connect();
socket.on(Wss.ClientEvent.Disconnect, handlers.connection.disconnect.bind(handlers.connection));

socket.on(Wss.ClientEvent.JoinCall, handlers.call.onJoinCall.bind(handlers.call));
socket.on(Wss.ClientEvent.LeaveCall, handlers.call.onLeaveCall.bind(handlers.call));

socket.on(Wss.ClientEvent.SendMessage, handlers.message.sendMessage.bind(handlers.message));
socket.on(Wss.ClientEvent.UpdateMessage, handlers.message.updateMessage.bind(handlers.message));
socket.on(Wss.ClientEvent.DeleteMessage, handlers.message.deleteMessage.bind(handlers.message));

socket.on(Wss.ClientEvent.PeerOpened, handlers.peer.peerOpened.bind(handlers.peer));
socket.on(Wss.ClientEvent.RegisterPeer, handlers.peer.registerPeer.bind(handlers.peer));

socket.on(Wss.ClientEvent.ToggleCamera, handlers.inputDevices.toggleCamera.bind(handlers.inputDevices));
socket.on(Wss.ClientEvent.ToggleMic, handlers.inputDevices.toggleMic.bind(handlers.inputDevices));
socket.on(Wss.ClientEvent.UserTyping, handlers.inputDevices.userTyping.bind(handlers.inputDevices));
if (!user)
return stdout.warning(
"wssHandler: No user has been found in the request obj!"
);
return new WssHandlers(socket, user);
}

0 comments on commit 0d4d588

Please sign in to comment.