From 8d637a981292d8943cc0f338171dccca7eb761b9 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Tue, 26 Nov 2024 12:22:59 -0500 Subject: [PATCH] Node Streams Debugging + Logging (#1342) * more logs around streaming and api queries * fix node streams * fix IdentityStrategy and lints --- Cargo.lock | 1 + bindings_ffi/src/mls.rs | 2 +- bindings_node/Cargo.toml | 2 + bindings_node/src/client.rs | 2 +- bindings_node/src/conversations.rs | 10 +++++ bindings_node/src/streams.rs | 8 ++++ bindings_node/test/Client.test.ts | 40 ++++++++++++++++- bindings_node/test/helpers.ts | 6 +++ bindings_wasm/src/client.rs | 2 +- examples/cli/cli-client.rs | 2 +- xmtp_debug/src/app/clients.rs | 2 +- xmtp_mls/src/api/mls.rs | 22 +++++++++ xmtp_mls/src/api/mod.rs | 10 ++++- xmtp_mls/src/builder.rs | 42 ++++++++--------- xmtp_mls/src/client.rs | 4 +- xmtp_mls/src/groups/intents.rs | 1 + xmtp_mls/src/groups/mod.rs | 1 + xmtp_mls/src/groups/subscriptions.rs | 8 +++- xmtp_mls/src/identity.rs | 45 ++++++++++++++++--- xmtp_mls/src/storage/encrypted_store/group.rs | 12 +++++ xmtp_mls/src/subscriptions.rs | 17 +++++-- xmtp_mls/src/utils/test/mod.rs | 4 +- 22 files changed, 201 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63d6dbd5e..c6e3452c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -431,6 +431,7 @@ dependencies = [ name = "bindings_node" version = "0.1.0" dependencies = [ + "futures", "hex", "napi", "napi-build", diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 1261d2ef9..1dc95f2a7 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -112,7 +112,7 @@ pub async fn create_client( None => EncryptedMessageStore::new_unencrypted(storage_option).await?, }; log::info!("Creating XMTP client"); - let identity_strategy = IdentityStrategy::CreateIfNotFound( + let identity_strategy = IdentityStrategy::new( inbox_id.clone(), account_address.clone(), nonce, diff --git a/bindings_node/Cargo.toml b/bindings_node/Cargo.toml index e3beae204..8cc4ee49e 100644 --- a/bindings_node/Cargo.toml +++ b/bindings_node/Cargo.toml @@ -10,6 +10,7 @@ crate-type = ["cdylib"] # Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix hex.workspace = true napi = { version = "2.12.2", default-features = false, features = [ + "napi4", "napi6", "async", ] } @@ -23,6 +24,7 @@ xmtp_cryptography = { path = "../xmtp_cryptography" } xmtp_id = { path = "../xmtp_id" } xmtp_mls = { path = "../xmtp_mls" } xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] } +futures.workspace = true [build-dependencies] napi-build = "2.0.1" diff --git a/bindings_node/src/client.rs b/bindings_node/src/client.rs index 32740d90c..d554274f0 100644 --- a/bindings_node/src/client.rs +++ b/bindings_node/src/client.rs @@ -155,7 +155,7 @@ pub async fn create_client( .map_err(|_| Error::from_reason("Error creating unencrypted message store"))?, }; - let identity_strategy = IdentityStrategy::CreateIfNotFound( + let identity_strategy = IdentityStrategy::new( inbox_id.clone(), account_address.clone().to_lowercase(), // this is a temporary solution diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index d6a449e70..fb35a4d83 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -345,12 +345,22 @@ impl Conversations { callback: JsFunction, conversation_type: Option, ) -> Result { + tracing::trace!( + inbox_id = self.inner_client.inbox_id(), + conversation_type = ?conversation_type, + ); let tsfn: ThreadsafeFunction = callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?; + let inbox_id = self.inner_client.inbox_id().to_string(); let stream_closer = RustXmtpClient::stream_all_messages_with_callback( self.inner_client.clone(), conversation_type.map(Into::into), move |message| { + tracing::trace!( + inbox_id, + conversation_type = ?conversation_type, + "[received] calling tsfn callback" + ); tsfn.call( message .map(Into::into) diff --git a/bindings_node/src/streams.rs b/bindings_node/src/streams.rs index 1094bf54d..782f85edc 100644 --- a/bindings_node/src/streams.rs +++ b/bindings_node/src/streams.rs @@ -63,6 +63,14 @@ impl StreamCloser { } } + #[napi] + pub async fn wait_for_ready(&self) -> Result<(), Error> { + let mut stream_handle = self.handle.lock().await; + futures::future::OptionFuture::from((*stream_handle).as_mut().map(|s| s.wait_for_ready())) + .await; + Ok(()) + } + /// Checks if this stream is closed #[napi] pub fn is_closed(&self) -> bool { diff --git a/bindings_node/test/Client.test.ts b/bindings_node/test/Client.test.ts index ea84ea155..ee10c30bc 100644 --- a/bindings_node/test/Client.test.ts +++ b/bindings_node/test/Client.test.ts @@ -1,7 +1,13 @@ import { v4 } from 'uuid' import { toBytes } from 'viem' import { describe, expect, it } from 'vitest' -import { createClient, createRegisteredClient, createUser } from '@test/helpers' +import { + createClient, + createRegisteredClient, + createUser, + encodeTextMessage, + sleep, +} from '@test/helpers' import { ConsentEntityType, ConsentState, @@ -250,3 +256,35 @@ describe('Client', () => { ).toThrow() }) }) + +describe('Streams', () => { + it('should stream all messages', async () => { + const user = createUser() + const client1 = await createRegisteredClient(user) + + const user2 = createUser() + const client2 = await createRegisteredClient(user2) + + const group = await client1 + .conversations() + .createGroup([user2.account.address]) + + await client2.conversations().sync() + const group2 = client2.conversations().findGroupById(group.id()) + + let messages = new Array() + client2.conversations().syncAllConversations() + let stream = client2.conversations().streamAllMessages((msg) => { + console.log('Message', msg) + messages.push(msg) + }) + await stream.waitForReady() + group.send(encodeTextMessage('Test1')) + group.send(encodeTextMessage('Test2')) + group.send(encodeTextMessage('Test3')) + group.send(encodeTextMessage('Test4')) + await sleep(1000) + await stream.endAndWait() + expect(messages.length).toBe(4) + }) +}) diff --git a/bindings_node/test/helpers.ts b/bindings_node/test/helpers.ts index 1d3260925..fc44c2229 100644 --- a/bindings_node/test/helpers.ts +++ b/bindings_node/test/helpers.ts @@ -80,3 +80,9 @@ export const encodeTextMessage = (text: string) => { content: new TextEncoder().encode(text), } } + +export function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) +} diff --git a/bindings_wasm/src/client.rs b/bindings_wasm/src/client.rs index be29aea38..1db20bf67 100644 --- a/bindings_wasm/src/client.rs +++ b/bindings_wasm/src/client.rs @@ -153,7 +153,7 @@ pub async fn create_client( .map_err(|_| JsError::new("Error creating unencrypted message store"))?, }; - let identity_strategy = IdentityStrategy::CreateIfNotFound( + let identity_strategy = IdentityStrategy::new( inbox_id.clone(), account_address.clone().to_lowercase(), // this is a temporary solution diff --git a/examples/cli/cli-client.rs b/examples/cli/cli-client.rs index 0448e03b4..9e28e9cc8 100755 --- a/examples/cli/cli-client.rs +++ b/examples/cli/cli-client.rs @@ -535,7 +535,7 @@ where let inbox_id = generate_inbox_id(&w.get_address(), &nonce)?; let client = create_client( cli, - IdentityStrategy::CreateIfNotFound(inbox_id, w.get_address(), nonce, None), + IdentityStrategy::new(inbox_id, w.get_address(), nonce, None), client, ) .await?; diff --git a/xmtp_debug/src/app/clients.rs b/xmtp_debug/src/app/clients.rs index 6ed848eeb..fe07c5a9c 100644 --- a/xmtp_debug/src/app/clients.rs +++ b/xmtp_debug/src/app/clients.rs @@ -72,7 +72,7 @@ async fn new_client_inner( dir.join(db_name) }; - let client = crate::DbgClient::builder(IdentityStrategy::CreateIfNotFound( + let client = crate::DbgClient::builder(IdentityStrategy::new( inbox_id, wallet.get_address(), nonce, diff --git a/xmtp_mls/src/api/mls.rs b/xmtp_mls/src/api/mls.rs index 01f903546..826f84325 100644 --- a/xmtp_mls/src/api/mls.rs +++ b/xmtp_mls/src/api/mls.rs @@ -70,6 +70,12 @@ where group_id: Vec, id_cursor: Option, ) -> Result, ApiError> { + tracing::debug!( + group_id = hex::encode(&group_id), + id_cursor, + inbox_id = self.inbox_id, + "query group messages" + ); let mut out: Vec = vec![]; let page_size = 100; let mut id_cursor = id_cursor; @@ -114,6 +120,12 @@ where installation_id: Vec, id_cursor: Option, ) -> Result, ApiError> { + tracing::debug!( + installation_id = hex::encode(&installation_id), + cursor = id_cursor, + inbox_id = self.inbox_id, + "query welcomes" + ); let mut out: Vec = vec![]; let page_size = 100; let mut id_cursor = id_cursor; @@ -162,6 +174,7 @@ where key_package: Vec, is_inbox_id_credential: bool, ) -> Result<(), ApiError> { + tracing::debug!(inbox_id = self.inbox_id, "upload key packages"); retry_async!( self.retry_strategy, (async { @@ -184,6 +197,7 @@ where &self, installation_keys: Vec>, ) -> Result { + tracing::debug!(inbox_id = self.inbox_id, "fetch key packages"); let res = retry_async!( self.retry_strategy, (async { @@ -220,6 +234,7 @@ where &self, messages: &[WelcomeMessageInput], ) -> Result<(), ApiError> { + tracing::debug!(inbox_id = self.inbox_id, "send welcome messages"); retry_async!( self.retry_strategy, (async { @@ -236,6 +251,11 @@ where #[tracing::instrument(level = "trace", skip_all)] pub async fn send_group_messages(&self, group_messages: Vec<&[u8]>) -> Result<(), ApiError> { + tracing::debug!( + inbox_id = self.inbox_id, + "sending [{}] group messages", + group_messages.len() + ); let to_send: Vec = group_messages .iter() .map(|msg| GroupMessageInput { @@ -267,6 +287,7 @@ where where ApiClient: XmtpMlsStreams, { + tracing::debug!(inbox_id = self.inbox_id, "subscribing to group messages"); self.api_client .subscribe_group_messages(SubscribeGroupMessagesRequest { filters: filters.into_iter().map(|f| f.into()).collect(), @@ -282,6 +303,7 @@ where where ApiClient: XmtpMlsStreams, { + tracing::debug!(inbox_id = self.inbox_id, "subscribing to welcome messages"); self.api_client .subscribe_welcome_messages(SubscribeWelcomeMessagesRequest { filters: vec![WelcomeFilterProto { diff --git a/xmtp_mls/src/api/mod.rs b/xmtp_mls/src/api/mod.rs index 64d173c13..c9255e01f 100644 --- a/xmtp_mls/src/api/mod.rs +++ b/xmtp_mls/src/api/mod.rs @@ -10,7 +10,7 @@ use crate::{ XmtpApi, }; use thiserror::Error; -use xmtp_id::associations::DeserializationError as AssociationDeserializationError; +use xmtp_id::{associations::DeserializationError as AssociationDeserializationError, InboxId}; use xmtp_proto::Error as ApiError; pub use identity::*; @@ -34,6 +34,7 @@ impl RetryableError for WrappedApiError { pub struct ApiClientWrapper { pub(crate) api_client: Arc, pub(crate) retry_strategy: Retry, + pub(crate) inbox_id: Option, } impl ApiClientWrapper @@ -44,6 +45,13 @@ where Self { api_client, retry_strategy, + inbox_id: None, } } + + /// Attach an InboxId to this API Client Wrapper. + /// Attaches an inbox_id context to tracing logs, useful for debugging + pub(crate) fn attach_inbox_id(&mut self, inbox_id: Option) { + self.inbox_id = inbox_id; + } } diff --git a/xmtp_mls/src/builder.rs b/xmtp_mls/src/builder.rs index 1e0fc216f..e8303e1ec 100644 --- a/xmtp_mls/src/builder.rs +++ b/xmtp_mls/src/builder.rs @@ -186,7 +186,11 @@ where let store = store .take() .ok_or(ClientBuilderError::MissingParameter { parameter: "store" })?; - debug!("Initializing identity"); + + debug!( + inbox_id = identity_strategy.inbox_id(), + "Initializing identity" + ); let identity = identity_strategy .initialize_identity(&api_client_wrapper, &store, &scw_verifier) @@ -340,7 +344,7 @@ pub(crate) mod tests { IdentityStrategyTestCase { strategy: { let (legacy_key, legacy_account_address) = generate_random_legacy_key().await; - IdentityStrategy::CreateIfNotFound( + IdentityStrategy::new( generate_inbox_id(&legacy_account_address, &1).unwrap(), legacy_account_address.clone(), 1, @@ -352,7 +356,7 @@ pub(crate) mod tests { IdentityStrategyTestCase { strategy: { let (legacy_key, legacy_account_address) = generate_random_legacy_key().await; - IdentityStrategy::CreateIfNotFound( + IdentityStrategy::new( generate_inbox_id(&legacy_account_address, &1).unwrap(), legacy_account_address.clone(), 0, @@ -364,7 +368,7 @@ pub(crate) mod tests { IdentityStrategyTestCase { strategy: { let (legacy_key, legacy_account_address) = generate_random_legacy_key().await; - IdentityStrategy::CreateIfNotFound( + IdentityStrategy::new( generate_inbox_id(&legacy_account_address, &0).unwrap(), legacy_account_address.clone(), 0, @@ -377,7 +381,7 @@ pub(crate) mod tests { IdentityStrategyTestCase { strategy: { let account_address = generate_local_wallet().get_address(); - IdentityStrategy::CreateIfNotFound( + IdentityStrategy::new( generate_inbox_id(&account_address, &1).unwrap(), account_address.clone(), 0, @@ -390,7 +394,7 @@ pub(crate) mod tests { strategy: { let nonce = 1; let account_address = generate_local_wallet().get_address(); - IdentityStrategy::CreateIfNotFound( + IdentityStrategy::new( generate_inbox_id(&account_address, &nonce).unwrap(), account_address.clone(), nonce, @@ -403,7 +407,7 @@ pub(crate) mod tests { strategy: { let nonce = 0; let account_address = generate_local_wallet().get_address(); - IdentityStrategy::CreateIfNotFound( + IdentityStrategy::new( generate_inbox_id(&account_address, &nonce).unwrap(), account_address.clone(), nonce, @@ -446,7 +450,7 @@ pub(crate) mod tests { let id = generate_inbox_id(&legacy_account_address, &0).unwrap(); println!("{}", id.len()); - let identity_strategy = IdentityStrategy::CreateIfNotFound( + let identity_strategy = IdentityStrategy::new( generate_inbox_id(&legacy_account_address, &0).unwrap(), legacy_account_address.clone(), 0, @@ -479,7 +483,7 @@ pub(crate) mod tests { assert!(client1.inbox_id() == client2.inbox_id()); assert!(client1.installation_public_key() == client2.installation_public_key()); - let client3 = ClientBuilder::new(IdentityStrategy::CreateIfNotFound( + let client3 = ClientBuilder::new(IdentityStrategy::new( generate_inbox_id(&legacy_account_address, &0).unwrap(), legacy_account_address.to_string(), 0, @@ -495,7 +499,7 @@ pub(crate) mod tests { assert!(client1.inbox_id() == client3.inbox_id()); assert!(client1.installation_public_key() == client3.installation_public_key()); - let client4 = ClientBuilder::new(IdentityStrategy::CreateIfNotFound( + let client4 = ClientBuilder::new(IdentityStrategy::new( generate_inbox_id(&legacy_account_address, &0).unwrap(), legacy_account_address.to_string(), 0, @@ -544,8 +548,7 @@ pub(crate) mod tests { let wrapper = ApiClientWrapper::new(mock_api.into(), Retry::default()); - let identity = - IdentityStrategy::CreateIfNotFound("other_inbox_id".to_string(), address, nonce, None); + let identity = IdentityStrategy::new("other_inbox_id".to_string(), address, nonce, None); assert!(matches!( identity .initialize_identity(&wrapper, &store, &scw_verifier) @@ -586,7 +589,7 @@ pub(crate) mod tests { let wrapper = ApiClientWrapper::new(mock_api.into(), Retry::default()); - let identity = IdentityStrategy::CreateIfNotFound(inbox_id.clone(), address, nonce, None); + let identity = IdentityStrategy::new(inbox_id.clone(), address, nonce, None); assert!(dbg!( identity .initialize_identity(&wrapper, &store, &scw_verifier) @@ -625,7 +628,7 @@ pub(crate) mod tests { stored.store(&store.conn().unwrap()).unwrap(); let wrapper = ApiClientWrapper::new(mock_api.into(), Retry::default()); - let identity = IdentityStrategy::CreateIfNotFound(inbox_id.clone(), address, nonce, None); + let identity = IdentityStrategy::new(inbox_id.clone(), address, nonce, None); assert!(identity .initialize_identity(&wrapper, &store, &scw_verifier) .await @@ -665,8 +668,7 @@ pub(crate) mod tests { let wrapper = ApiClientWrapper::new(mock_api.into(), Retry::default()); let inbox_id = "inbox_id".to_string(); - let identity = - IdentityStrategy::CreateIfNotFound(inbox_id.clone(), address.clone(), nonce, None); + let identity = IdentityStrategy::new(inbox_id.clone(), address.clone(), nonce, None); let err = identity .initialize_identity(&wrapper, &store, &scw_verifier) .await @@ -691,7 +693,7 @@ pub(crate) mod tests { let nonce = 1; let inbox_id = generate_inbox_id(&wallet.get_address(), &nonce).unwrap(); - let client_a = Client::builder(IdentityStrategy::CreateIfNotFound( + let client_a = Client::builder(IdentityStrategy::new( inbox_id.clone(), wallet.get_address(), nonce, @@ -715,7 +717,7 @@ pub(crate) mod tests { .await .unwrap(); - let client_b = Client::builder(IdentityStrategy::CreateIfNotFound( + let client_b = Client::builder(IdentityStrategy::new( inbox_id, wallet.get_address(), nonce, @@ -739,7 +741,7 @@ pub(crate) mod tests { // EncryptedMessageStore::new_unencrypted(StorageOption::Persistent(tmpdb.clone())) // .unwrap(); - // ClientBuilder::new(IdentityStrategy::CreateIfNotFound( + // ClientBuilder::new(IdentityStrategy::new( // generate_local_wallet().get_address(), // None, // )) @@ -801,7 +803,7 @@ pub(crate) mod tests { let account_address = format!("{scw_addr:?}"); let account_id = AccountId::new_evm(anvil_meta.chain_id, account_address.clone()); - let identity_strategy = IdentityStrategy::CreateIfNotFound( + let identity_strategy = IdentityStrategy::new( generate_inbox_id(&account_address, &0).unwrap(), account_address, 0, diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index d680e3018..055252832 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -218,7 +218,7 @@ where /// It is expected that most users will use the [`ClientBuilder`](crate::builder::ClientBuilder) instead of instantiating /// a client directly. pub fn new( - api_client: ApiClientWrapper, + mut api_client: ApiClientWrapper, identity: Identity, store: EncryptedMessageStore, scw_verifier: V, @@ -227,6 +227,7 @@ where where V: SmartContractSignatureVerifier, { + api_client.attach_inbox_id(Some(identity.inbox_id().to_string())); let context = Arc::new(XmtpMlsLocalContext { identity, store, @@ -723,6 +724,7 @@ where /// Download all unread welcome messages and converts to a group struct, ignoring malformed messages. /// Returns any new groups created in the operation + #[tracing::instrument(level = "debug", skip_all)] pub async fn sync_welcomes( &self, conn: &DbConnection, diff --git a/xmtp_mls/src/groups/intents.rs b/xmtp_mls/src/groups/intents.rs index 13767ff5c..80f5eb167 100644 --- a/xmtp_mls/src/groups/intents.rs +++ b/xmtp_mls/src/groups/intents.rs @@ -86,6 +86,7 @@ impl MlsGroup { if intent_kind != IntentKind::SendMessage { conn.update_rotated_at_ns(self.group_id.clone())?; } + tracing::debug!(inbox_id = self.client.inbox_id(), intent_kind = %intent_kind, "queued intent"); Ok(intent) } diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index f6de6dd48..143ca5f28 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -591,6 +591,7 @@ impl MlsGroup { /// Send a message on this users XMTP [`Client`]. pub async fn send_message(&self, message: &[u8]) -> Result, GroupError> { + tracing::debug!(inbox_id = self.client.inbox_id(), "sending message"); let conn = self.context().store().conn()?; let provider = XmtpOpenMlsProvider::from(conn); self.send_message_with_provider(message, &provider).await diff --git a/xmtp_mls/src/groups/subscriptions.rs b/xmtp_mls/src/groups/subscriptions.rs index 5701bb882..2caf9bec9 100644 --- a/xmtp_mls/src/groups/subscriptions.rs +++ b/xmtp_mls/src/groups/subscriptions.rs @@ -180,6 +180,7 @@ impl MlsGroup { } /// Stream messages from groups in `group_id_to_info` +#[tracing::instrument(level = "debug", skip_all)] pub(crate) async fn stream_messages( client: &ScopedClient, group_id_to_info: Arc, MessagesStreamInfo>>, @@ -200,9 +201,12 @@ where let group_id_to_info = group_id_to_info.clone(); async move { let envelope = res.map_err(GroupError::from)?; - tracing::info!("Received message streaming payload"); let group_id = extract_group_id(&envelope)?; - tracing::info!("Extracted group id {}", hex::encode(&group_id)); + tracing::info!( + inbox_id = client.inbox_id(), + group_id = hex::encode(&group_id), + "Received message streaming payload" + ); let stream_info = group_id_to_info .get(&group_id) diff --git a/xmtp_mls/src/identity.rs b/xmtp_mls/src/identity.rs index 0f7ffdaa5..0b73a784d 100644 --- a/xmtp_mls/src/identity.rs +++ b/xmtp_mls/src/identity.rs @@ -60,7 +60,12 @@ use xmtp_proto::xmtp::identity::MlsCredential; #[derive(Debug, Clone)] pub enum IdentityStrategy { /// Tries to get an identity from the disk store. If not found, getting one from backend. - CreateIfNotFound(InboxId, String, u64, Option>), // (inbox_id, address, nonce, legacy_signed_private_key) + CreateIfNotFound { + inbox_id: InboxId, + address: String, + nonce: u64, + legacy_signed_private_key: Option>, + }, /// Identity that is already in the disk store CachedOnly, /// An already-built Identity for testing purposes @@ -68,6 +73,32 @@ pub enum IdentityStrategy { ExternalIdentity(Identity), } +impl IdentityStrategy { + pub fn inbox_id(&self) -> Option> { + use IdentityStrategy::*; + match self { + CreateIfNotFound { ref inbox_id, .. } => Some(inbox_id), + _ => None, + } + } + + /// Create a new Identity Strategy, with [`IdentityStrategy::CreateIfNotFound`]. + /// If an Identity is not found in the local store, creates a new one. + pub fn new( + inbox_id: InboxId, + address: String, + nonce: u64, + legacy_signed_private_key: Option>, + ) -> Self { + Self::CreateIfNotFound { + inbox_id, + address, + nonce, + legacy_signed_private_key, + } + } +} + impl IdentityStrategy { /** * Initialize an identity from the given strategy. If a stored identity is found in the database, @@ -83,6 +114,8 @@ impl IdentityStrategy { store: &EncryptedMessageStore, scw_signature_verifier: impl SmartContractSignatureVerifier, ) -> Result { + use IdentityStrategy::*; + info!("Initializing identity"); let conn = store.conn()?; let provider = XmtpOpenMlsProvider::new(conn); @@ -94,15 +127,13 @@ impl IdentityStrategy { debug!("identity in store: {:?}", stored_identity); match self { - IdentityStrategy::CachedOnly => { - stored_identity.ok_or(IdentityError::RequiredIdentityNotFound) - } - IdentityStrategy::CreateIfNotFound( + CachedOnly => stored_identity.ok_or(IdentityError::RequiredIdentityNotFound), + CreateIfNotFound { inbox_id, address, nonce, legacy_signed_private_key, - ) => { + } => { if let Some(stored_identity) = stored_identity { if inbox_id != stored_identity.inbox_id { return Err(IdentityError::InboxIdMismatch { @@ -126,7 +157,7 @@ impl IdentityStrategy { } } #[cfg(test)] - IdentityStrategy::ExternalIdentity(identity) => Ok(identity), + ExternalIdentity(identity) => Ok(identity), } } } diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index f5241f53e..1bd426db7 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -492,6 +492,7 @@ pub enum ConversationType { Dm = 2, Sync = 3, } + impl ToSql for ConversationType where i32: ToSql, @@ -516,6 +517,17 @@ where } } +impl std::fmt::Display for ConversationType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use ConversationType::*; + match self { + Group => write!(f, "group"), + Dm => write!(f, "dm"), + Sync => write!(f, "sync"), + } + } +} + #[cfg(test)] pub(crate) mod tests { #[cfg(target_arch = "wasm32")] diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 90de8461f..c4d958f7b 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -235,6 +235,7 @@ where match result { Ok(Some(group)) => { tracing::info!( + inbox_id = self.inbox_id(), group_id = hex::encode(&group.id), welcome_id = ?group.welcome_id, "Loading existing group for welcome_id: {:?}", @@ -261,6 +262,7 @@ where Ok(welcome) } + #[tracing::instrument(level = "debug", skip_all)] pub async fn stream_conversations( &self, conversation_type: Option, @@ -298,7 +300,7 @@ where let installation_key = self.installation_public_key(); let id_cursor = 0; - tracing::info!("Setting up conversation stream"); + tracing::info!(inbox_id = self.inbox_id(), "Setting up conversation stream"); let subscription = self .api_client .subscribe_welcome_messages(installation_key, Some(id_cursor)) @@ -306,7 +308,10 @@ where let stream = subscription .map(|welcome| async { - tracing::info!("Received conversation streaming payload"); + tracing::info!( + inbox_id = self.inbox_id(), + "Received conversation streaming payload" + ); self.process_streamed_welcome(welcome?).await }) .filter_map(|v| async { Some(v.await) }); @@ -340,11 +345,18 @@ where }) } + #[tracing::instrument(level = "debug", skip_all)] pub async fn stream_all_messages( &self, conversation_type: Option, ) -> Result> + '_, ClientError> { + tracing::debug!( + inbox_id = self.inbox_id(), + conversation_type = ?conversation_type, + "stream all messages" + ); + let conn = self.store().conn()?; self.sync_welcomes(&conn).await?; @@ -364,7 +376,6 @@ where .await?; futures::pin_mut!(messages_stream); - tracing::info!("Setting up conversation stream in stream_all_messages"); let convo_stream = self.stream_conversations(conversation_type).await?; futures::pin_mut!(convo_stream); diff --git a/xmtp_mls/src/utils/test/mod.rs b/xmtp_mls/src/utils/test/mod.rs index 1616556ea..952b31988 100755 --- a/xmtp_mls/src/utils/test/mod.rs +++ b/xmtp_mls/src/utils/test/mod.rs @@ -175,7 +175,7 @@ where let nonce = 1; let inbox_id = generate_inbox_id(&owner.get_address(), &nonce).unwrap(); - let client = Client::::builder(IdentityStrategy::CreateIfNotFound( + let client = Client::::builder(IdentityStrategy::new( inbox_id, owner.get_address(), nonce, @@ -208,7 +208,7 @@ where let nonce = 1; let inbox_id = generate_inbox_id(&owner.get_address(), &nonce).unwrap(); - let mut builder = Client::::builder(IdentityStrategy::CreateIfNotFound( + let mut builder = Client::::builder(IdentityStrategy::new( inbox_id, owner.get_address(), nonce,