From cf37daa870c3afa1cef7d00ada67025c66a65aa2 Mon Sep 17 00:00:00 2001 From: Meriem-BM Date: Sun, 25 Aug 2024 13:11:07 +0100 Subject: [PATCH 1/2] fix: change from WS to SSE --- src/resolvers/draftDonationResolver.ts | 10 ++-- src/server/bootstrap.ts | 15 +++--- .../cronJobs/checkQRTransactionJob.ts | 10 ++-- src/services/sse/sse.ts | 45 ++++++++++++++++++ src/services/ws/webSocketServer.ts | 46 ------------------- 5 files changed, 66 insertions(+), 60 deletions(-) create mode 100644 src/services/sse/sse.ts delete mode 100644 src/services/ws/webSocketServer.ts diff --git a/src/resolvers/draftDonationResolver.ts b/src/resolvers/draftDonationResolver.ts index 9ccc85f22..8df3f9dac 100644 --- a/src/resolvers/draftDonationResolver.ts +++ b/src/resolvers/draftDonationResolver.ts @@ -27,6 +27,7 @@ import { import { RecurringDonation } from '../entities/recurringDonation'; import { checkTransactions } from '../services/cronJobs/checkQRTransactionJob'; import { findProjectById } from '../repositories/projectRepository'; +import { notifyDonationFailed } from '../services/sse/sse'; const draftDonationEnabled = process.env.ENABLE_DRAFT_DONATION === 'true'; const draftRecurringDonationEnabled = @@ -419,9 +420,12 @@ export class DraftDonationResolver { ); // Notify clients of new donation - (global as any).notifyDraftDonationFailed({ - draftDonationId: id, - expiresAt: draftDonation.expiresAt, + notifyDonationFailed({ + type: 'draft-donation-failed', + data: { + draftDonationId: id, + expiresAt: draftDonation.expiresAt, + }, }); return true; diff --git a/src/server/bootstrap.ts b/src/server/bootstrap.ts index 21a1ca581..3ca543943 100644 --- a/src/server/bootstrap.ts +++ b/src/server/bootstrap.ts @@ -7,7 +7,7 @@ import { ApolloServer } from '@apollo/server'; import { expressMiddleware } from '@apollo/server/express4'; import { ApolloServerPluginSchemaReporting } from '@apollo/server/plugin/schemaReporting'; import { ApolloServerPluginLandingPageGraphQLPlayground } from '@apollo/server-plugin-landing-page-graphql-playground'; -import express, { json, Request } from 'express'; +import express, { json, Request, Response } from 'express'; import { Container } from 'typedi'; import { Resource } from '@adminjs/typeorm'; import { validate } from 'class-validator'; @@ -68,7 +68,7 @@ import { runDraftDonationMatchWorkerJob } from '../services/cronJobs/draftDonati import { runCheckUserSuperTokenBalancesJob } from '../services/cronJobs/checkUserSuperTokenBalancesJob'; import { runCheckPendingRecurringDonationsCronJob } from '../services/cronJobs/syncRecurringDonationsWithNetwork'; import { runCheckQRTransactionJob } from '../services/cronJobs/checkQRTransactionJob'; -import { startWebSocketServer } from '../services/ws/webSocketServer'; +import { addClient } from '../services/sse/sse'; Resource.validate = validate; @@ -290,13 +290,12 @@ export async function bootstrap() { app.post('/fiat_webhook', onramperWebhookHandler); app.post('/transak_webhook', webhookHandler); - const httpServer = http.createServer(app); + // Route to handle SSE connections + app.get('/events', (_req: Request, res: Response) => { + addClient(res); + }); - // Start WebSocket server - const { notifyDonationAdded, notifyDraftDonationFailed } = - startWebSocketServer(httpServer); - (global as any).notifyDonationAdded = notifyDonationAdded; - (global as any).notifyDraftDonationFailed = notifyDraftDonationFailed; + const httpServer = http.createServer(app); await new Promise((resolve, reject) => { httpServer diff --git a/src/services/cronJobs/checkQRTransactionJob.ts b/src/services/cronJobs/checkQRTransactionJob.ts index 6b465a4a0..fa0ea2730 100644 --- a/src/services/cronJobs/checkQRTransactionJob.ts +++ b/src/services/cronJobs/checkQRTransactionJob.ts @@ -15,6 +15,7 @@ import { findUserById } from '../../repositories/userRepository'; import { relatedActiveQfRoundForProject } from '../qfRoundService'; import { QfRound } from '../../entities/qfRound'; import { syncDonationStatusWithBlockchainNetwork } from '../donationService'; +import { notifyClients } from '../sse/sse'; const STELLAR_HORIZON_API = (config.get('STELLAR_HORIZON_API_URL') as string) || @@ -182,9 +183,12 @@ export async function checkTransactions( }); // Notify clients of new donation - (global as any).notifyDonationAdded({ - donationId: returnedDonation.id, - draftDonationId: donation.id, + notifyClients({ + type: 'new-donation', + data: { + donationId: returnedDonation.id, + draftDonationId: donation.id, + }, }); return; diff --git a/src/services/sse/sse.ts b/src/services/sse/sse.ts new file mode 100644 index 000000000..bc1d1a586 --- /dev/null +++ b/src/services/sse/sse.ts @@ -0,0 +1,45 @@ +import { Response } from 'express'; + +let clients: Response[] = []; +type TNewDonation = { + type: 'new-donation'; + data: { + donationId: number; + draftDonationId: number; + }; +}; + +type TDraftDonationFailed = { + type: 'draft-donation-failed'; + data: { + draftDonationId: number; + expiresAt?: Date; + }; +}; + +// Add a new client to the SSE stream +export function addClient(res: Response) { + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + + res.flushHeaders(); + + clients.push(res); + + // Remove the client on disconnect + res.on('close', () => { + clients = clients.filter(client => client !== res); + }); +} + +// Notify all connected clients about a new donation +export function notifyClients(data: TNewDonation) { + clients.forEach(client => client.write(`data: ${JSON.stringify(data)}\n\n`)); +} + +// Notify all connected clients about a failed donation +export function notifyDonationFailed(data: TDraftDonationFailed) { + clients.forEach(client => client.write(`data: ${JSON.stringify(data)}\n\n`)); +} diff --git a/src/services/ws/webSocketServer.ts b/src/services/ws/webSocketServer.ts deleted file mode 100644 index 16568a2b2..000000000 --- a/src/services/ws/webSocketServer.ts +++ /dev/null @@ -1,46 +0,0 @@ -import WebSocket from 'ws'; -import { logger } from '../../utils/logger'; - -export function startWebSocketServer(server) { - const wss = new WebSocket.Server({ server }); - - // Handle WebSocket connections - wss.on('connection', ws => { - ws.on('message', message => { - logger.info(`Received message: ${message}`); - }); - }); - - // Broadcast a message to all connected clients - function broadcastMessage(message) { - wss.clients.forEach(client => { - if (client.readyState === WebSocket.OPEN) { - client.send(message); - } - }); - } - - // Exported function to be called from cron job - function notifyDonationAdded(donation) { - const message = JSON.stringify({ - type: 'new-donation', - data: donation, - }); - broadcastMessage(message); - } - - // Exported function to to be called when marking draft donation as failed - function notifyDraftDonationFailed(donation) { - const message = JSON.stringify({ - type: 'draft-donation-failed', - data: donation, - }); - broadcastMessage(message); - } - - // Export the notifyDonationAdded function so it can be used externally - return { - notifyDonationAdded, - notifyDraftDonationFailed, - }; -} From a14ccba4fe5b80a009c0aa3e48c3e634c41fc5e9 Mon Sep 17 00:00:00 2001 From: Meriem-BM Date: Sun, 25 Aug 2024 13:28:34 +0100 Subject: [PATCH 2/2] fix: remove ws dependency --- package-lock.json | 3 +-- package.json | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index d615618d4..bb84a3cbf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -75,8 +75,7 @@ "twitter-api-sdk": "^1.0.9", "type-graphql": "2.0.0-beta.1", "typedi": "0.8.0", - "typeorm": "0.3.20", - "ws": "^8.18.0" + "typeorm": "0.3.20" }, "devDependencies": { "@types/axios": "^0.14.0", diff --git a/package.json b/package.json index 8366ab74b..edcb23176 100644 --- a/package.json +++ b/package.json @@ -69,8 +69,7 @@ "twitter-api-sdk": "^1.0.9", "type-graphql": "2.0.0-beta.1", "typedi": "0.8.0", - "typeorm": "0.3.20", - "ws": "^8.18.0" + "typeorm": "0.3.20" }, "lint-staged": { "*.ts": [