Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream fixes #1417

Merged
merged 15 commits into from
Nov 6, 2024
75 changes: 64 additions & 11 deletions packages/restapi/src/lib/pushstream/PushStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,12 @@ export class PushStream extends EventEmitter {
listen: STREAM[],
newOptions: PushStreamInitializeProps
): Promise<void> {
this.uid = uuidv4();
this.listen = listen;
this.options = { ...this.options, ...newOptions };
await this.disconnect();
await this.connect();
await this.connect(true);
}

public async connect(): Promise<void> {
public async connect(reinit = false): Promise<void> {
return new Promise<void>((resolve, reject) => {
(async () => {
const shouldInitializeChatSocket =
Expand All @@ -140,10 +138,25 @@ export class PushStream extends EventEmitter {
this.listen.includes(STREAM.NOTIF_OPS) ||
this.listen.includes(STREAM.VIDEO);

let isChatSocketConnected = false;
let isNotifSocketConnected = false;
console.log('RestAPI::PushStream::connect - Initialization flags:', {
shouldInitializeChatSocket,
shouldInitializeNotifSocket,
});

let isChatSocketConnected = reinit ? this.chatSocketConnected : false;
let isNotifSocketConnected = reinit ? this.notifSocketConnected : false;
// Function to check and emit the STREAM.CONNECT event
const checkAndEmitConnectEvent = () => {
console.log(
'RestAPI::PushStream::connect - Checking conditions for STREAM.CONNECT event.',
{
shouldInitializeChatSocket,
isChatSocketConnected,
shouldInitializeNotifSocket,
isNotifSocketConnected,
}
);

if (
((shouldInitializeChatSocket && isChatSocketConnected) ||
!shouldInitializeChatSocket) &&
Expand All @@ -155,6 +168,8 @@ export class PushStream extends EventEmitter {
'RestAPI::PushStream::connect - Emitted STREAM.CONNECT'
);
resolve();
} else {
console.log('RestAPI::PushStream:: not emitting');
}
};

Expand All @@ -171,7 +186,20 @@ export class PushStream extends EventEmitter {
if (socketType === 'chat') {
isChatSocketConnected = false;
this.chatSocketConnected = false;

console.log(
'RestAPI::PushStream::handleSocketDisconnection - Chat socket disconnected. Decrementing chatSocketCount.',
'Previous chatSocketCount:',
this.chatSocketCount
);

this.chatSocketCount--;

console.log(
'RestAPI::PushStream::handleSocketDisconnection - New chatSocketCount:',
this.chatSocketCount
);

if (isNotifSocketConnected) {
if (
this.pushNotificationSocket &&
Expand All @@ -183,7 +211,6 @@ export class PushStream extends EventEmitter {
this.pushNotificationSocket.disconnect();
}
} else {
// Emit STREAM.DISCONNECT only if the notification socket was already disconnected
this.emit(STREAM.DISCONNECT);
console.log(
'RestAPI::PushStream::handleSocketDisconnection - Emitted STREAM.DISCONNECT for chat.'
Expand All @@ -192,7 +219,20 @@ export class PushStream extends EventEmitter {
} else if (socketType === 'notif') {
isNotifSocketConnected = false;
this.notifSocketConnected = false;

console.log(
'RestAPI::PushStream::handleSocketDisconnection - Notification socket disconnected. Decrementing notifSocketCount.',
'Previous notifSocketCount:',
this.notifSocketCount
);

this.notifSocketCount--;

console.log(
'RestAPI::PushStream::handleSocketDisconnection - New notifSocketCount:',
this.notifSocketCount
);

if (isChatSocketConnected) {
if (this.pushChatSocket && this.pushChatSocket.connected) {
console.log(
Expand All @@ -201,7 +241,6 @@ export class PushStream extends EventEmitter {
this.pushChatSocket.disconnect();
}
} else {
// Emit STREAM.DISCONNECT only if the chat socket was already disconnected
this.emit(STREAM.DISCONNECT);
console.log(
'RestAPI::PushStream::handleSocketDisconnection - Emitted STREAM.DISCONNECT for notification.'
Expand Down Expand Up @@ -277,7 +316,6 @@ export class PushStream extends EventEmitter {
console.log(
'RestAPI::PushStream::NotifSocket::Reconnect - Attempting to reconnect push notification socket...'
);
this.notifSocketCount++;
this.pushNotificationSocket.connect(); // Assuming connect() is the method to re-establish connection
} else {
// If pushNotificationSocket is already connected
Expand All @@ -295,6 +333,8 @@ export class PushStream extends EventEmitter {
};

if (this.pushChatSocket) {
checkAndEmitConnectEvent();
this.pushChatSocket.off(EVENTS.CONNECT);
this.pushChatSocket.on(EVENTS.CONNECT, async () => {
isChatSocketConnected = true;
this.chatSocketCount++;
Expand All @@ -305,10 +345,15 @@ export class PushStream extends EventEmitter {
);
});

this.pushChatSocket.off(EVENTS.DISCONNECT);
this.pushChatSocket.on(EVENTS.DISCONNECT, async () => {
console.log(
'RestAPI::PushStream::ChatSocket::Disconnect - Chat socket disconnected.'
);
await handleSocketDisconnection('chat');
});

this.pushChatSocket.off(EVENTS.CHAT_GROUPS);
this.pushChatSocket.on(EVENTS.CHAT_GROUPS, (data: any) => {
try {
const modifiedData = DataModifier.handleChatGroupEvent(
Expand Down Expand Up @@ -347,6 +392,7 @@ export class PushStream extends EventEmitter {
}
});

this.pushChatSocket.off(EVENTS.CHAT_RECEIVED_MESSAGE);
this.pushChatSocket.on(
EVENTS.CHAT_RECEIVED_MESSAGE,
async (data: any) => {
Expand Down Expand Up @@ -386,6 +432,7 @@ export class PushStream extends EventEmitter {
}
);

this.pushChatSocket.off('SPACES');
this.pushChatSocket.on('SPACES', (data: any) => {
try {
const modifiedData = DataModifier.handleSpaceEvent(
Expand Down Expand Up @@ -426,6 +473,7 @@ export class PushStream extends EventEmitter {
}
});

this.pushChatSocket.off('SPACES_MESSAGES');
this.pushChatSocket.on('SPACES_MESSAGES', (data: any) => {
try {
const modifiedData = DataModifier.handleSpaceEvent(
Expand Down Expand Up @@ -455,6 +503,8 @@ export class PushStream extends EventEmitter {
}

if (this.pushNotificationSocket) {
checkAndEmitConnectEvent();
this.pushNotificationSocket.off(EVENTS.CONNECT);
this.pushNotificationSocket.on(EVENTS.CONNECT, async () => {
console.log(
`RestAPI::PushStream::NotifSocket::Connect - Notification Socket Connected (ID: ${this.pushNotificationSocket.id})`
Expand All @@ -465,13 +515,15 @@ export class PushStream extends EventEmitter {
checkAndEmitConnectEvent();
});

this.pushNotificationSocket.off(EVENTS.DISCONNECT);
this.pushNotificationSocket.on(EVENTS.DISCONNECT, async () => {
console.log(
'RestAPI::PushStream::NotifSocket::Disconnect - Notification socket disconnected.'
);
await handleSocketDisconnection('notif');
});

this.pushNotificationSocket.off(EVENTS.USER_FEEDS);
this.pushNotificationSocket.on(EVENTS.USER_FEEDS, (data: any) => {
try {
if (
Expand Down Expand Up @@ -514,6 +566,7 @@ export class PushStream extends EventEmitter {
}
});

this.pushNotificationSocket.off(EVENTS.USER_SPAM_FEEDS);
this.pushNotificationSocket.on(
EVENTS.USER_SPAM_FEEDS,
(data: any) => {
Expand Down Expand Up @@ -578,10 +631,10 @@ export class PushStream extends EventEmitter {
}
}

public info() {
public info(): { options: PushStreamInitializeProps; listen: STREAM[] } {
return {
options: this.options,
listen: this.listen,
listen: this.listen as STREAM[],
};
}

Expand Down
Loading
Loading