Skip to content

Commit

Permalink
refactor: wss revert has been reimplemented to utilize socket.io ackn…
Browse files Browse the repository at this point in the history
…owledgement.
  • Loading branch information
mmoehabb committed Dec 25, 2024
1 parent bcf8d6f commit 505cdef
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 175 deletions.
2 changes: 1 addition & 1 deletion apps/nova/src/hooks/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export function useChat(onMessage?: OnMessage) {

const sendMessage = useCallback(
({ roomId, text }: { roomId: number; text: string }) => {
socket?.emit(Wss.ClientEvent.SendMessage, { ref: 1, roomId, text });
socket?.emit(Wss.ClientEvent.SendMessage, { roomId, text });
},
[socket]
);
Expand Down
38 changes: 10 additions & 28 deletions packages/types/src/wss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export enum ServerEvent {
RoomMessageDeleted = "RoomMessageDeleted",
RoomMessageRead = "RoomMessageRead",
JoinedRooms = "JoinedRooms",
Revert = "Revert",

MessageRead = "MessageRead",

Expand Down Expand Up @@ -72,47 +71,31 @@ export enum Room {
ServerStats = "ServerStats",
}

export enum RevertErrorCode {
export enum AcknowledgeCodes {
EmptyText = 'empty-text',
RoomNotFound = 'room-not-found',
MessageNotFound = 'message-not-found',
NotMember = 'not-member',
NotOwner = 'not-owner',
Unreachable = 'unreachable',
Unallowed = 'unallowed',
Ok = 'ok',
}

export type WithRevetCode<T extends object> = T & {code: RevertErrorCode};

export type RevertPayload = WithRevetCode<
| {
type: "send-message";
ref: number;
reason: string;
}
| {
type: "update-message" | "delete-message" | "mark-msg-as-read";
id: number;
reason: string;
}
| {
type: "user-typing";
roomId: number;
reason: string;
}>;

type EventCallback<T> = (arg: T) => Promise<void> | void;
export type AcknowledgePayload = {
code: AcknowledgeCodes;
message?: string;
}

export type AcknowledgeCallback = (payload?: AcknowledgePayload) => any;

type EventCallback<T> = (arg: T, callback?: AcknowledgeCallback) => Promise<void> | void;

/**
* Events emitted by the client
*/
export type ClientEventsMap = {
[ClientEvent.SendMessage]: EventCallback<{
/**
* Temporarily id set by the client. Will be emitted by the server incase
* the is a problem with tihs message.
*/
ref: number;
roomId: number;
text: string;
}>;
Expand Down Expand Up @@ -144,7 +127,6 @@ export type ServerEventsMap = {
roomId: number;
messageId: number;
}>;
[ServerEvent.Revert]: EventCallback<RevertPayload>;
[ServerEvent.RoomMessageRead]: EventCallback<{ userId: number }>;

[ServerEvent.UserJoinedCall]: EventCallback<{ peerId: string }>;
Expand Down
24 changes: 17 additions & 7 deletions services/server/fixtures/wss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,37 @@ export class ClientSocket {
}

userTyping(roomId: number) {
this.client.emit(Wss.ClientEvent.UserTyping, { roomId });
return this.emit( Wss.ClientEvent.UserTyping, { roomId });
}

joinCall(callId: number, type: ICall.Type) {
this.client.emit(Wss.ClientEvent.JoinCall, { callId, type });
return this.emit(Wss.ClientEvent.JoinCall, { callId, type });
}

leaveCall(callId: number) {
this.client.emit(Wss.ClientEvent.LeaveCall, { callId });
return this.emit(Wss.ClientEvent.LeaveCall, { callId });
}

sendMessage(roomId: number, ref: number, text: string) {
this.client.emit(Wss.ClientEvent.SendMessage, { roomId, ref, text });
sendMessage(roomId: number, text: string) {
return this.emit(Wss.ClientEvent.SendMessage, { roomId, text });
}

deleteMessage(msgId: number) {
this.client.emit(Wss.ClientEvent.DeleteMessage, { id: msgId });
return this.emit(Wss.ClientEvent.DeleteMessage, { id: msgId });
}

markMessageAsRead(msgId: number) {
this.client.emit(Wss.ClientEvent.MarkMessageAsRead, { id: msgId });
return this.emit(Wss.ClientEvent.MarkMessageAsRead, { id: msgId });
}

async emit(
event: keyof Wss.ClientEventsMap,
data: any
): Promise<Wss.AcknowledgePayload> {
return new Promise((resolve, _) => {
this.client.emit( event, data, (ack) => ack && resolve(ack));
setTimeout(() => resolve({ code: Wss.AcknowledgeCodes.Ok }), 2_000);
});
}

/**
Expand Down
5 changes: 3 additions & 2 deletions services/server/src/wss/handlers/base.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { IUser, Wss } from "@litespace/types";
import { AcknowledgeCallback, AcknowledgePayload } from "@litespace/types/dist/esm/wss";
import { Socket } from "socket.io";

export abstract class WssHandler {
Expand All @@ -19,7 +20,7 @@ export abstract class WssHandler {
this.socket.broadcast.to(room).emit(event, ...data);
}

revert(payload: Wss.RevertPayload) {
this.socket.emit(Wss.ServerEvent.Revert, payload);
protected must(callback: AcknowledgeCallback | undefined, payload: AcknowledgePayload): void {
return callback ? callback(payload) : console.error("Socket.io callback is not defined!");
}
}
5 changes: 3 additions & 2 deletions services/server/src/wss/handlers/call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ export class Call extends WssHandler {
this.broadcast(
Wss.ServerEvent.MemberJoinedCall,
asCallRoomId(callId),
{ userId: user.id } // TODO: define the payload struct type in the types package
{ userId: user.id }, // TODO: define the payload struct type in the types package
() => {}
);
});
if (result instanceof Error) stdout.error(result.message);
Expand Down Expand Up @@ -97,7 +98,7 @@ export class Call extends WssHandler {
.to(asCallRoomId(callId))
.emit(Wss.ServerEvent.MemberLeftCall, {
userId: user.id,
});
}, () => {});
});
if (result instanceof Error) stdout.error(result.message);
}
Expand Down
17 changes: 12 additions & 5 deletions services/server/src/wss/handlers/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,20 @@ export class Connection extends WssHandler {
this.broadcast(
Wss.ServerEvent.UserStatusChanged,
room.toString(),
{ online }
{ online },
() => {}
);
}
}

private async emitServerStats() {
background.on("message", (message: PartentPortMessage) => {
if (message.type === PartentPortMessageType.Stats)
return this.socket.emit(Wss.ServerEvent.ServerStats, message.stats);
return this.socket.emit(
Wss.ServerEvent.ServerStats,
message.stats,
() => {}
);
});
}

Expand Down Expand Up @@ -124,8 +129,10 @@ export class Connection extends WssHandler {
// notify members that a member has left the call
this.socket.broadcast
.to(asCallRoomId(callId))
.emit(Wss.ServerEvent.MemberLeftCall, {
userId: user.id,
});
.emit(
Wss.ServerEvent.MemberLeftCall,
{ userId: user.id },
() => {}
);
}
}
6 changes: 2 additions & 4 deletions services/server/src/wss/handlers/inputDevices.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { logger, safe } from "@litespace/sol";
import { rooms } from "@litespace/models";
import { isGhost } from "@litespace/auth";
import { Wss } from "@litespace/types";
import { id, boolean } from "@/validation/utils";
import { WssHandler } from "@/wss/handlers/base";
import zod from "zod";
import { isEmpty } from "lodash";

const toggleCameraPayload = zod.object({ call: id, camera: boolean });
const toggleMicPayload = zod.object({ call: id, mic: boolean });
Expand All @@ -31,7 +29,7 @@ export class InputDevices extends WssHandler {
this.broadcast(Wss.ServerEvent.CameraToggled, call.toString(), {
user: user.id,
camera,
});
}, () => {});
});
if (error instanceof Error) stdout.error(error.message);
}
Expand All @@ -45,7 +43,7 @@ export class InputDevices extends WssHandler {
this.broadcast(Wss.ServerEvent.MicToggled, call.toString(), {
user: user.id,
mic,
});
}, () => {});
});
if (error instanceof Error) stdout.error(error.message);
}
Expand Down
Loading

0 comments on commit 505cdef

Please sign in to comment.