Skip to content

Commit

Permalink
crypto: update sender data on /keys/query responses
Browse files Browse the repository at this point in the history
When we receive an `/keys/query` response, look for existing
inboundgroupsessions created by updated devices, and see if we can update any
of their senderdata settings.
  • Loading branch information
richvdh committed Aug 30, 2024
1 parent e05ebbf commit e3faee0
Showing 1 changed file with 220 additions and 1 deletion.
221 changes: 220 additions & 1 deletion crates/matrix-sdk-crypto/src/identities/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tracing::{debug, enabled, info, instrument, trace, warn, Level};
use crate::{
error::OlmResult,
identities::{DeviceData, OtherUserIdentityData, OwnUserIdentityData, UserIdentityData},
olm::PrivateCrossSigningIdentity,
olm::{InboundGroupSession, PrivateCrossSigningIdentity, SenderDataFinder, SenderDataType},
requests::KeysQueryRequest,
store::{
caches::SequenceNumber, Changes, DeviceChanges, IdentityChanges, KeyQueryManager,
Expand Down Expand Up @@ -162,6 +162,27 @@ impl IdentityManager {

self.store.save_changes(changes).await?;

// Update the sender data on any existing inbound group sessions based on the
// changes in this response.
//
// `update_sender_data_from_device_changes` relies on being able to look up the
// user identities from the store, so this has to happen *after* the
// changes from `handle_cross_signing_keys` are saved.
//
// FIXME: This thing is racy. If a session is created at the same time as a
// `/keys/query` response is being processed, it could be saved
// without up-to-date sender data, but it might be saved too late for
// it to be picked up by `update_sender_data_from_device_changes`.
//
// FIXME: Come to that, what if this races against some other operation on the
// session, such as marking it as needing backup? It's all a bit terrifying.
let igs_changes = self.update_sender_data_from_device_changes(&devices).await?;
if !igs_changes.is_empty() {
self.store
.save_changes(Changes { inbound_group_sessions: igs_changes, ..Default::default() })
.await?;
}

// if this request is one of those we expected to be in flight, pass the
// sequence number back to the store so that it can mark devices up to
// date
Expand Down Expand Up @@ -1019,6 +1040,127 @@ impl IdentityManager {
_ => Ok(None),
}
}

/// Given a list of changed devices, update any [`InboundGroupSession`]s
/// which were sent from those devices and which do not have complete
/// sender data.
///
/// Returns a list of modified InboundGroupSessions which need storing in
/// the database.
async fn update_sender_data_from_device_changes(
&self,
device_changes: &DeviceChanges,
) -> Result<Vec<InboundGroupSession>, CryptoStoreError> {
let mut result = Vec::new();

for device in device_changes.new.iter().chain(device_changes.changed.iter()) {
// 1. Look for InboundGroupSessions from the device whose sender_data is
// UnknownDevice. For such sessions, we now have the device, and can update
// the sender_data accordingly.
//
// In theory, we only need to do this for new devices. In practice, I'm a bit
// worried about races leading us to getting stuck in the
// UnknownDevice state, so we'll paper over that by doing this check
// on device updates too.
let mut result1 = self
.update_sender_data_for_sessions_for_device(device, SenderDataType::UnknownDevice)
.await?;
result.append(&mut result1);

// 2. If, and only if, the device is now correctly cross-signed (ie,
// device.is_cross_signed_by_owner() is true, and we have the master
// cross-signing key for the owner), look for InboundGroupSessions from the
// device whose sender_data is DeviceInfo. We can also update the sender_data
// for these sessions.
//
// In theory, we can skip a couple of steps of the SenderDataFinder algorithm,
// because we're doing the cross-signing check here. In practice,
// it's *way* easier just to use the same logic.
let device_owner_identity = self.store.get_user_identity(device.user_id()).await?;
if device_owner_identity.is_some_and(|id| device.is_cross_signed_by_owner(&id)) {
let mut result1 = self
.update_sender_data_for_sessions_for_device(device, SenderDataType::DeviceInfo)
.await?;
result.append(&mut result1);
}
}

Ok(result)
}

/// Given a list of changed devices, look for [`InboundGroupSession`]s
/// whose sender data is in the given state, and update it.
///
/// Returns a list of modified InboundGroupSessions which need storing in
/// the database.
#[instrument(skip(self))]
async fn update_sender_data_for_sessions_for_device(
&self,
device: &DeviceData,
sender_data_type: SenderDataType,
) -> Result<Vec<InboundGroupSession>, CryptoStoreError> {
const IGS_BATCH_SIZE: usize = 50;

let mut result = Vec::new();
let Some(curve_key) = device.curve25519_key() else { return Ok(result) };

let mut last_session_id: Option<String> = None;
loop {
let sessions = self
.store
.get_inbound_group_sessions_for_device_batch(
curve_key,
sender_data_type,
last_session_id,
IGS_BATCH_SIZE,
)
.await?;

if sessions.is_empty() {
// end of the session list
return Ok(result);
}

last_session_id = None;
for mut session in sessions {
last_session_id = Some(session.session_id().to_owned());
self.update_sender_data_for_session(&mut session, device).await?;
result.push(session);
}
}
}

/// Update the sender data on the given inbound group session, using the
/// given device data.
#[instrument(skip(self, device, session), fields(session_id = session.session_id()))]
async fn update_sender_data_for_session(
&self,
session: &mut InboundGroupSession,
device: &DeviceData,
) -> Result<(), CryptoStoreError> {
debug!("Updating existing InboundGroupSession with new SenderData");
use crate::olm::sender_data_finder::SessionDeviceCheckError::*;

match SenderDataFinder::find_using_device_data(&self.store, device.clone(), &session).await
{
Ok(sender_data) => {
session.sender_data = sender_data;
}
Err(CryptoStoreError(e)) => {
return Err(e);
}
Err(MismatchedIdentityKeys(e)) => {
warn!(
?session,
?device,
"cannot update existing InboundGroupSession due to ownership error: {}",
e
);
}
};

Ok(())
}
}

/// Log information about what changed after processing a /keys/query response.
Expand Down Expand Up @@ -2286,4 +2428,81 @@ pub(crate) mod tests {
// The latch should be set now
assert!(bob_identity.was_previously_verified());
}

mod update_sender_data {
use std::collections::HashSet;

use assert_matches::assert_matches;
use matrix_sdk_test::async_test;
use ruma::room_id;

use super::{device_id, manager_test_helper};
use crate::{
identities::manager::testing::{other_user_id, user_id},
olm::{InboundGroupSession, SenderData},
store::{Changes, DeviceChanges},
Account, DeviceData, EncryptionSettings,
};

#[async_test]
async fn test_adds_device_info_to_existing_sessions() {
let manager = manager_test_helper(user_id(), device_id()).await;

// Given that we have lots of sessions in the store, from each of two devices
let account1 = Account::new(user_id());
let account2 = Account::new(other_user_id());

let mut account1_sessions = Vec::new();
for _ in 0..60 {
account1_sessions.push(create_inbound_group_session(&account1).await);
}
let mut account2_sessions = Vec::new();
for _ in 0..60 {
account2_sessions.push(create_inbound_group_session(&account2).await);
}
manager
.store
.save_changes(Changes {
inbound_group_sessions: [account1_sessions.clone(), account2_sessions.clone()]
.concat(),
..Default::default()
})
.await
.unwrap();

// When we get an update for one device
let device_data = DeviceData::from_account(&account1);
let updated_sessions = manager
.update_sender_data_from_device_changes(&DeviceChanges {
changed: vec![device_data],
..Default::default()
})
.await
.unwrap();

// Then those sessions should be updated
assert_eq!(
updated_sessions.iter().map(|s| s.session_id()).collect::<HashSet<_>>(),
account1_sessions.iter().map(|s| s.session_id()).collect::<HashSet<_>>(),
"Set of updated sessions did not match"
);

for s in updated_sessions {
assert_matches!(s.sender_data, SenderData::DeviceInfo { .. });
}
}

/// Create an InboundGroupSession sent from the given account
async fn create_inbound_group_session(account: &Account) -> InboundGroupSession {
let (_, igs) = account
.create_group_session_pair(
room_id!("!test:room"),
EncryptionSettings::default(),
SenderData::unknown(),
)
.await
.unwrap();
igs
}
}
}

0 comments on commit e3faee0

Please sign in to comment.