Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug/#315 unsubscribe, subscribe에 queue 적용 #337

Merged
merged 19 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/backend/src/scraper/openapi/api/openapiToken.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ export class OpenapiTokenApi {
private isTokenExpired(startDate?: Date) {
if (!startDate) return true;
const now = new Date();
//실제 만료 시간은 24시간이지만, 문제가 발생할 여지를 줄이기 위해 12시간으로 설정
const baseTimeToMilliSec = 12 * 60 * 60 * 1000;
//실제 만료 시간은 24시간이지만, 문제가 발생할 여지를 줄이기 위해 6시간으로 설정
const baseTimeToMilliSec = 6 * 60 * 60 * 1000;
const timeDiff = now.getTime() - startDate.getTime();

return timeDiff >= baseTimeToMilliSec;
Expand Down
13 changes: 9 additions & 4 deletions packages/backend/src/scraper/openapi/liveData.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class LiveData {
stockId,
);
if (stockLiveData) {
this.openapiLiveData.saveLiveData(stockLiveData);
await this.openapiLiveData.saveLiveData(stockLiveData);
}
} catch (error) {
this.logger.warn(`Subscribe error in open api : ${error}`);
Expand All @@ -60,6 +60,9 @@ export class LiveData {
}

async subscribe(stockId: string) {
if (stockId === null || stockId === undefined) {
return;
}
await this.openapiSubscribe(stockId);

if (!this.isCloseTime(new Date(), this.startTime, this.endTime)) {
Expand All @@ -85,7 +88,7 @@ export class LiveData {
const idx = this.subscribeStocks.get(stockId);
this.subscribeStocks.delete(stockId);

if (idx) {
if (idx !== undefined) {
this.configSubscribeSize[idx]--;
} else {
this.logger.warn(`Websocket error : ${stockId} has invalid idx`);
Expand All @@ -98,7 +101,7 @@ export class LiveData {
'2',
);

this.websocketClient[idx].discribe(message);
this.websocketClient[idx].unsubscribe(message);
}
}

Expand All @@ -122,11 +125,13 @@ export class LiveData {
if (message.header) {
if (message.header.tr_id === 'PINGPONG') {
client.pong(data);
} else {
this.logger.info(JSON.stringify(message));
}
return;
}
const liveData = this.openapiLiveData.convertLiveData(message);
await this.openapiLiveData.saveLiveData(liveData[0]);
await this.openapiLiveData.saveLiveData(liveData[0])
} catch (error) {
this.logger.warn(error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { OpenapiMinuteData } from './api/openapiMinuteData.api';
import { OpenapiPeriodData } from './api/openapiPeriodData.api';
import { OpenapiTokenApi } from './api/openapiToken.api';
import { LiveData } from './liveData.service';
import { OpenapiScraperService } from './openapi-scraper.service';
import { WebsocketClient } from './websocket/websocketClient.websocket';
import { OpenapiFluctuationData } from '@/scraper/openapi/api/openapiFluctuationData.api';
import { OpenapiRankViewApi } from '@/scraper/openapi/api/openapiRankView.api';
Expand Down Expand Up @@ -49,7 +48,6 @@ import { StockLiveData } from '@/stock/domain/stockLiveData.entity';
OpenapiPeriodData,
OpenapiMinuteData,
OpenapiDetailData,
OpenapiScraperService,
OpenapiFluctuationData,
OpenapiIndex,
OpenapiRankViewApi,
Expand Down
15 changes: 0 additions & 15 deletions packages/backend/src/scraper/openapi/openapi-scraper.service.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Inject, Injectable } from '@nestjs/common';
import { Logger } from 'winston';
import { RawData, WebSocket } from 'ws';
Expand All @@ -7,24 +6,23 @@ import { RawData, WebSocket } from 'ws';
export class WebsocketClient {
static url = process.env.WS_URL ?? 'ws://ops.koreainvestment.com:21000';
private client: WebSocket;
//현재 factory 패턴을 이용해 할당하면 socket이 열리기 전에 message가 가는 문제가 있음.
// 소켓이 할당되기 전에(client에 소켓이 없을 때) message를 보내려 시도함.
private messageQueue: string[] = [];

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shift하면 비용이 n이라 비효율적이라 큐를 쓰는게 좋긴하지만, JS에서는 직접 구현해야되는게 아쉽네요

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이건 너무 아쉽습니다. 큐 말고 더 좋은 방법이 있을까 해서 onModuleInit도 고려했는 데, websocket을 열 수 있는 개수를 런타임때야 알 수 있으니 어쩔 수 없었다고 생각합니다.

constructor(@Inject('winston') private readonly logger: Logger) {
this.client = new WebSocket(WebsocketClient.url);
this.initOpen(() => this.flushQueue());
this.initError((error) => this.logger.error('WebSocket error', error));
}

static websocketFactory(logger: Logger) {
const websocket = new WebsocketClient(logger);

return websocket;
return new WebsocketClient(logger);
}

subscribe(message: string) {
this.sendMessage(message);
}

discribe(message: string) {
unsubscribe(message: string) {
this.sendMessage(message);
}

Expand All @@ -50,22 +48,28 @@ export class WebsocketClient {
initCloseCallback: () => void,
initErrorCallback: (error: unknown) => void,
) {
this.initOpen(initOpenCallback(this.sendMessage));
this.initOpen(initOpenCallback(this.sendMessage.bind(this)));
this.initMessage(initMessageCallback(this.client));
this.initDisconnect(initCloseCallback);
this.initError(initErrorCallback);
}

private sendMessage(message: string) {
if (!this.client || !this.client.readyState) {
this.logger.warn('WebSocket is not open. Message not sent. ');
return;
}
if (this.client.readyState === WebSocket.OPEN) {
this.client.send(message);
this.logger.info(`Sent message: ${message}`);
} else {
this.logger.warn('WebSocket is not open. Message not sent. ');
this.logger.warn('WebSocket not open. Queueing message.');
this.messageQueue.push(message); // 큐에 메시지를 추가
}
}

private flushQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
if (message) {
this.sendMessage(message);
}
}
}
}
63 changes: 34 additions & 29 deletions packages/backend/src/stock/stock.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Inject, Injectable } from '@nestjs/common';
import {
ConnectedSocket,
MessageBody,
OnGatewayDisconnect,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
Expand All @@ -17,11 +18,13 @@ import { LiveData } from '@/scraper/openapi/liveData.service';
pingTimeout: 5000,
})
@Injectable()
export class StockGateway {
export class StockGateway implements OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private readonly mutex = new Mutex();

private readonly users: Map<string, string> = new Map();

constructor(
private readonly liveData: LiveData,
@Inject('winston') private readonly logger: Logger,
Expand All @@ -32,40 +35,42 @@ export class StockGateway {
@MessageBody() stockId: string,
@ConnectedSocket() client: Socket,
) {
client.join(stockId);

await this.mutex.runExclusive(async () => {
const connectedSockets = await this.server.to(stockId).fetchSockets();
try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try-catch로 묶으신 이유가 궁금합니다

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

현재 try-catch로 에러처리하는 부분이 따로 없고, 외부 유저와 connection하는 부분이다 보니 예기치 못한 에러가 발생할 수 있다고 생각했습니다

client.join(stockId);
this.users.set(client.id, stockId);

if (connectedSockets.length > 0 && !this.liveData.isSubscribe(stockId)) {
await this.liveData.subscribe(stockId);
this.logger.info(`${stockId} is subscribed`);
}
});
await this.mutex.runExclusive(async () => {
const connectedSockets = await this.server.to(stockId).fetchSockets();

client.on('disconnecting', () => {
client.rooms.delete(client.id);
const stocks = Array.from(client.rooms.values());
for (const stock of stocks) {
this.handleDisconnectStock(stock);
}
});
if (
connectedSockets.length > 0 &&
!this.liveData.isSubscribe(stockId)
) {
await this.liveData.subscribe(stockId);
this.logger.info(`${stockId} is subscribed`);
}
});

client.emit('connectionSuccess', {
message: `Successfully connected to stock room: ${stockId}`,
stockId,
});
client.emit('connectionSuccess', {
message: `Successfully connected to stock room: ${stockId}`,
stockId,
});
} catch (e) {
const error = e as Error;
this.logger.warn(error.message);
client.emit('error', error.message);
client.disconnect();
}
}

async handleDisconnectStock(stockId: string) {
await this.mutex.runExclusive(async () => {
const connectedSockets = await this.server.in(stockId).fetchSockets();

if (connectedSockets.length === 0) {
async handleDisconnect(client: Socket) {
const stockId = this.users.get(client.id);
if (stockId) {
await this.mutex.runExclusive(async () => {
await this.liveData.unsubscribe(stockId);
this.logger.info(`${stockId} is unsubscribed`);
}
});
this.users.delete(client.id);
});
}
}

onUpdateStock(
Expand Down
Loading