Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix stream conversation & conversation_type
Browse files Browse the repository at this point in the history
insipx committed Jan 27, 2025
1 parent cb2a0c0 commit 1afc29b
Showing 7 changed files with 354 additions and 336 deletions.
30 changes: 9 additions & 21 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
@@ -25,10 +25,7 @@ use xmtp_id::{
InboxId, InboxIdRef,
};

use xmtp_proto::xmtp::mls::api::v1::{
welcome_message::{Version as WelcomeMessageVersion, V1 as WelcomeMessageV1},
GroupMessage, WelcomeMessage,
};
use xmtp_proto::xmtp::mls::api::v1::{welcome_message, GroupMessage, WelcomeMessage};

#[cfg(any(test, feature = "test-utils"))]
use crate::groups::device_sync::WorkerHandle;
@@ -825,16 +822,18 @@ where
pub async fn sync_welcomes(
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<Vec<MlsGroup<Self>>, ClientError> {
) -> Result<Vec<MlsGroup<Self>>, GroupError> {
let envelopes = self.query_welcome_messages(provider.conn_ref()).await?;
let num_envelopes = envelopes.len();

let groups: Vec<MlsGroup<Self>> = stream::iter(envelopes.into_iter())
.filter_map(|envelope: WelcomeMessage| async {
let welcome_v1 = match extract_welcome_message(envelope) {
Ok(inner) => inner,
Err(err) => {
tracing::error!("failed to extract welcome message: {}", err);
let welcome_v1 = match envelope.version {
Some(welcome_message::Version::V1(v1)) => v1,
_ => {
tracing::error!(
"failed to extract welcome message, invalid payload only v1 supported."
);
return None;
}
};
@@ -861,7 +860,7 @@ where
async fn process_new_welcome(
&self,
provider: &XmtpOpenMlsProvider,
welcome: &WelcomeMessageV1,
welcome: &welcome_message::V1,
) -> Result<MlsGroup<Self>, GroupError> {
provider
.transaction_async(|provider| async move {
@@ -1030,17 +1029,6 @@ where
}
}

pub(crate) fn extract_welcome_message(
welcome: WelcomeMessage,
) -> Result<WelcomeMessageV1, ClientError> {
match welcome.version {
Some(WelcomeMessageVersion::V1(welcome)) => Ok(welcome),
_ => Err(ClientError::Generic(
"unexpected message type in welcome".to_string(),
)),
}
}

pub fn deserialize_welcome(welcome_bytes: &Vec<u8>) -> Result<Welcome, ClientError> {
// let welcome_proto = WelcomeMessageProto::decode(&mut welcome_bytes.as_slice())?;
let welcome = MlsMessageIn::tls_deserialize(&mut welcome_bytes.as_slice())?;
6 changes: 5 additions & 1 deletion xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
@@ -401,7 +401,11 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
}
}

pub(crate) fn new_from_arc(client: Arc<ScopedClient>, group_id: Vec<u8>, created_at_ns: i64) -> Self {
pub(crate) fn new_from_arc(
client: Arc<ScopedClient>,
group_id: Vec<u8>,
created_at_ns: i64,
) -> Self {
let mut mutexes = client.context().mutexes.clone();
Self {
group_id: group_id.clone(),
29 changes: 12 additions & 17 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,10 @@ use super::MlsGroup;
use crate::{
groups::ScopedGroupClient,
storage::group_message::StoredGroupMessage,
subscriptions::{stream_messages::{ProcessMessageFuture, StreamGroupMessages, MessagesStreamInfo}, SubscribeError},
subscriptions::{
stream_messages::{MessagesStreamInfo, ProcessMessageFuture, StreamGroupMessages},
SubscribeError,
},
};
use xmtp_proto::api_client::{trait_impls::XmtpApi, XmtpMlsStreams};
use xmtp_proto::xmtp::mls::api::v1::GroupMessage;
@@ -37,12 +40,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
where
<ScopedClient as ScopedGroupClient>::ApiClient: XmtpMlsStreams + 'a,
{
let group_list = HashMap::from([(
self.group_id.clone(),
MessagesStreamInfo {
cursor: 0,
},
)]);
let group_list = HashMap::from([(self.group_id.clone(), MessagesStreamInfo { cursor: 0 })]);
Ok(StreamGroupMessages::new(&self.client, &group_list).await?)
}

@@ -55,12 +53,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
ScopedClient: 'static,
<ScopedClient as ScopedGroupClient>::ApiClient: XmtpMlsStreams + 'static,
{
let group_list = HashMap::from([(
group_id,
MessagesStreamInfo {
cursor: 0,
},
)]);
let group_list = HashMap::from([(group_id, MessagesStreamInfo { cursor: 0 })]);
stream_messages_with_callback(client, group_list, callback)
}
}
@@ -99,14 +92,14 @@ pub(crate) mod tests {
use std::sync::Arc;

use super::*;
use xmtp_cryptography::utils::generate_local_wallet;
use crate::{
use crate::{
builder::ClientBuilder, groups::GroupMetadataOptions,
storage::group_message::GroupMessageKind,
};
use xmtp_cryptography::utils::generate_local_wallet;

use wasm_bindgen_test::wasm_bindgen_test;
use futures::StreamExt;
use wasm_bindgen_test::wasm_bindgen_test;

#[wasm_bindgen_test(unsupported = tokio::test(flavor = "multi_thread", worker_threads = 10))]
async fn test_decode_group_message_bytes() {
@@ -178,7 +171,9 @@ pub(crate) mod tests {
#[wasm_bindgen_test(unsupported = tokio::test(flavor = "multi_thread", worker_threads = 10))]
async fn test_subscribe_multiple() {
let amal = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let group = amal.create_group(None, GroupMetadataOptions::default()).unwrap();
let group = amal
.create_group(None, GroupMetadataOptions::default())
.unwrap();

let stream = group.stream().await.unwrap();
futures::pin_mut!(stream);
1 change: 0 additions & 1 deletion xmtp_mls/src/storage/xmtp_openmls_provider.rs
Original file line number Diff line number Diff line change
@@ -80,4 +80,3 @@ where
&self.key_store
}
}

Loading

0 comments on commit 1afc29b

Please sign in to comment.