diff --git a/crates/matrix-sdk-crypto/src/identities/manager.rs b/crates/matrix-sdk-crypto/src/identities/manager.rs index ba701ed0288..223bb9f7a1a 100644 --- a/crates/matrix-sdk-crypto/src/identities/manager.rs +++ b/crates/matrix-sdk-crypto/src/identities/manager.rs @@ -32,7 +32,7 @@ use tracing::{debug, enabled, info, instrument, trace, warn, Level}; use crate::{ error::OlmResult, identities::{DeviceData, OtherUserIdentityData, OwnUserIdentityData, UserIdentityData}, - olm::PrivateCrossSigningIdentity, + olm::{InboundGroupSession, PrivateCrossSigningIdentity, SenderDataFinder, SenderDataType}, requests::KeysQueryRequest, store::{ caches::SequenceNumber, Changes, DeviceChanges, IdentityChanges, KeyQueryManager, @@ -162,6 +162,27 @@ impl IdentityManager { self.store.save_changes(changes).await?; + // Update the sender data on any existing inbound group sessions based on the + // changes in this response. + // + // `update_sender_data_from_device_changes` relies on being able to look up the + // user identities from the store, so this has to happen *after* the + // changes from `handle_cross_signing_keys` are saved. + // + // FIXME: This thing is racy. If a session is created at the same time as a + // `/keys/query` response is being processed, it could be saved + // without up-to-date sender data, but it might be saved too late for + // it to be picked up by `update_sender_data_from_device_changes`. + // + // FIXME: Come to that, what if this races against some other operation on the + // session, such as marking it as needing backup? It's all a bit terrifying. + let igs_changes = self.update_sender_data_from_device_changes(&devices).await?; + if !igs_changes.is_empty() { + self.store + .save_changes(Changes { inbound_group_sessions: igs_changes, ..Default::default() }) + .await?; + } + // if this request is one of those we expected to be in flight, pass the // sequence number back to the store so that it can mark devices up to // date @@ -1019,6 +1040,127 @@ impl IdentityManager { _ => Ok(None), } } + + /// Given a list of changed devices, update any [`InboundGroupSession`]s + /// which were sent from those devices and which do not have complete + /// sender data. + /// + /// Returns a list of modified InboundGroupSessions which need storing in + /// the database. + async fn update_sender_data_from_device_changes( + &self, + device_changes: &DeviceChanges, + ) -> Result, CryptoStoreError> { + let mut result = Vec::new(); + + for device in device_changes.new.iter().chain(device_changes.changed.iter()) { + // 1. Look for InboundGroupSessions from the device whose sender_data is + // UnknownDevice. For such sessions, we now have the device, and can update + // the sender_data accordingly. + // + // In theory, we only need to do this for new devices. In practice, I'm a bit + // worried about races leading us to getting stuck in the + // UnknownDevice state, so we'll paper over that by doing this check + // on device updates too. + let mut result1 = self + .update_sender_data_for_sessions_for_device(device, SenderDataType::UnknownDevice) + .await?; + result.append(&mut result1); + + // 2. If, and only if, the device is now correctly cross-signed (ie, + // device.is_cross_signed_by_owner() is true, and we have the master + // cross-signing key for the owner), look for InboundGroupSessions from the + // device whose sender_data is DeviceInfo. We can also update the sender_data + // for these sessions. + // + // In theory, we can skip a couple of steps of the SenderDataFinder algorithm, + // because we're doing the cross-signing check here. In practice, + // it's *way* easier just to use the same logic. + let device_owner_identity = self.store.get_user_identity(device.user_id()).await?; + if device_owner_identity.is_some_and(|id| device.is_cross_signed_by_owner(&id)) { + let mut result1 = self + .update_sender_data_for_sessions_for_device(device, SenderDataType::DeviceInfo) + .await?; + result.append(&mut result1); + } + } + + Ok(result) + } + + /// Given a list of changed devices, look for [`InboundGroupSession`]s + /// whose sender data is in the given state, and update it. + /// + /// Returns a list of modified InboundGroupSessions which need storing in + /// the database. + #[instrument(skip(self))] + async fn update_sender_data_for_sessions_for_device( + &self, + device: &DeviceData, + sender_data_type: SenderDataType, + ) -> Result, CryptoStoreError> { + const IGS_BATCH_SIZE: usize = 50; + + let mut result = Vec::new(); + let Some(curve_key) = device.curve25519_key() else { return Ok(result) }; + + let mut last_session_id: Option = None; + loop { + let sessions = self + .store + .get_inbound_group_sessions_for_device_batch( + curve_key, + sender_data_type, + last_session_id, + IGS_BATCH_SIZE, + ) + .await?; + + if sessions.is_empty() { + // end of the session list + return Ok(result); + } + + last_session_id = None; + for mut session in sessions { + last_session_id = Some(session.session_id().to_owned()); + self.update_sender_data_for_session(&mut session, device).await?; + result.push(session); + } + } + } + + /// Update the sender data on the given inbound group session, using the + /// given device data. + #[instrument(skip(self, device, session), fields(session_id = session.session_id()))] + async fn update_sender_data_for_session( + &self, + session: &mut InboundGroupSession, + device: &DeviceData, + ) -> Result<(), CryptoStoreError> { + debug!("Updating existing InboundGroupSession with new SenderData"); + use crate::olm::sender_data_finder::SessionDeviceCheckError::*; + + match SenderDataFinder::find_using_device_data(&self.store, device.clone(), &session).await + { + Ok(sender_data) => { + session.sender_data = sender_data; + } + Err(CryptoStoreError(e)) => { + return Err(e); + } + Err(MismatchedIdentityKeys(e)) => { + warn!( + ?session, + ?device, + "cannot update existing InboundGroupSession due to ownership error: {}", + e + ); + } + }; + + Ok(()) + } } /// Log information about what changed after processing a /keys/query response. @@ -2286,4 +2428,81 @@ pub(crate) mod tests { // The latch should be set now assert!(bob_identity.was_previously_verified()); } + + mod update_sender_data { + use std::collections::HashSet; + + use assert_matches::assert_matches; + use matrix_sdk_test::async_test; + use ruma::room_id; + + use super::{device_id, manager_test_helper}; + use crate::{ + identities::manager::testing::{other_user_id, user_id}, + olm::{InboundGroupSession, SenderData}, + store::{Changes, DeviceChanges}, + Account, DeviceData, EncryptionSettings, + }; + + #[async_test] + async fn test_adds_device_info_to_existing_sessions() { + let manager = manager_test_helper(user_id(), device_id()).await; + + // Given that we have lots of sessions in the store, from each of two devices + let account1 = Account::new(user_id()); + let account2 = Account::new(other_user_id()); + + let mut account1_sessions = Vec::new(); + for _ in 0..60 { + account1_sessions.push(create_inbound_group_session(&account1).await); + } + let mut account2_sessions = Vec::new(); + for _ in 0..60 { + account2_sessions.push(create_inbound_group_session(&account2).await); + } + manager + .store + .save_changes(Changes { + inbound_group_sessions: [account1_sessions.clone(), account2_sessions.clone()] + .concat(), + ..Default::default() + }) + .await + .unwrap(); + + // When we get an update for one device + let device_data = DeviceData::from_account(&account1); + let updated_sessions = manager + .update_sender_data_from_device_changes(&DeviceChanges { + changed: vec![device_data], + ..Default::default() + }) + .await + .unwrap(); + + // Then those sessions should be updated + assert_eq!( + updated_sessions.iter().map(|s| s.session_id()).collect::>(), + account1_sessions.iter().map(|s| s.session_id()).collect::>(), + "Set of updated sessions did not match" + ); + + for s in updated_sessions { + assert_matches!(s.sender_data, SenderData::DeviceInfo { .. }); + } + } + + /// Create an InboundGroupSession sent from the given account + async fn create_inbound_group_session(account: &Account) -> InboundGroupSession { + let (_, igs) = account + .create_group_session_pair( + room_id!("!test:room"), + EncryptionSettings::default(), + SenderData::unknown(), + ) + .await + .unwrap(); + igs + } + } }