Skip to content

Commit

Permalink
Merge branch 'dev-be' into feature/#332
Browse files Browse the repository at this point in the history
  • Loading branch information
xjfcnfw3 committed Dec 3, 2024
2 parents c74e183 + d98bf6c commit 3a6d7ee
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 118 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json

# remote
.remote

# .zip, .mst
*.zip
*.mst
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class OpenapiLiveData {
stockLiveData.stock = { id: message.STOCK_ID } as Stock;
stockLiveData.currentPrice = parseFloat(message.STCK_PRPR);
stockLiveData.changeRate = parseFloat(message.PRDY_CTRT);
stockLiveData.volume = parseInt(message.CNTG_VOL);
stockLiveData.volume = parseInt(message.ACML_VOL);
stockLiveData.high = parseFloat(message.STCK_HGPR);
stockLiveData.low = parseFloat(message.STCK_LWPR);
stockLiveData.open = parseFloat(message.STCK_OPRC);
Expand Down
101 changes: 50 additions & 51 deletions packages/backend/src/scraper/openapi/api/openapiPeriodData.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,41 +125,6 @@ export class OpenapiPeriodData {
};
}

/* eslint-disable-next-line max-lines-per-function */
private getLiveDataSaveUntilEndCallback(
stockId: string,
period: Period,
end: string,
) {
/* eslint-disable-next-line max-lines-per-function */
return async (data: Json) => {
if (!data.output2 || !Array.isArray(data.output2)) return;
// 이거 빈값들어오는 케이스 있음(빈값 필터링 안하면 요청이 매우 많아짐)
data.output2 = data.output2.filter(
(data) => Object.keys(data).length !== 0,
);
if (data.output2.length === 0) return;
await this.saveChartData(period, stockId, data.output2 as ChartData[]);
const { endDate, startDate } = this.updateDates(end, period);
const query = this.getItemChartPriceQuery(
stockId,
startDate,
endDate,
period,
);
this.openApiQueue.enqueue({
url: this.url,
query,
trId: TR_IDS.ITEM_CHART_PRICE,
callback: this.getLiveDataSaveUntilEndCallback(
stockId,
period,
endDate,
),
});
};
}

private async getChartData(chunk: Stock[], period: Period) {
for (const stock of chunk) {
await this.processStockData(stock, period);
Expand Down Expand Up @@ -190,8 +155,7 @@ export class OpenapiPeriodData {
}

private async existsChartData(stock: StockData, entity: typeof StockData) {
const manager = this.datasource.manager;
return await manager.findOne(entity, {
return this.datasource.manager.exists(entity, {
where: {
stock: { id: stock.stock.id },
startTime: stock.startTime,
Expand Down Expand Up @@ -236,6 +200,55 @@ export class OpenapiPeriodData {
}
}

private async saveChartData(
period: Period,
stockId: string,
data: ChartData[],
) {
for (const item of data) {
if (!isChartData(item)) {
continue;
}
const stockPeriod = this.convertObjectToStockData(item, stockId);
await this.insertChartData(stockPeriod, period);
}
}

/* eslint-disable-next-line max-lines-per-function */
private getLiveDataSaveUntilEndCallback(
stockId: string,
period: Period,
end: string,
) {
/* eslint-disable-next-line max-lines-per-function */
return async (data: Json) => {
if (!data.output2 || !Array.isArray(data.output2)) return;
// 이거 빈값들어오는 케이스 있음(빈값 필터링 안하면 요청이 매우 많아짐)
data.output2 = data.output2.filter(
(data) => Object.keys(data).length !== 0,
);
if (data.output2.length === 0) return;
await this.saveChartData(period, stockId, data.output2 as ChartData[]);
const { endDate, startDate } = this.updateDates(end, period);
const query = this.getItemChartPriceQuery(
stockId,
startDate,
endDate,
period,
);
this.openApiQueue.enqueue({
url: this.url,
query,
trId: TR_IDS.ITEM_CHART_PRICE,
callback: this.getLiveDataSaveUntilEndCallback(
stockId,
period,
endDate,
),
});
};
}

private convertObjectToStockData(item: ChartData, stockId: string) {
const stockPeriod = new StockData();
stockPeriod.stock = { id: stockId } as Stock;
Expand All @@ -253,20 +266,6 @@ export class OpenapiPeriodData {
return stockPeriod;
}

private async saveChartData(
period: Period,
stockId: string,
data: ChartData[],
) {
for (const item of data) {
if (!isChartData(item)) {
continue;
}
const stockPeriod = this.convertObjectToStockData(item, stockId);
await this.insertChartData(stockPeriod, period);
}
}

private getItemChartPriceQuery(
stockId: string,
startDate: string,
Expand Down
4 changes: 2 additions & 2 deletions packages/backend/src/scraper/openapi/api/openapiToken.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ export class OpenapiTokenApi {
private isTokenExpired(startDate?: Date) {
if (!startDate) return true;
const now = new Date();
//실제 만료 시간은 24시간이지만, 문제가 발생할 여지를 줄이기 위해 12시간으로 설정
const baseTimeToMilliSec = 12 * 60 * 60 * 1000;
//실제 만료 시간은 24시간이지만, 문제가 발생할 여지를 줄이기 위해 6시간으로 설정
const baseTimeToMilliSec = 6 * 60 * 60 * 1000;
const timeDiff = now.getTime() - startDate.getTime();

return timeDiff >= baseTimeToMilliSec;
Expand Down
14 changes: 9 additions & 5 deletions packages/backend/src/scraper/openapi/liveData.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class LiveData {
stockId,
);
if (stockLiveData) {
this.openapiLiveData.saveLiveData(stockLiveData);
await this.openapiLiveData.saveLiveData(stockLiveData);
}
} catch (error) {
this.logger.warn(`Subscribe error in open api : ${error}`);
Expand All @@ -60,8 +60,10 @@ export class LiveData {
}

async subscribe(stockId: string) {
if (stockId === null || stockId === undefined) {
return;
}
await this.openapiSubscribe(stockId);

if (!this.isCloseTime(new Date(), this.startTime, this.endTime)) {
for (const [idx, size] of this.configSubscribeSize.entries()) {
if (size >= this.SOCKET_LIMITS) continue;
Expand All @@ -85,7 +87,7 @@ export class LiveData {
const idx = this.subscribeStocks.get(stockId);
this.subscribeStocks.delete(stockId);

if (idx) {
if (idx !== undefined) {
this.configSubscribeSize[idx]--;
} else {
this.logger.warn(`Websocket error : ${stockId} has invalid idx`);
Expand All @@ -98,7 +100,7 @@ export class LiveData {
'2',
);

this.websocketClient[idx].discribe(message);
this.websocketClient[idx].unsubscribe(message);
}
}

Expand All @@ -122,11 +124,13 @@ export class LiveData {
if (message.header) {
if (message.header.tr_id === 'PINGPONG') {
client.pong(data);
} else {
this.logger.info(JSON.stringify(message));
}
return;
}
const liveData = this.openapiLiveData.convertLiveData(message);
await this.openapiLiveData.saveLiveData(liveData[0]);
await this.openapiLiveData.saveLiveData(liveData[0])
} catch (error) {
this.logger.warn(error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { OpenapiMinuteData } from './api/openapiMinuteData.api';
import { OpenapiPeriodData } from './api/openapiPeriodData.api';
import { OpenapiTokenApi } from './api/openapiToken.api';
import { LiveData } from './liveData.service';
import { OpenapiScraperService } from './openapi-scraper.service';
import { WebsocketClient } from './websocket/websocketClient.websocket';
import { OpenapiFluctuationData } from '@/scraper/openapi/api/openapiFluctuationData.api';
import { OpenapiRankViewApi } from '@/scraper/openapi/api/openapiRankView.api';
Expand Down Expand Up @@ -48,7 +47,6 @@ import { StockLiveData } from '@/stock/domain/stockLiveData.entity';
OpenapiPeriodData,
OpenapiMinuteData,
OpenapiDetailData,
OpenapiScraperService,
OpenapiFluctuationData,
OpenapiIndex,
OpenapiRankViewApi,
Expand Down
15 changes: 0 additions & 15 deletions packages/backend/src/scraper/openapi/openapi-scraper.service.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Inject, Injectable } from '@nestjs/common';
import { Logger } from 'winston';
import { RawData, WebSocket } from 'ws';
Expand All @@ -7,24 +6,23 @@ import { RawData, WebSocket } from 'ws';
export class WebsocketClient {
static url = process.env.WS_URL ?? 'ws://ops.koreainvestment.com:21000';
private client: WebSocket;
//현재 factory 패턴을 이용해 할당하면 socket이 열리기 전에 message가 가는 문제가 있음.
// 소켓이 할당되기 전에(client에 소켓이 없을 때) message를 보내려 시도함.
private messageQueue: string[] = [];

constructor(@Inject('winston') private readonly logger: Logger) {
this.client = new WebSocket(WebsocketClient.url);
this.initOpen(() => this.flushQueue());
this.initError((error) => this.logger.error('WebSocket error', error));
}

static websocketFactory(logger: Logger) {
const websocket = new WebsocketClient(logger);

return websocket;
return new WebsocketClient(logger);
}

subscribe(message: string) {
this.sendMessage(message);
}

discribe(message: string) {
unsubscribe(message: string) {
this.sendMessage(message);
}

Expand All @@ -50,22 +48,28 @@ export class WebsocketClient {
initCloseCallback: () => void,
initErrorCallback: (error: unknown) => void,
) {
this.initOpen(initOpenCallback(this.sendMessage));
this.initOpen(initOpenCallback(this.sendMessage.bind(this)));
this.initMessage(initMessageCallback(this.client));
this.initDisconnect(initCloseCallback);
this.initError(initErrorCallback);
}

private sendMessage(message: string) {
if (!this.client || !this.client.readyState) {
this.logger.warn('WebSocket is not open. Message not sent. ');
return;
}
if (this.client.readyState === WebSocket.OPEN) {
this.client.send(message);
this.logger.info(`Sent message: ${message}`);
} else {
this.logger.warn('WebSocket is not open. Message not sent. ');
this.logger.warn('WebSocket not open. Queueing message.');
this.messageQueue.push(message); // 큐에 메시지를 추가
}
}

private flushQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
if (message) {
this.sendMessage(message);
}
}
}
}
Loading

0 comments on commit 3a6d7ee

Please sign in to comment.