Skip to content

Commit

Permalink
Node Streams Debugging + Logging (#1342)
Browse files Browse the repository at this point in the history
* more logs around streaming and api queries

* fix node streams

* fix IdentityStrategy and lints
  • Loading branch information
insipx authored Nov 26, 2024
1 parent 14df3bc commit 8d637a9
Show file tree
Hide file tree
Showing 22 changed files with 201 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub async fn create_client(
None => EncryptedMessageStore::new_unencrypted(storage_option).await?,
};
log::info!("Creating XMTP client");
let identity_strategy = IdentityStrategy::CreateIfNotFound(
let identity_strategy = IdentityStrategy::new(
inbox_id.clone(),
account_address.clone(),
nonce,
Expand Down
2 changes: 2 additions & 0 deletions bindings_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ crate-type = ["cdylib"]
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
hex.workspace = true
napi = { version = "2.12.2", default-features = false, features = [
"napi4",
"napi6",
"async",
] }
Expand All @@ -23,6 +24,7 @@ xmtp_cryptography = { path = "../xmtp_cryptography" }
xmtp_id = { path = "../xmtp_id" }
xmtp_mls = { path = "../xmtp_mls" }
xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] }
futures.workspace = true

[build-dependencies]
napi-build = "2.0.1"
Expand Down
2 changes: 1 addition & 1 deletion bindings_node/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub async fn create_client(
.map_err(|_| Error::from_reason("Error creating unencrypted message store"))?,
};

let identity_strategy = IdentityStrategy::CreateIfNotFound(
let identity_strategy = IdentityStrategy::new(
inbox_id.clone(),
account_address.clone().to_lowercase(),
// this is a temporary solution
Expand Down
10 changes: 10 additions & 0 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,22 @@ impl Conversations {
callback: JsFunction,
conversation_type: Option<ConversationType>,
) -> Result<StreamCloser> {
tracing::trace!(
inbox_id = self.inner_client.inbox_id(),
conversation_type = ?conversation_type,
);
let tsfn: ThreadsafeFunction<Message, ErrorStrategy::CalleeHandled> =
callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
let inbox_id = self.inner_client.inbox_id().to_string();
let stream_closer = RustXmtpClient::stream_all_messages_with_callback(
self.inner_client.clone(),
conversation_type.map(Into::into),
move |message| {
tracing::trace!(
inbox_id,
conversation_type = ?conversation_type,
"[received] calling tsfn callback"
);
tsfn.call(
message
.map(Into::into)
Expand Down
8 changes: 8 additions & 0 deletions bindings_node/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ impl StreamCloser {
}
}

#[napi]
pub async fn wait_for_ready(&self) -> Result<(), Error> {
let mut stream_handle = self.handle.lock().await;
futures::future::OptionFuture::from((*stream_handle).as_mut().map(|s| s.wait_for_ready()))
.await;
Ok(())
}

/// Checks if this stream is closed
#[napi]
pub fn is_closed(&self) -> bool {
Expand Down
40 changes: 39 additions & 1 deletion bindings_node/test/Client.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { v4 } from 'uuid'
import { toBytes } from 'viem'
import { describe, expect, it } from 'vitest'
import { createClient, createRegisteredClient, createUser } from '@test/helpers'
import {
createClient,
createRegisteredClient,
createUser,
encodeTextMessage,
sleep,
} from '@test/helpers'
import {
ConsentEntityType,
ConsentState,
Expand Down Expand Up @@ -250,3 +256,35 @@ describe('Client', () => {
).toThrow()
})
})

describe('Streams', () => {
it('should stream all messages', async () => {
const user = createUser()
const client1 = await createRegisteredClient(user)

const user2 = createUser()
const client2 = await createRegisteredClient(user2)

const group = await client1
.conversations()
.createGroup([user2.account.address])

await client2.conversations().sync()
const group2 = client2.conversations().findGroupById(group.id())

let messages = new Array()
client2.conversations().syncAllConversations()
let stream = client2.conversations().streamAllMessages((msg) => {
console.log('Message', msg)
messages.push(msg)
})
await stream.waitForReady()
group.send(encodeTextMessage('Test1'))
group.send(encodeTextMessage('Test2'))
group.send(encodeTextMessage('Test3'))
group.send(encodeTextMessage('Test4'))
await sleep(1000)
await stream.endAndWait()
expect(messages.length).toBe(4)
})
})
6 changes: 6 additions & 0 deletions bindings_node/test/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,9 @@ export const encodeTextMessage = (text: string) => {
content: new TextEncoder().encode(text),
}
}

export function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms)
})
}
2 changes: 1 addition & 1 deletion bindings_wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub async fn create_client(
.map_err(|_| JsError::new("Error creating unencrypted message store"))?,
};

let identity_strategy = IdentityStrategy::CreateIfNotFound(
let identity_strategy = IdentityStrategy::new(
inbox_id.clone(),
account_address.clone().to_lowercase(),
// this is a temporary solution
Expand Down
2 changes: 1 addition & 1 deletion examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ where
let inbox_id = generate_inbox_id(&w.get_address(), &nonce)?;
let client = create_client(
cli,
IdentityStrategy::CreateIfNotFound(inbox_id, w.get_address(), nonce, None),
IdentityStrategy::new(inbox_id, w.get_address(), nonce, None),
client,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion xmtp_debug/src/app/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn new_client_inner(
dir.join(db_name)
};

let client = crate::DbgClient::builder(IdentityStrategy::CreateIfNotFound(
let client = crate::DbgClient::builder(IdentityStrategy::new(
inbox_id,
wallet.get_address(),
nonce,
Expand Down
22 changes: 22 additions & 0 deletions xmtp_mls/src/api/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ where
group_id: Vec<u8>,
id_cursor: Option<u64>,
) -> Result<Vec<GroupMessage>, ApiError> {
tracing::debug!(
group_id = hex::encode(&group_id),
id_cursor,
inbox_id = self.inbox_id,
"query group messages"
);
let mut out: Vec<GroupMessage> = vec![];
let page_size = 100;
let mut id_cursor = id_cursor;
Expand Down Expand Up @@ -114,6 +120,12 @@ where
installation_id: Vec<u8>,
id_cursor: Option<u64>,
) -> Result<Vec<WelcomeMessage>, ApiError> {
tracing::debug!(
installation_id = hex::encode(&installation_id),
cursor = id_cursor,
inbox_id = self.inbox_id,
"query welcomes"
);
let mut out: Vec<WelcomeMessage> = vec![];
let page_size = 100;
let mut id_cursor = id_cursor;
Expand Down Expand Up @@ -162,6 +174,7 @@ where
key_package: Vec<u8>,
is_inbox_id_credential: bool,
) -> Result<(), ApiError> {
tracing::debug!(inbox_id = self.inbox_id, "upload key packages");
retry_async!(
self.retry_strategy,
(async {
Expand All @@ -184,6 +197,7 @@ where
&self,
installation_keys: Vec<Vec<u8>>,
) -> Result<KeyPackageMap, ApiError> {
tracing::debug!(inbox_id = self.inbox_id, "fetch key packages");
let res = retry_async!(
self.retry_strategy,
(async {
Expand Down Expand Up @@ -220,6 +234,7 @@ where
&self,
messages: &[WelcomeMessageInput],
) -> Result<(), ApiError> {
tracing::debug!(inbox_id = self.inbox_id, "send welcome messages");
retry_async!(
self.retry_strategy,
(async {
Expand All @@ -236,6 +251,11 @@ where

#[tracing::instrument(level = "trace", skip_all)]
pub async fn send_group_messages(&self, group_messages: Vec<&[u8]>) -> Result<(), ApiError> {
tracing::debug!(
inbox_id = self.inbox_id,
"sending [{}] group messages",
group_messages.len()
);
let to_send: Vec<GroupMessageInput> = group_messages
.iter()
.map(|msg| GroupMessageInput {
Expand Down Expand Up @@ -267,6 +287,7 @@ where
where
ApiClient: XmtpMlsStreams,
{
tracing::debug!(inbox_id = self.inbox_id, "subscribing to group messages");
self.api_client
.subscribe_group_messages(SubscribeGroupMessagesRequest {
filters: filters.into_iter().map(|f| f.into()).collect(),
Expand All @@ -282,6 +303,7 @@ where
where
ApiClient: XmtpMlsStreams,
{
tracing::debug!(inbox_id = self.inbox_id, "subscribing to welcome messages");
self.api_client
.subscribe_welcome_messages(SubscribeWelcomeMessagesRequest {
filters: vec![WelcomeFilterProto {
Expand Down
10 changes: 9 additions & 1 deletion xmtp_mls/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
XmtpApi,
};
use thiserror::Error;
use xmtp_id::associations::DeserializationError as AssociationDeserializationError;
use xmtp_id::{associations::DeserializationError as AssociationDeserializationError, InboxId};
use xmtp_proto::Error as ApiError;

pub use identity::*;
Expand All @@ -34,6 +34,7 @@ impl RetryableError for WrappedApiError {
pub struct ApiClientWrapper<ApiClient> {
pub(crate) api_client: Arc<ApiClient>,
pub(crate) retry_strategy: Retry,
pub(crate) inbox_id: Option<InboxId>,
}

impl<ApiClient> ApiClientWrapper<ApiClient>
Expand All @@ -44,6 +45,13 @@ where
Self {
api_client,
retry_strategy,
inbox_id: None,
}
}

/// Attach an InboxId to this API Client Wrapper.
/// Attaches an inbox_id context to tracing logs, useful for debugging
pub(crate) fn attach_inbox_id(&mut self, inbox_id: Option<InboxId>) {
self.inbox_id = inbox_id;
}
}
Loading

0 comments on commit 8d637a9

Please sign in to comment.