Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [BE] redis 채널 구독 방식 수정 #233

Merged
merged 1 commit into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions backEnd/chat/src/chat/chat.gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,6 @@ describe('ChatGateway', () => {
});

describe('JOIN_ROOM', () => {
it('클라이언트가 방에 참여하면 방마다 클라이언트 수를 기록한다.', async () => {
//GIVEN
const mockSockets = [
new MockSocket('1'),
new MockSocket('2'),
new MockSocket('3'),
new MockSocket('4'),
];

//WHEN
mockSockets
.slice(0, 3)
.forEach((socket) =>
gateway.handleJoin({ room: 'ABC' }, socket as unknown as Socket),
);
gateway.handleJoin({ room: 'DEF' }, mockSockets[3] as unknown as Socket);

//THEN
//HAPPY PATH
expect(gateway.getRoom('ABC')).toBe(true);
expect(gateway.getRoom('DEF')).toBe(true);
//EDGE CASE
expect(gateway.getRoom('GHI')).toBe(false);
});

it('클라이언트가 방에 참여하면 방이 생성된다.', async () => {
//GIVEN
const mockSockets = [
Expand Down
38 changes: 18 additions & 20 deletions backEnd/chat/src/chat/chat.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
server: Server;

private readonly logger = new Logger();
private rooms: Map<string, boolean> = new Map();
private roomToCount: Map<string, number> = new Map();
private instanceId = process.env.NODE_APP_INSTANCE || os.hostname();
private subscriberClient: Redis;
Expand All @@ -40,6 +39,17 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
) {
this.subscriberClient = client.duplicate();
this.publisherClient = client.duplicate();

this.subscriberClient.subscribe(SOCKET.REDIS_CHAT_CHANEL);

this.subscriberClient.on('message', this.handleChatMessage.bind(this));
}

private handleChatMessage(channel: string, message: string) {
if (channel === SOCKET.REDIS_CHAT_CHANEL) {
const { room, ...messageData } = JSON.parse(message);
this.server.to(room).emit(SOCKET_EVENT.NEW_MESSAGE, messageData);
}
}

handleConnection(socket: Socket) {
Expand All @@ -62,19 +72,8 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {

socket.join(room);

const isRoom = this.rooms.get(room);
const count = this.roomToCount.get(room) || 0;
this.roomToCount.set(room, count + 1);

if (!isRoom) {
this.subscriberClient.subscribe(room);
this.subscriberClient.on('message', (channel, message) => {
if (channel === room) {
this.server.to(room).emit(SOCKET_EVENT.NEW_MESSAGE, message);
}
});
this.rooms.set(room, true);
}
}

@SubscribeMessage(SOCKET_EVENT.LEAVE_ROOM)
Expand All @@ -93,8 +92,7 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
this.roomToCount.set(room, count);

if (count === SOCKET.EMPTY_ROOM) {
this.subscriberClient.unsubscribe(room);
this.rooms.delete(room);
this.roomToCount.delete(room);
this.chatService.deleteByRoom(room);
}
}
Expand All @@ -111,6 +109,7 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
const { room, message, nickname, ai } = data;

const response = {
room: room,
message: message,
nickname: nickname,
socketId: socket.id,
Expand All @@ -120,7 +119,7 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
if (ai) {
try {
await this.publisherClient.publish(
room,
SOCKET.REDIS_CHAT_CHANEL,
JSON.stringify({ using: true }),
);

Expand All @@ -141,7 +140,10 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
}

try {
await this.publisherClient.publish(room, JSON.stringify(response));
await this.publisherClient.publish(
SOCKET.REDIS_CHAT_CHANEL,
JSON.stringify(response),
);
} catch (error) {
throw new WsException({
statusCode: ERRORS.FAILED_PUBLISHING.statusCode,
Expand All @@ -154,10 +156,6 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
return this.roomToCount.get(room) || 0;
}

public getRoom(room: string): boolean {
return this.rooms.get(room) || false;
}

public async useLLM(room: string, message: string, socketId: string) {
const url = this.configService.get<string>('LLM_URL');
const headers = {
Expand Down
1 change: 1 addition & 0 deletions backEnd/chat/src/commons/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const SOCKET = {
V1: 'single',
KAFKA: 'kafka',
EMPTY_ROOM: 0,
REDIS_CHAT_CHANEL: 'RedisChatting',
};

export const SOCKET_EVENT = {
Expand Down