Skip to content

[BE] SSE 에러

SeongHyeon edited this page Nov 14, 2024 · 1 revision

SSE 구독 초기 데이터

clinet가 SSE 구독을 시작했을 때 모든 코인의 정보가 바로 전송되지 않은 이슈가 발생했습니다.

문제점 1 - 거래가 활발하지 않은 코인들

현재 저희는 업비트와 WebSocket을 통해 전달 받은 데이터를 실시간으로 처리하고 있었습니다.

하지만 어떤 데이터가 올지 알 수 없었고 그에 따라 비트코인이 100번 들어올때, 이더리움은 10번이 들어올 수 도 있었습니다.

이로 인해 초기 구독 시, 모든 코인의 최신 데이터가 일관되게 전송되지 않는 문제가 발생하고 있습니다.

해결책

//sse.service.ts
initPriceStream(coins, dto: Function) {
		const events: MessageEvent[] = []; 
		if (coins && typeof coins === 'string') {
			coins = [coins];
		}
		coins.forEach(async (coin) => {
			while (this.coinLatestInfo.get(coin) === undefined) {
				await new Promise(resolve => setTimeout(resolve, 100));
			}
			const initData = this.coinLatestInfo.get(coin); 
			const setDto = dto(initData);
			const msgEvent = new MessageEvent('price-update', {
				data: JSON.stringify(setDto),
			}) as MessageEvent;
				
			events.push(msgEvent); 
		});
	  
		return events;
	  }
//upbit.controller.ts
@Sse('price-updates')
  priceUpdates(@Query('coins') coins:string[]): Observable<MessageEvent> {
    const initData = this.sseService.initPriceStream(coins, this.coinListService.coinAddNameAndUrl);
    return concat(initData,this.sseService.getPriceUpdatesStream(coins,this.coinListService.coinAddNameAndUrl));
  }

initPriceStream 메서드는 구독 요청된 코인 리스트를 받아, 각 코인의 최신 데이터가 수신될 때까지 대기한 후 초기값을 MessageEvent 객체로 생성하여 배열에 저장합니다. 이렇게 초기값이 준비된 상태에서 SSE 연결을 시작하고, 구독 중에는 실시간 데이터 업데이트를 수신할 수 있도록 설정했습니다. 이 방식으로 모든 구독 코인의 초기값이 구독 시작 시 전송될 수 있게 했습니다.

문제점 2 - 비동기 에러

coins.forEach(async (coin) => {
			while (this.coinLatestInfo.get(coin) === undefined) {
				//수정전
				setTimeout(()=>1000);
				//수정후
				await new Promise(resolve => setTimeout(resolve, 100));
			}
			const initData = this.coinLatestInfo.get(coin); 
			const setDto = dto(initData);
			const msgEvent = new MessageEvent('price-update', {
				data: JSON.stringify(setDto),
			}) as MessageEvent;
				
			events.push(msgEvent); 
		});

기존 코드에서는 while 문 내부에서 setTimeout을 사용하여 값을 기다렸지만, setTimeout이 비동기적으로 작동하면서 EventLoop에 쌓여 대기 시간이 보장되지 않는 문제가 있었습니다. 그 결과, 값이 수신되지 않았음에도 while 문이 계속 실행되며 무한 반복되는 상황이 발생했습니다.

이 문제를 해결하기 위해 await와 Promise를 사용하여 원하는 시간 동안 대기한 후 while 조건을 재검사하도록 수정했습니다.

GPT가 추천하는 추가 개선사항

// sse.service.ts
async initPriceStream(coins: string[], dto: Function): Promise<MessageEvent[]> {
    const events: MessageEvent[] = []; 
    if (typeof coins === 'string') {
        coins = [coins];
    }

    await Promise.all(coins.map(async (coin) => {
        let retryCount = 0;
        const maxRetries = 10;
        const retryDelay = 100;

        while (!this.coinLatestInfo.has(coin) && retryCount < maxRetries) {
            await new Promise(resolve => setTimeout(resolve, retryDelay));
            retryCount++;
        }

        const initData = this.coinLatestInfo.get(coin); 
        if (initData) {
            const setDto = dto(initData);
            events.push(new MessageEvent('price-update', {
                data: JSON.stringify(setDto),
            }));
        }
    }));

    return events;
}

// upbit.controller.ts
@Sse('price-updates')
priceUpdates(@Query('coins') coins: string[]): Observable<MessageEvent> {
    const initialEvents$ = from(this.sseService.initPriceStream(coins, this.coinListService.coinAddNameAndUrl));
    const priceUpdates$ = this.sseService.getPriceUpdatesStream(coins, this.coinListService.coinAddNameAndUrl);
    
    return concat(initialEvents$, priceUpdates$);
}

1. 비동기 동작 개선 - Promise.all 사용

현재 coins.forEach 내부에서 await를 사용하고 있어서 모든 코인의 초기 데이터를 개별적으로 처리하는 동안, 이벤트를 모아서 반환하기까지 시간이 걸릴 수 있습니다. Promise.all을 이용하여 병렬로 처리하면 성능이 조금 더 개선될 수 있습니다.

2. 초기값 대기 방식 개선 - 설정 가능한 TimeoutRetry

while문과 setTimeout으로 데이터를 기다리는 방식은 안정적이지만, 코인 데이터가 영원히 존재하지 않을 가능성에 대한 예외 처리가 필요할 수 있습니다. 예를 들어 retryCounttimeout을 설정하여, 특정 시간 이후에 초기값을 포기하도록 할 수 있습니다. 이는 구독에 필요한 최소한의 데이터를 보장하면서도 서비스가 무한 대기 상태에 빠지지 않도록 합니다.

3. 이벤트 전송 방식 - 초기값과 스트림을 더 명확하게 구분

현재 concat을 사용하여 초기 이벤트와 실시간 이벤트 스트림을 연결하는 방식도 좋지만, 이벤트가 복잡해질 경우 더 명확한 구분이 필요할 수 있습니다. 초기 이벤트와 실시간 스트림을 독립적으로 구성해 관리하면 유지보수에 유리할 수 있습니다.

웹 소켓, SSE 완전 분리

문제점 - 클라이언트 요청 증가로 인한 API 요청 제한 발생

저희는 항상 최신 코인 정보를 제공하기 위해 클라이언트 요청 시마다 업비트 API로부터 코인 리스트를 새로 수신하도록 설정했습니다. 그러나, 클라이언트 요청이 많아지면 업비트 API 요청이 과다하게 발생하여 요청 제한에 도달하는 문제가 발생했습니다. 처음에는 클라이언트 요청이 올 때마다 업비트 API에 최신 정보를 요청하는 것이 최선의 방법이라 생각했으나, 실시간 데이터를 다수의 클라이언트에 안정적으로 제공하기에는 부적절했습니다.

해결책 - 서버와 업비트 간 데이터 주기적 수신, 서버와 클라이언트 간 데이터 제공 분리

해결을 위해 서버와 업비트, 서버와 클라이언트 간의 관계를 분리했습니다. 서버는 정기적으로 업비트 API에 접근하여 최신 코인 정보를 수신하고 이를 자체 캐시에 저장해둡니다. 클라이언트가 요청을 보낼 때 서버는 업비트가 아닌 캐시된 데이터를 제공하므로 업비트 API 요청 횟수를 대폭 줄일 수 있습니다.

이 방식으로, 서버는 일정 간격으로만 업비트 API에 접근하게 되어 요청 제한을 피하고, 클라이언트에게는 안정적으로 최신 정보를 제공할 수 있습니다.

//coin-ticker-websocket.service.ts
@Injectable()
export class CoinTickerService implements OnModuleInit {
	private websocket: WebSocket;
	private sending: Boolean = false;
	private timeoutId: NodeJS.Timeout | null = null;
	private coinLatestInfo = new Map();
	constructor(
		private readonly coinListService: CoinListService,
		private readonly sseService: SseService,
	) {}

	onModuleInit() {
		this.websocket = new WebSocket(UPBIT_WEBSOCKET_URL);
		this.connectWebSocket();
	}

	connectWebSocket() {
		this.websocket.on('open', () => {
			try {
				console.log('CoinTickerWebSocket 연결이 열렸습니다.');
				this.sendWebSocket();
			} catch (error) {
				console.error('sendWebSocket 실행 중 오류 발생:', error);
			}
		});
		this.websocket.on('message', (data) => {
			try{
				const message = JSON.parse(data.toString());
				this.sseService.coinTickerData(message);
				this.sseService.setCoinLastestInfo(message);
			}catch(error){
				console.error('CoinTickerWebSocket 오류:', error);
			}
		});
		this.websocket.on('close', () => {
			try {
				console.log('CoinTickerWebSocket 연결이 닫혔습니다. 재연결 시도 중...');
				setTimeout(
					() => this.connectWebSocket(),
					UPBIT_WEBSOCKET_CONNECTION_TIME
				);
			} catch (error) {
				console.error('WebSocket 재연결 설정 중 오류 발생:', error);
			}
		});

		this.websocket.on('error', (error) => {
			console.error('CoinTickerWebSocket 오류:', error);
		});
	}
	async sendWebSocket() {
		if (this.sending) return;
		this.sending = true;
		try{
			const coin_list = this.coinListService.getCoinNameList();
			const subscribeMessage = JSON.stringify([
				{ ticket: 'test' },
				{ type: 'ticker', codes: coin_list },
			]);
			this.websocket.send(subscribeMessage);
		}catch(error){
			console.error('CoinTickerWebSocket 오류:', error);
		}finally{
			this.sending = false;
			if (this.timeoutId) clearTimeout(this.timeoutId);
			this.timeoutId = setTimeout(() => this.sendWebSocket(), UPBIT_UPDATED_COIN_INFO_TIME);
		}
	}
}
//upbit.controller.ts
@Sse('price-updates')
  priceUpdates(@Query('coins') coins:string[]): Observable<MessageEvent> {
    const initData = this.sseService.initPriceStream(coins, this.coinListService.coinAddNameAndUrl);
    return concat(initData,this.sseService.getPriceUpdatesStream(coins,this.coinListService.coinAddNameAndUrl));
  }

업비트 가격 정보 분리

[웹 소켓, SSE 완전 분리](https://www.notion.so/SSE-13c3b8b7e1a0804da661d7d5a13b18e6?pvs=21)

이 문제를 해결하고 나니 관련된 문제점이 보였습니다.

문제점 - 현재가, 호가창에서 사용되는 최신 코인 리스트의 중복 요청

현재 저희 시스템에서는 현재가호가창에서 사용되는 최신 코인 리스트를 각각 관리하고 있습니다. 이 두 로직에서 매번 업비트 API로부터 최신 코인 리스트를 요청하고 있어, 불필요한 중복 요청이 발생하고 있었습니다.

이를 해결하기 위해, 코인 리스트를 관리하는 모듈에서 최신 코인 리스트를 한 번만 요청하고, 이를 모든 관련 로직에서 공유하는 방식으로 개선할 필요가 있었습니다. 이렇게 함으로써, 매 순간마다 중복된 요청을 줄일 수 있습니다.

해결책 - 최신 코인 리스트 요청 통합 및 관리

최신 코인 리스트는 서버에서 한 번만 요청하여 내부 캐시나 공유 가능한 데이터 구조에 저장하고, 현재가호가창 로직에서 이 데이터를 참조하도록 수정했습니다. 이로 인해, 동일한 데이터를 중복해서 요청할 필요가 없게 되어 요청 횟수를 줄일 수 있었습니다.

이 방식으로 시스템의 효율성을 높이고, 불필요한 API 요청을 줄이는 동시에 코인 리스트 관리를 중앙화하여 유지보수의 용이성도 향상되었습니다.

Clone this wiki locally