diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js index 10008936e4..3e51eaab4f 100644 --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -26,6 +26,7 @@ import { } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import { getConfig } from '../utils/config.js'; import { getMessageForException } from '../utils/errors.js'; +import { entries } from '../utils/objects.js'; import { olmSessionErrors } from '../utils/olm-utils.js'; type PeerToPeerContextType = { @@ -128,53 +129,60 @@ async function processOutboundP2PMessages( } }; - for (const peerDeviceID in devicesMap) { - for (const message of devicesMap[peerDeviceID]) { - if (message.status === outboundP2PMessageStatuses.persisted) { - try { - const result = await olmAPI.encryptAndPersist( - message.plaintext, - message.deviceID, - message.messageID, - ); - - const encryptedMessage: OutboundP2PMessage = { - ...message, - ciphertext: JSON.stringify(result), - }; - await sendMessageToPeer(encryptedMessage); - } catch (e) { - if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) { - console.log(`Error sending messages to peer ${peerDeviceID}`, e); - break; - } + const devicePromises = entries(devicesMap).map( + async ([peerDeviceID, deviceMessages]) => { + for (const message of deviceMessages) { + if (message.status === outboundP2PMessageStatuses.persisted) { try { - await peerOlmSessionsCreator(message.userID, peerDeviceID); const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); + const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; - await sendMessageToPeer(encryptedMessage); - } catch (err) { - console.log(`Error sending messages to peer ${peerDeviceID}`, err); - break; + } catch (e) { + if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) { + console.log(`Error sending messages to peer ${peerDeviceID}`, e); + break; + } + try { + await peerOlmSessionsCreator(message.userID, peerDeviceID); + const result = await olmAPI.encryptAndPersist( + message.plaintext, + message.deviceID, + message.messageID, + ); + const encryptedMessage: OutboundP2PMessage = { + ...message, + ciphertext: JSON.stringify(result), + }; + + await sendMessageToPeer(encryptedMessage); + } catch (err) { + console.log( + `Error sending messages to peer ${peerDeviceID}`, + err, + ); + break; + } } + } else if (message.status === outboundP2PMessageStatuses.encrypted) { + await sendMessageToPeer(message); + } else if (message.status === outboundP2PMessageStatuses.sent) { + // Handle edge-case when message was sent, but it wasn't updated + // in the message store. + sentMessagesMap[message.messageID] = true; } - } else if (message.status === outboundP2PMessageStatuses.encrypted) { - await sendMessageToPeer(message); - } else if (message.status === outboundP2PMessageStatuses.sent) { - // Handle edge-case when message was sent, but it wasn't updated - // in the message store. - sentMessagesMap[message.messageID] = true; } - } - } + }, + ); + + await Promise.all(devicePromises); return Object.keys(sentMessagesMap); }