Skip to content

Commit

Permalink
Fixed bug where ChangeStream threw error when one user times out
Browse files Browse the repository at this point in the history
  • Loading branch information
kervyntan committed Nov 13, 2024
1 parent cb96b87 commit 7cea4cb
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions message-queue/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const addDataToCollabExchange = (data: CollabExchangeData, key: string) => {

let waitingUsers: { [key: string]: (data: any) => void } = {}

let isStreamClosed = false;

const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: any[] }> => {
let changeStream: ChangeStream = null
try {
Expand Down Expand Up @@ -108,16 +110,18 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
const timer = setInterval(async () => {
console.log("Step 4: Timeout reached, removing user from queue")
await db.collection("usersQueue").deleteOne({ user_id: userData.user_id })
resolve({ matchedUsers: [] })
delete waitingUsers[userData.user_id]

// Close change stream on timeout
if (changeStream) {
if (changeStream && !isStreamClosed) {
try {
console.log("Closing change stream on timeout")
changeStream.close()
isStreamClosed = true;
} catch (e) {
console.error(e)
} finally {
resolve({ matchedUsers: [] })
return { matchedUsers: [] }
}
}
Expand All @@ -127,8 +131,8 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
changeStream = db.collection("usersQueue").watch()
changeStream.on("change", async (change) => {
console.log("Step 5: Change detected", change.operationType)

if (change.operationType === "insert") {
isStreamClosed = false;
if (change.operationType === "insert" && !isStreamClosed) {
const newUser = change.fullDocument

if (
Expand Down Expand Up @@ -158,6 +162,7 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
if (changeStream) {
console.log("Closing change stream after match")
changeStream.close()
isStreamClosed = true;
}
}
} else {
Expand All @@ -166,6 +171,7 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
if (changeStream) {
console.log("Closing change stream after delete event")
changeStream.close()
isStreamClosed = true;
}
}
})
Expand All @@ -177,6 +183,7 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
if (changeStream) {
console.log("Closing change stream on error")
changeStream.close()
isStreamClosed = true;
}

return { matchedUsers: [] }
Expand Down Expand Up @@ -243,6 +250,10 @@ app.post("/match", async (req: Request, res: Response) => {
}
})

app.post("/match/cancel", async (req: Request, res: Response) => {

})

const port = process.env.PORT || 3002;
app.listen(port, () => {
console.log(`Matching service running on port ${port}.`);
Expand Down

0 comments on commit 7cea4cb

Please sign in to comment.