Skip to content

Commit

Permalink
[lib] improve processOutboundP2PMessages in terms of performance
Browse files Browse the repository at this point in the history
Summary:
[ENG-9096](https://linear.app/comm/issue/ENG-9096/improve-processoutboundp2pmessages-in-terms-of-performance)

We need to do it per device, not per message to avoid creating multiple sessions in parallel.

Test Plan: Test DM ops (text messages, settings, reactions, etc.)

Reviewers: tomek, marcin

Reviewed By: tomek

Subscribers: ashoat

Differential Revision: https://phab.comm.dev/D13204
  • Loading branch information
xsanm committed Aug 29, 2024
1 parent 02f539b commit 65803b8
Showing 1 changed file with 41 additions and 33 deletions.
74 changes: 41 additions & 33 deletions lib/tunnelbroker/peer-to-peer-context.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
} from '../types/tunnelbroker/peer-to-peer-message-types.js';
import { getConfig } from '../utils/config.js';
import { getMessageForException } from '../utils/errors.js';
import { entries } from '../utils/objects.js';
import { olmSessionErrors } from '../utils/olm-utils.js';

type PeerToPeerContextType = {
Expand Down Expand Up @@ -128,53 +129,60 @@ async function processOutboundP2PMessages(
}
};

for (const peerDeviceID in devicesMap) {
for (const message of devicesMap[peerDeviceID]) {
if (message.status === outboundP2PMessageStatuses.persisted) {
try {
const result = await olmAPI.encryptAndPersist(
message.plaintext,
message.deviceID,
message.messageID,
);

const encryptedMessage: OutboundP2PMessage = {
...message,
ciphertext: JSON.stringify(result),
};
await sendMessageToPeer(encryptedMessage);
} catch (e) {
if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) {
console.log(`Error sending messages to peer ${peerDeviceID}`, e);
break;
}
const devicePromises = entries(devicesMap).map(
async ([peerDeviceID, deviceMessages]) => {
for (const message of deviceMessages) {
if (message.status === outboundP2PMessageStatuses.persisted) {
try {
await peerOlmSessionsCreator(message.userID, peerDeviceID);
const result = await olmAPI.encryptAndPersist(
message.plaintext,
message.deviceID,
message.messageID,
);

const encryptedMessage: OutboundP2PMessage = {
...message,
ciphertext: JSON.stringify(result),
};

await sendMessageToPeer(encryptedMessage);
} catch (err) {
console.log(`Error sending messages to peer ${peerDeviceID}`, err);
break;
} catch (e) {
if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) {
console.log(`Error sending messages to peer ${peerDeviceID}`, e);
break;
}
try {
await peerOlmSessionsCreator(message.userID, peerDeviceID);
const result = await olmAPI.encryptAndPersist(
message.plaintext,
message.deviceID,
message.messageID,
);
const encryptedMessage: OutboundP2PMessage = {
...message,
ciphertext: JSON.stringify(result),
};

await sendMessageToPeer(encryptedMessage);
} catch (err) {
console.log(
`Error sending messages to peer ${peerDeviceID}`,
err,
);
break;
}
}
} else if (message.status === outboundP2PMessageStatuses.encrypted) {
await sendMessageToPeer(message);
} else if (message.status === outboundP2PMessageStatuses.sent) {
// Handle edge-case when message was sent, but it wasn't updated
// in the message store.
sentMessagesMap[message.messageID] = true;
}
} else if (message.status === outboundP2PMessageStatuses.encrypted) {
await sendMessageToPeer(message);
} else if (message.status === outboundP2PMessageStatuses.sent) {
// Handle edge-case when message was sent, but it wasn't updated
// in the message store.
sentMessagesMap[message.messageID] = true;
}
}
}
},
);

await Promise.all(devicePromises);
return Object.keys(sentMessagesMap);
}

Expand Down

0 comments on commit 65803b8

Please sign in to comment.