Skip to content

Commit

Permalink
refactor: move code from external_commit module to `PendingConversa…
Browse files Browse the repository at this point in the history
…tion`

Also,
* Do some refactoring in both modules
* Rename `pending_group_exists()` to `pending_conversation_exists()`
* Rename `merge_pending_group_and_build_decrypted_message_instance()` to
to `merge_and_restore_messages()`, because it merges `self`
(`PendingGroup`) and restores buffered messages.
  • Loading branch information
SimonThormeyer committed Mar 6, 2025
1 parent 6d16dab commit bf626f0
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 295 deletions.
13 changes: 5 additions & 8 deletions crypto/src/mls/conversation/buffer_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,20 +542,17 @@ mod tests {
.unwrap();

// now bump the epoch in external_0: the new member has joined
let new_member_join_commit = new_member
let (new_member_join_commit, mut pending_conversation) = new_member
.create_unmerged_external_commit(
observer.get_group_info(&conv_id).await,
case.custom_cfg(),
case.credential_type,
)
.await
.commit;
.await;

new_member
.context
.merge_pending_group_from_external_commit(&conv_id)
.await
.unwrap();
let new_member_join_commit = new_member_join_commit.commit;

pending_conversation.merge().await.unwrap();

// also create the same proposal with the epoch increased by 1
let leaf_of_114 = new_member.index_of(&conv_id, member_114.get_client_id().await).await;
Expand Down
12 changes: 4 additions & 8 deletions crypto/src/mls/conversation/duplicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,11 @@ mod tests {
let gi = alice_central.get_group_info(&id).await;

// an external commit to verify that we can still detect wrong epoch correctly
let unknown_ext_commit = bob_central
let (unknown_ext_commit, mut pending_conversation) = bob_central
.create_unmerged_external_commit(gi.clone(), case.custom_cfg(), case.credential_type)
.await
.commit;
bob_central
.context
.clear_pending_group_from_external_commit(&id)
.await
.unwrap();
.await;
let unknown_ext_commit = unknown_ext_commit.commit;
pending_conversation.clear().await.unwrap();

bob_central
.context
Expand Down
2 changes: 1 addition & 1 deletion crypto/src/mls/conversation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod leaf_node_validation;
pub(crate) mod merge;
mod orphan_welcome;
mod own_commit;
mod pending_conversation;
pub(crate) mod pending_conversation;
pub(crate) mod proposal;
mod renew;
pub(crate) mod welcome;
Expand Down
147 changes: 130 additions & 17 deletions crypto/src/mls/conversation/pending_conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@
//! (most of the time renewed external proposals) for the new epoch whereas it does not yet have
//! the confirmation from the DS that the external join commit has been accepted.
use super::Error;
use super::Result;
use super::{ConversationWithMls, Error};
use crate::context::CentralContext;
use crate::mls::credential::crl::{extract_crl_uris_from_group, get_new_crl_distribution_points};
use crate::mls::credential::ext::CredentialExt as _;
use crate::prelude::{ConversationId, MlsConversationDecryptMessage};
use crate::prelude::{
ConversationId, MlsBufferedConversationDecryptMessage, MlsConversation, MlsConversationConfiguration,
MlsConversationDecryptMessage, MlsCustomConfiguration,
};
use crate::{KeystoreError, LeafError, MlsError, RecursiveError};
use core_crypto_keystore::CryptoKeystoreMls as _;
use core_crypto_keystore::entities::{MlsPendingMessage, PersistedMlsPendingGroup};
use mls_crypto_provider::{CryptoKeystore, MlsCryptoProvider};
use openmls::credentials::CredentialWithKey;
use openmls::prelude::{MlsGroup, MlsMessageIn, MlsMessageInBody};
use openmls_traits::OpenMlsCryptoProvider;
use tls_codec::Deserialize as _;

/// A pending conversation is a conversation that has been created via an external join commit
Expand All @@ -29,6 +33,25 @@ impl PendingConversation {
Self { inner, context }
}

pub(crate) fn from_mls_group(
group: MlsGroup,
custom_cfg: MlsCustomConfiguration,
context: CentralContext,
) -> Result<Self> {
let serialized_cfg = serde_json::to_vec(&custom_cfg).map_err(MlsError::wrap("serializing custom config"))?;
let serialized_group =
core_crypto_keystore::ser(&group).map_err(KeystoreError::wrap("serializing mls group"))?;
let group_id = group.group_id().to_vec();

let inner = PersistedMlsPendingGroup {
id: group_id,
state: serialized_group,
custom_configuration: serialized_cfg,
parent_id: None,
};
Ok(Self::new(inner, context))
}

async fn mls_provider(&self) -> Result<MlsCryptoProvider> {
self.context
.mls_provider()
Expand All @@ -46,13 +69,26 @@ impl PendingConversation {
&self.inner.id
}

pub(crate) async fn save(&self) -> Result<()> {
let keystore = self.keystore().await?;
keystore
.mls_pending_groups_save(self.id(), &self.inner.state, &self.inner.custom_configuration, None)
.await
.map_err(KeystoreError::wrap("saving mls pending groups"))
.map_err(Into::into)
}

/// If the given message is the commit generated by [CentralContext::join_by_external_commit],
/// merge the pending group and restore any buffered messages.
///
/// Otherwise, the given message will be buffered.
pub async fn try_process_own_join_commit(
&self,
&mut self,
message: impl AsRef<[u8]>,
) -> Result<MlsConversationDecryptMessage> {
// If the confirmation tag of the pending group and this incoming message are identical, we can merge the pending group.
if self.incoming_message_is_own_join_commit(message.as_ref()).await? {
return self.merge_pending_group_and_build_decrypted_message_instance().await;
return self.merge_and_restore_messages().await;
}

let keystore = self.keystore().await?;
Expand Down Expand Up @@ -101,20 +137,16 @@ impl PendingConversation {
Ok(*msg_ct == group_ct)
}

async fn merge_pending_group_and_build_decrypted_message_instance(&self) -> Result<MlsConversationDecryptMessage> {
/// Merges the [Self] instance and restores any buffered messages.
async fn merge_and_restore_messages(&mut self) -> Result<MlsConversationDecryptMessage> {
let buffered_messages = self.merge().await?;
let context = &self.context;
let backend = self.mls_provider().await?;
let id = self.id();
// TODO(SimonThormeyer): The implementation of that should be moved here
let buffered_messages = context
.merge_pending_group_from_external_commit(id)
.await
.map_err(RecursiveError::mls("merging pending group from external commit"))?;
let conversation = context
.get_conversation(self.id())
.await
.map_err(RecursiveError::mls_conversation("getting conversation by id"))?;
let conversation = conversation.read().await;

// This is the now merged conversation
let conversation = context.conversation_guard(id).await?;
let conversation = conversation.conversation().await;
let own_leaf = conversation.group.own_leaf().ok_or(LeafError::InternalMlsError)?;

// We return self identity here, probably not necessary to check revocation
Expand Down Expand Up @@ -146,6 +178,87 @@ impl PendingConversation {
crl_new_distribution_points,
})
}

/// This merges the commit generated by [CentralContext::join_by_external_commit],
/// persists the group permanently and deletes the temporary one. After merging, the group
/// is fully functional.
///
/// # Errors
/// Errors resulting from OpenMls, the KeyStore calls and deserialization
pub(crate) async fn merge(&mut self) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
let mls_provider = self.mls_provider().await?;
let id = self.id();
let group = self.inner.state.clone();
let cfg = self.inner.custom_configuration.clone();

let mut mls_group =
core_crypto_keystore::deser::<MlsGroup>(&group).map_err(KeystoreError::wrap("deserializing mls group"))?;

// Merge it aka bring the MLS group to life and make it usable
mls_group
.merge_pending_commit(&mls_provider)
.await
.map_err(MlsError::wrap("merging pending commit"))?;

// Restore the custom configuration and build a conversation from it
let custom_cfg =
serde_json::from_slice(&cfg).map_err(MlsError::wrap("deserializing mls custom configuration"))?;
let configuration = MlsConversationConfiguration {
ciphersuite: mls_group.ciphersuite().into(),
custom: custom_cfg,
..Default::default()
};

let is_rejoin = mls_provider.key_store().mls_group_exists(id.as_slice()).await;

// Persist the now usable MLS group in the keystore
let mut conversation = MlsConversation::from_mls_group(mls_group, configuration, &mls_provider)
.await
.map_err(RecursiveError::mls_conversation(
"constructing conversation from mls group",
))?;

let context = &self.context;

let pending_messages = context
.restore_pending_messages(&mut conversation, is_rejoin)
.await
.map_err(RecursiveError::mls_conversation("restoring pending messages"))?;

context
.mls_groups()
.await
.map_err(RecursiveError::root("getting mls groups"))?
.insert(id.clone(), conversation);

if pending_messages.is_some() {
mls_provider
.key_store()
.remove::<MlsPendingMessage, _>(id)
.await
.map_err(KeystoreError::wrap("deleting mls pending message by id"))?;
}

// cleanup the pending group we no longer need
self.clear().await?;

Ok(pending_messages)
}

/// In case the external commit generated by [CentralContext::join_by_external_commit] is
/// rejected by the Delivery Service, and we want to abort this external commit,
/// we can wipe out the pending group from the keystore.
///
/// # Errors
/// Errors resulting from the KeyStore calls
pub(crate) async fn clear(&mut self) -> Result<()> {
self.keystore()
.await?
.mls_pending_groups_delete(self.id())
.await
.map_err(KeystoreError::wrap("deleting pending groups by id"))?;
Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -174,7 +287,7 @@ mod tests {
.unwrap();
// Bob tries to join Alice's group with an external commit
let gi = alice_central.get_group_info(&id).await;
let external_commit = bob_central
let (external_commit, _) = bob_central
.create_unmerged_external_commit(gi, case.custom_cfg(), case.credential_type)
.await;

Expand Down Expand Up @@ -247,7 +360,7 @@ mod tests {
let messages = vec![commit.commit, external_proposal, proposal]
.into_iter()
.map(|m| m.to_bytes().unwrap());
let Err(Error::PendingConversation(pending_conversation)) =
let Err(Error::PendingConversation(mut pending_conversation)) =
bob_central.context.conversation_guard(&id).await
else {
panic!("Bob should not have the conversation yet")
Expand Down
Loading

0 comments on commit bf626f0

Please sign in to comment.