diff --git a/ws/src/StramManager.ts b/ws/src/StreamManager.ts similarity index 77% rename from ws/src/StramManager.ts rename to ws/src/StreamManager.ts index 6546edd..5af0f59 100644 --- a/ws/src/StramManager.ts +++ b/ws/src/StreamManager.ts @@ -1,9 +1,10 @@ -import { WebSocket } from "ws"; +import WebSocket from "ws"; import { createClient, RedisClientType } from "redis"; //@ts-ignore import youtubesearchapi from "youtube-search-api"; import { Job, Queue, Worker } from "bullmq"; import { PrismaClient } from "@prisma/client"; +import { getVideoId, isValidYoutubeURL } from "./utils"; const TIME_SPAN_FOR_VOTE = 1200000; // 20min const TIME_SPAN_FOR_QUEUE = 1200000; // 20min @@ -31,7 +32,7 @@ export class RoomManager { public prisma: PrismaClient; public queue: Queue; public worker: Worker; - public wstoSpace: Map + public wstoSpace: Map; private constructor() { this.spaces = new Map(); @@ -46,7 +47,7 @@ export class RoomManager { this.worker = new Worker(process.pid.toString(), this.processJob, { connection, }); - this.wstoSpace=new Map(); + this.wstoSpace = new Map(); } static getInstance() { @@ -75,10 +76,7 @@ export class RoomManager { data.existingActiveStream ); } else if (name === "play-next") { - await RoomManager.getInstance().adminPlayNext( - data.spaceId, - data.userId - ); + await RoomManager.getInstance().adminPlayNext(data.spaceId, data.userId); } else if (name === "remove-song") { await RoomManager.getInstance().adminRemoveSong( data.spaceId, @@ -97,7 +95,7 @@ export class RoomManager { } onSubscribeRoom(message: string, spaceId: string) { - console.log("Subscibe Room",spaceId) + console.log("Subscibe Room", spaceId); const { type, data } = JSON.parse(message); if (type === "new-stream") { RoomManager.getInstance().publishNewStream(spaceId, data); @@ -121,8 +119,8 @@ export class RoomManager { console.log(process.pid + ": createRoom: ", { spaceId }); if (!this.spaces.has(spaceId)) { this.spaces.set(spaceId, { - users:new Map(), - creatorId:"", + users: new Map(), + creatorId: "", }); // const roomsString = await this.redisClient.get("rooms"); // if (roomsString) { @@ -144,34 +142,31 @@ export class RoomManager { async addUser(userId: string, ws: WebSocket, token: string) { let user = this.users.get(userId); - if(!user){ + if (!user) { this.users.set(userId, { userId, - ws:[ws], + ws: [ws], token, }); - } - else{ - if (!user.ws.some(existingWs => existingWs === ws)) { + } else { + if (!user.ws.some((existingWs) => existingWs === ws)) { user.ws.push(ws); } } - } async joinRoom( - spaceId:string, + spaceId: string, creatorId: string, userId: string, ws: WebSocket, token: string ) { + console.log("Join Room" + spaceId); - console.log("Join Room"+spaceId) - let space = this.spaces.get(spaceId); let user = this.users.get(userId); - + if (!space) { await this.createRoom(spaceId); space = this.spaces.get(spaceId); @@ -180,50 +175,47 @@ export class RoomManager { if (!user) { await this.addUser(userId, ws, token); user = this.users.get(userId); - } - else{ - if (!user.ws.some(existingWs => existingWs === ws)) { + } else { + if (!user.ws.some((existingWs) => existingWs === ws)) { user.ws.push(ws); } } - this.wstoSpace.set(ws,spaceId); - + this.wstoSpace.set(ws, spaceId); + if (space && user) { - space.users.set(userId,user) + space.users.set(userId, user); this.spaces.set(spaceId, { ...space, - users:new Map(space.users), - creatorId:creatorId + users: new Map(space.users), + creatorId: creatorId, }); - } } publishEmptyQueue(spaceId: string) { const space = this.spaces.get(spaceId); - space?.users.forEach((user,userId) => { - user?.ws.forEach((ws)=>{ + space?.users.forEach((user, userId) => { + user?.ws.forEach((ws) => { ws.send( JSON.stringify({ type: `empty-queue/${spaceId}`, }) ); - }) + }); }); } async adminEmptyQueue(spaceId: string) { - const room = this.spaces.get(spaceId); - const userId = this.spaces.get(spaceId)?.creatorId + const userId = this.spaces.get(spaceId)?.creatorId; const user = this.users.get(userId as string); if (room && user) { await this.prisma.stream.updateMany({ where: { played: false, - spaceId:spaceId + spaceId: spaceId, }, data: { played: true, @@ -242,18 +234,18 @@ export class RoomManager { publishRemoveSong(spaceId: string, streamId: string) { console.log("publishRemoveSong"); const space = this.spaces.get(spaceId); - space?.users.forEach((user,userId) => { - user?.ws.forEach((ws)=>{ + space?.users.forEach((user, userId) => { + user?.ws.forEach((ws) => { ws.send( JSON.stringify({ type: `remove-song/${spaceId}`, data: { streamId, - spaceId + spaceId, }, }) ); - }) + }); }); } @@ -262,11 +254,11 @@ export class RoomManager { const user = this.users.get(userId); const creatorId = this.spaces.get(spaceId)?.creatorId; - if (user && userId==creatorId) { + if (user && userId == creatorId) { await this.prisma.stream.delete({ where: { id: streamId, - spaceId:spaceId + spaceId: spaceId, }, }); @@ -276,13 +268,12 @@ export class RoomManager { type: "remove-song", data: { streamId, - spaceId + spaceId, }, }) ); - } - else{ - user?.ws.forEach((ws)=>{ + } else { + user?.ws.forEach((ws) => { ws.send( JSON.stringify({ type: "error", @@ -291,31 +282,45 @@ export class RoomManager { }, }) ); - }) + }); } } publishPlayNext(spaceId: string) { const space = this.spaces.get(spaceId); - space?.users.forEach((user,userId) => { - user?.ws.forEach((ws)=>{ + space?.users.forEach((user, userId) => { + user?.ws.forEach((ws) => { ws.send( JSON.stringify({ type: `play-next/${spaceId}`, }) ); - }) + }); }); } - async payAndPlayNext(spaceId:string,userId:string,url:string){ + async payAndPlayNext(spaceId: string, userId: string, url: string) { const creatorId = this.spaces.get(spaceId)?.creatorId; - console.log("payAndPlayNext",creatorId,userId); + console.log("payAndPlayNext", creatorId, userId); let targetUser = this.users.get(userId); if (!targetUser || !creatorId) { return; } - const extractedId = url.split("?v=")[1]; + + const extractedId = getVideoId(url); + + if (!extractedId) { + targetUser?.ws.forEach((ws) => { + ws.send( + JSON.stringify({ + type: "error", + data: { message: "Invalid YouTube URL" }, + }) + ); + }); + return; + } + const res = await youtubesearchapi.GetVideoDetails(extractedId); if (res.thumbnail) { @@ -342,24 +347,23 @@ export class RoomManager { bigImg: thumbnails[thumbnails.length - 1].url ?? "https://cdn.pixabay.com/photo/2024/02/28/07/42/european-shorthair-8601492_640.jpg", - spaceId:spaceId + spaceId: spaceId, }, - }); // update currentStream - await Promise.all([ + await Promise.all([ this.prisma.currentStream.upsert({ where: { - spaceId:spaceId, + spaceId: spaceId, }, update: { - spaceId:spaceId, + spaceId: spaceId, userId, streamId: stream.id, }, create: { id: crypto.randomUUID(), - spaceId:spaceId, + spaceId: spaceId, userId, streamId: stream.id, }, @@ -380,21 +384,19 @@ export class RoomManager { type: "play-next", }) ); - } - } async adminPlayNext(spaceId: string, userId: string) { const creatorId = this.spaces.get(spaceId)?.creatorId; - console.log("adminPlayNext",creatorId,userId); + console.log("adminPlayNext", creatorId, userId); let targetUser = this.users.get(userId); if (!targetUser) { return; } if (targetUser.userId !== creatorId) { - targetUser.ws.forEach((ws)=>{ + targetUser.ws.forEach((ws) => { ws.send( JSON.stringify({ type: "error", @@ -403,16 +405,14 @@ export class RoomManager { }, }) ); - }) + }); return; } - - const mostUpvotedStream = await this.prisma.stream.findFirst({ where: { played: false, - spaceId:spaceId + spaceId: spaceId, }, orderBy: { upvotes: { @@ -422,7 +422,7 @@ export class RoomManager { }); if (!mostUpvotedStream) { - targetUser.ws.forEach((ws)=>{ + targetUser.ws.forEach((ws) => { ws.send( JSON.stringify({ type: "error", @@ -431,22 +431,22 @@ export class RoomManager { }, }) ); - }) + }); return; } - + await Promise.all([ this.prisma.currentStream.upsert({ where: { - spaceId:spaceId, + spaceId: spaceId, }, update: { - spaceId:spaceId, + spaceId: spaceId, userId, streamId: mostUpvotedStream.id, }, create: { - spaceId:spaceId, + spaceId: spaceId, userId, streamId: mostUpvotedStream.id, }, @@ -472,7 +472,7 @@ export class RoomManager { previousQueueLength - 1 ); } - + await this.publisher.publish( spaceId, JSON.stringify({ @@ -488,9 +488,9 @@ export class RoomManager { votedBy: string ) { console.log(process.pid + " publishNewVote"); - const spaces= this.spaces.get(spaceId); - spaces?.users.forEach((user,userId) => { - user?.ws.forEach((ws)=>{ + const spaces = this.spaces.get(spaceId); + spaces?.users.forEach((user, userId) => { + user?.ws.forEach((ws) => { ws.send( JSON.stringify({ type: `new-vote/${spaceId}`, @@ -498,11 +498,11 @@ export class RoomManager { vote, streamId, votedBy, - spaceId + spaceId, }, }) ); - }) + }); }); } @@ -511,7 +511,7 @@ export class RoomManager { userId: string, streamId: string, vote: string, - spaceId:string + spaceId: string ) { console.log(process.pid + " adminCastVote"); if (vote === "upvote") { @@ -553,24 +553,24 @@ export class RoomManager { userId: string, streamId: string, vote: "upvote" | "downvote", - spaceId:string + spaceId: string ) { console.log(process.pid + " castVote"); - const space= this.spaces.get(spaceId); + const space = this.spaces.get(spaceId); const currentUser = this.users.get(userId); - const creatorId=this.spaces.get(spaceId)?.creatorId; - const isCreator = currentUser?.userId===creatorId; - + const creatorId = this.spaces.get(spaceId)?.creatorId; + const isCreator = currentUser?.userId === creatorId; + if (!space || !currentUser) { return; } - if(!isCreator){ + if (!isCreator) { const lastVoted = await this.redisClient.get( `lastVoted-${spaceId}-${userId}` ); - + if (lastVoted) { - currentUser?.ws.forEach((ws)=>{ + currentUser?.ws.forEach((ws) => { ws.send( JSON.stringify({ type: "error", @@ -579,38 +579,35 @@ export class RoomManager { }, }) ); - }) + }); return; } } - await this.queue.add("cast-vote", { creatorId, userId, streamId, vote, - spaceId:spaceId + spaceId: spaceId, }); } publishNewStream(spaceId: string, data: any) { console.log(process.pid + ": publishNewStream"); - console.log("Publish New Stream",spaceId) - const space= this.spaces.get(spaceId); - + console.log("Publish New Stream", spaceId); + const space = this.spaces.get(spaceId); if (space) { - - space?.users.forEach((user,userId) => { - user?.ws.forEach((ws)=>{ + space?.users.forEach((user, userId) => { + user?.ws.forEach((ws) => { ws.send( JSON.stringify({ type: `new-stream/${spaceId}`, data: data, }) ); - }) + }); }); } } @@ -622,7 +619,7 @@ export class RoomManager { existingActiveStream: number ) { console.log(process.pid + " adminAddStreamHandler"); - console.log("adminAddStreamHandler",spaceId) + console.log("adminAddStreamHandler", spaceId); const room = this.spaces.get(spaceId); const currentUser = this.users.get(userId); @@ -630,7 +627,20 @@ export class RoomManager { return; } - const extractedId = url.split("?v=")[1]; + const extractedId = getVideoId(url); + + if (!extractedId) { + currentUser?.ws.forEach((ws) => { + ws.send( + JSON.stringify({ + type: "error", + data: { message: "Invalid YouTube URL" }, + }) + ); + }); + return; + } + await this.redisClient.set( `queue-length-${spaceId}`, existingActiveStream + 1 @@ -662,23 +672,22 @@ export class RoomManager { bigImg: thumbnails[thumbnails.length - 1].url ?? "https://cdn.pixabay.com/photo/2024/02/28/07/42/european-shorthair-8601492_640.jpg", - spaceId:spaceId + spaceId: spaceId, }, - + }); + + await this.redisClient.set(`${spaceId}-${url}`, new Date().getTime(), { + EX: TIME_SPAN_FOR_REPEAT / 1000, }); await this.redisClient.set( - `${spaceId}-${url}`, + `lastAdded-${spaceId}-${userId}`, new Date().getTime(), { - EX: TIME_SPAN_FOR_REPEAT / 1000, + EX: TIME_SPAN_FOR_QUEUE / 1000, } ); - await this.redisClient.set(`lastAdded-${spaceId}-${userId}`, new Date().getTime(), { - EX: TIME_SPAN_FOR_QUEUE / 1000, - }); - await this.publisher.publish( spaceId, JSON.stringify({ @@ -691,7 +700,7 @@ export class RoomManager { }) ); } else { - currentUser?.ws.forEach((ws)=>{ + currentUser?.ws.forEach((ws) => { ws.send( JSON.stringify({ type: "error", @@ -700,23 +709,35 @@ export class RoomManager { }, }) ); - }) + }); } } async addToQueue(spaceId: string, currentUserId: string, url: string) { console.log(process.pid + ": addToQueue"); - + const space = this.spaces.get(spaceId); const currentUser = this.users.get(currentUserId); - const creatorId=this.spaces.get(spaceId)?.creatorId; - const isCreator = currentUserId===creatorId + const creatorId = this.spaces.get(spaceId)?.creatorId; + const isCreator = currentUserId === creatorId; if (!space || !currentUser) { console.log("433: Room or User not defined"); return; } + if (!isValidYoutubeURL(url)) { + currentUser?.ws.forEach((ws) => { + ws.send( + JSON.stringify({ + type: "error", + data: { message: "Invalid YouTube URL" }, + }) + ); + }); + return; + } + let previousQueueLength = parseInt( (await this.redisClient.get(`queue-length-${spaceId}`)) || "0", 10 @@ -732,53 +753,54 @@ export class RoomManager { }); } - if(!isCreator){ - let lastAdded = await this.redisClient.get(`lastAdded-${spaceId}-${currentUserId}`); + if (!isCreator) { + let lastAdded = await this.redisClient.get( + `lastAdded-${spaceId}-${currentUserId}` + ); - if (lastAdded) { - currentUser.ws.forEach((ws)=>{ - ws.send( - JSON.stringify({ - type: "error", - data: { - message: "You can add again after 20 min.", - }, - }) - ); - }) - return; - } - let alreadyAdded = await this.redisClient.get(`${spaceId}-${url}`); + if (lastAdded) { + currentUser.ws.forEach((ws) => { + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "You can add again after 20 min.", + }, + }) + ); + }); + return; + } + let alreadyAdded = await this.redisClient.get(`${spaceId}-${url}`); - if (alreadyAdded) { - currentUser.ws.forEach((ws)=>{ - ws.send( - JSON.stringify({ - type: "error", - data: { - message: "This song is blocked for 1 hour", - }, - }) - ); - }) - return; - } + if (alreadyAdded) { + currentUser.ws.forEach((ws) => { + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "This song is blocked for 1 hour", + }, + }) + ); + }); + return; + } - if (previousQueueLength >= MAX_QUEUE_LENGTH) { - currentUser.ws.forEach((ws)=>{ - ws.send( - JSON.stringify({ - type: "error", - data: { - message: "Queue limit reached", - }, - }) - ); - }) - return; - } + if (previousQueueLength >= MAX_QUEUE_LENGTH) { + currentUser.ws.forEach((ws) => { + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "Queue limit reached", + }, + }) + ); + }); + return; + } } - await this.queue.add("add-to-queue", { spaceId, @@ -791,29 +813,28 @@ export class RoomManager { disconnect(ws: WebSocket) { console.log(process.pid + ": disconnect"); let userId: string | null = null; - const spaceId= this.wstoSpace.get(ws); - this.users.forEach((user,id) => { - const wsIndex= user.ws.indexOf(ws); + const spaceId = this.wstoSpace.get(ws); + this.users.forEach((user, id) => { + const wsIndex = user.ws.indexOf(ws); - if(wsIndex!==-1){ + if (wsIndex !== -1) { userId = id; user.ws.splice(wsIndex, 1); } - if(user.ws.length===0){ - this.users.delete(id) + if (user.ws.length === 0) { + this.users.delete(id); } }); - if (userId && spaceId) { - const space=this.spaces.get(spaceId) - if(space){ + const space = this.spaces.get(spaceId); + if (space) { const updatedUsers = new Map( Array.from(space.users).filter(([usrId]) => userId !== usrId) - ) + ); this.spaces.set(spaceId, { ...space, - users: updatedUsers + users: updatedUsers, }); } } @@ -828,5 +849,5 @@ type User = { type Space = { creatorId: string; - users: Map; -}; \ No newline at end of file + users: Map; +}; diff --git a/ws/src/app.ts b/ws/src/app.ts index bf28243..64ba23d 100644 --- a/ws/src/app.ts +++ b/ws/src/app.ts @@ -1,11 +1,12 @@ -import { WebSocketServer } from "ws"; +import { WebSocket, WebSocketServer } from "ws"; import cluster from "cluster"; import http from "http"; import dotenv from "dotenv"; import jwt from "jsonwebtoken"; +import { sendError } from "./utils"; // import os from "os"; -import { RoomManager } from "./StramManager"; +import { RoomManager } from "./StreamManager"; dotenv.config(); const cors = 1; // os.cpus().length // for vertical scaling @@ -22,112 +23,138 @@ if (cluster.isPrimary) { main(); } -async function main() { - const server = http.createServer((req, res) => { +type Data = { + userId: string; + spaceId: string; + token: string; + url: string; + vote: "upvote" | "downvote"; + streamId: string; +}; + + +function createHttpServer() { + return http.createServer((req, res) => { res.statusCode = 200; res.setHeader("Content-Type", "text/plain"); + res.end("Hello, this is some data from the server!"); + }); +} + +async function handleConnection(ws: WebSocket) { + ws.on("message", async (raw: { toString: () => string }) => { + const { type, data } = JSON.parse(raw.toString()) || {}; - const data = "Hello, this is some data from the server!"; - res.write(data); - res.end(); + switch (type) { + case "join-room": + await handleJoinRoom(ws, data); + break; + default: + await handleUserAction(ws, type, data); + } }); - const wss = new WebSocketServer({ server }); - await RoomManager.getInstance().initRedisClient(); - wss.on("connection", (ws) => { - ws.on("message", async (raw) => { - const { type, data } = JSON.parse(raw.toString()) || {}; - console.log(type) - if (type === "join-room") { - jwt.verify( - data.token, - process.env.NEXTAUTH_SECRET as string, - (err: any, decoded: any) => { - if (err) { - console.log(err); - ws.send( - JSON.stringify({ - type: "error", - data: { - message: "Token verification failed", - }, - }) - ); - } else { - RoomManager.getInstance().joinRoom( - data.spaceId, - decoded.creatorId, - decoded.userId, - ws, - data.token, - - ); - } - } - ); + ws.on("close", () => { + RoomManager.getInstance().disconnect(ws); + }); +} + +async function handleJoinRoom(ws: WebSocket, data: Data) { + jwt.verify( + data.token, + process.env.NEXTAUTH_SECRET as string, + (err: any, decoded: any) => { + if (err) { + console.error(err); + sendError(ws, "Token verification failed"); } else { - const user = RoomManager.getInstance().users.get(data.userId); - - // Adding this to verify the user who is sending this message is not mocking other user. - if (user) { - data.userId = user.userId; - if (type === "cast-vote") { - await RoomManager.getInstance().castVote( - data.userId, - data.streamId, - data.vote, - data.spaceId - ); - } else if (type === "add-to-queue") { - await RoomManager.getInstance().addToQueue( - data.spaceId, - data.userId, - data.url - ); - } else if (type === "play-next") { - await RoomManager.getInstance().queue.add("play-next", { - spaceId: data.spaceId, - userId: data.userId, - }); - } else if (type === "remove-song") { - await RoomManager.getInstance().queue.add("remove-song", { - ...data, - spaceId: data.spaceId, - userId: data.userId, - }); - } else if (type === "empty-queue") { - await RoomManager.getInstance().queue.add("empty-queue", { - ...data, - spaceId: data.spaceId, - userId: data.userId, - }); - } else if (type === "pay-and-play-next"){ - await RoomManager.getInstance().payAndPlayNext( - data.spaceId, - data.userId, - data.url - ) - } - } else { - ws.send( - JSON.stringify({ - type: "error", - data: { - message: "You are unauthorized to perform this action", - }, - }) - ); - } + RoomManager.getInstance().joinRoom( + data.spaceId, + decoded.creatorId, + decoded.userId, + ws, + data.token + ); } - }); + } + ); +} - ws.on("close", () => { - RoomManager.getInstance().disconnect(ws); - }); - }); +async function processUserAction(type: string, data: Data) { + switch (type) { + case "cast-vote": + await RoomManager.getInstance().castVote( + data.userId, + data.streamId, + data.vote, + data.spaceId + ); + break; + + case "add-to-queue": + await RoomManager.getInstance().addToQueue( + data.spaceId, + data.userId, + data.url + ); + break; + + case "play-next": + await RoomManager.getInstance().queue.add("play-next", { + spaceId: data.spaceId, + userId: data.userId, + }); + break; + + case "remove-song": + await RoomManager.getInstance().queue.add("remove-song", { + ...data, + spaceId: data.spaceId, + userId: data.userId, + }); + break; + + case "empty-queue": + await RoomManager.getInstance().queue.add("empty-queue", { + ...data, + spaceId: data.spaceId, + userId: data.userId, + }); + break; + + case "pay-and-play-next": + await RoomManager.getInstance().payAndPlayNext( + data.spaceId, + data.userId, + data.url + ); + break; + + default: + console.warn("Unknown message type:", type); + } +} + +async function handleUserAction(ws: WebSocket, type: string, data: Data) { + const user = RoomManager.getInstance().users.get(data.userId); + + if (user) { + data.userId = user.userId; + await processUserAction(type, data); + } else { + sendError(ws, "You are unauthorized to perform this action"); + } +} + +async function main() { + const server = createHttpServer(); + const wss = new WebSocketServer({ server }); + await RoomManager.getInstance().initRedisClient(); + + wss.on("connection", (ws) => handleConnection(ws)); - const PORT = process.env.PORT; + const PORT = process.env.PORT ?? 8080; server.listen(PORT, () => { - console.log(`${process.pid}: ` + "WebSocket server is running on " + PORT); + console.log(`${process.pid}: WebSocket server is running on ${PORT}`); }); } diff --git a/ws/src/utils.ts b/ws/src/utils.ts new file mode 100644 index 0000000..4f5ed0e --- /dev/null +++ b/ws/src/utils.ts @@ -0,0 +1,16 @@ +import { WebSocket } from "ws"; + +const YT_REGEX = + /^(?:https?:\/\/)?(?:www\.)?(?:m\.)?(?:youtube\.com\/(?:watch\?(?!.*\blist=)(?:.*&)?v=|embed\/|v\/)|youtu\.be\/)([a-zA-Z0-9_-]{11})(?:[?&]\S+)?$/; + +export const isValidYoutubeURL = (data: string) => { + return data.match(YT_REGEX); +}; + +export const getVideoId = (url: string) => { + return url.match(YT_REGEX)?.[1]; +}; + +export function sendError(ws: WebSocket, message: string) { + ws.send(JSON.stringify({ type: "error", data: { message } })); +} \ No newline at end of file