Skip to content

Commit

Permalink
fix bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Jan 28, 2025
1 parent bae85e7 commit bdf2694
Show file tree
Hide file tree
Showing 28 changed files with 391 additions and 847 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/test-webassembly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ jobs:
- name: test with chrome
run: |
cargo test --release --target wasm32-unknown-unknown -p xmtp_mls -p xmtp_id -p xmtp_api_http -p xmtp_cryptography -- \
--skip xmtp_mls::subscriptions --skip xmtp_mls::groups::subscriptions \
--skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
-skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
--skip xmtp_mls::groups::tests::process_messages_abort_on_retryable_error \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_find_groups \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_installations_last_checked_is_updated
Expand Down
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
],
"rust-analyzer.procMacro.attributes.enable": true,
"rust-analyzer.procMacro.ignored": {
"tracing": ["instrument"],
"async-trait": ["async_trait"],
"napi-derive": ["napi"],
"async-recursion": ["async_recursion"],
Expand Down
16 changes: 9 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ wasm-bindgen-test = "0.3.50"
web-sys = "0.3"
zeroize = "1.8"
pin-project-lite = "0.2"
reqwest = { version = "0.12.5", features = ["json", "stream"] }
reqwest = { version = "0.12.12", features = ["json", "stream"] }
bytes = "1.9"

# Internal Crate Dependencies
Expand Down
2 changes: 2 additions & 0 deletions bindings_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub enum GenericError {
DeviceSync(#[from] xmtp_mls::groups::device_sync::DeviceSyncError),
#[error(transparent)]
Identity(#[from] xmtp_mls::identity::IdentityError),
#[error(transparent)]
Subscription(#[from] xmtp_mls::subscriptions::SubscribeError),
}

#[derive(uniffi::Error, thiserror::Error, Debug)]
Expand Down
26 changes: 14 additions & 12 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use xmtp_mls::storage::group_message::{SortDirection, StoredGroupMessageWithReac
use xmtp_mls::{
api::ApiClientWrapper,
builder::ClientBuilder,
client::{Client as MlsClient, ClientError},
client::Client as MlsClient,
groups::{
group_metadata::GroupMetadata,
group_mutable_metadata::MetadataField,
Expand All @@ -47,6 +47,7 @@ use xmtp_mls::{
group_message::{DeliveryStatus, GroupMessageKind, StoredGroupMessage},
EncryptedMessageStore, EncryptionKey, StorageOption,
},
subscriptions::SubscribeError,
AbortHandle, GenericStreamHandle, StreamHandle,
};
use xmtp_proto::xmtp::mls::message_contents::content_types::ReactionV2;
Expand Down Expand Up @@ -1770,15 +1771,13 @@ impl FfiConversation {
}

pub async fn stream(&self, message_callback: Arc<dyn FfiMessageCallback>) -> FfiStreamCloser {
let handle = MlsGroup::stream_with_callback(
self.inner.client.clone(),
self.id(),
self.inner.created_at_ns,
move |message| match message {
Ok(m) => message_callback.on_message(m.into()),
Err(e) => message_callback.on_error(e.into()),
},
);
let handle =
MlsGroup::stream_with_callback(self.inner.client.clone(), self.id(), move |message| {
match message {
Ok(m) => message_callback.on_message(m.into()),
Err(e) => message_callback.on_error(e.into()),
}
});

FfiStreamCloser::new(handle)
}
Expand Down Expand Up @@ -2065,7 +2064,7 @@ impl From<FfiConsent> for StoredConsentRecord {
}
}

type FfiHandle = Box<GenericStreamHandle<Result<(), ClientError>>>;
type FfiHandle = Box<GenericStreamHandle<Result<(), SubscribeError>>>;

#[derive(uniffi::Object, Clone)]
pub struct FfiStreamCloser {
Expand All @@ -2076,7 +2075,10 @@ pub struct FfiStreamCloser {

impl FfiStreamCloser {
pub fn new(
stream_handle: impl StreamHandle<StreamOutput = Result<(), ClientError>> + Send + Sync + 'static,
stream_handle: impl StreamHandle<StreamOutput = Result<(), SubscribeError>>
+ Send
+ Sync
+ 'static,
) -> Self {
Self {
abort_handle: Arc::new(stream_handle.abort_handle()),
Expand Down
1 change: 0 additions & 1 deletion bindings_node/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,6 @@ impl Conversation {
let stream_closer = MlsGroup::stream_with_callback(
self.inner_client.clone(),
self.group_id.clone(),
self.created_at_ns,
move |message| {
tsfn.call(
message
Expand Down
8 changes: 4 additions & 4 deletions bindings_node/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use napi::bindgen_prelude::Error;
use std::sync::Arc;
use tokio::sync::Mutex;
use xmtp_mls::{
client::ClientError, AbortHandle, GenericStreamHandle, StreamHandle as XmtpStreamHandle,
StreamHandleError,
subscriptions::SubscribeError, AbortHandle, GenericStreamHandle,
StreamHandle as XmtpStreamHandle, StreamHandleError,
};

use napi_derive::napi;

type StreamHandle = Box<GenericStreamHandle<Result<(), ClientError>>>;
type StreamHandle = Box<GenericStreamHandle<Result<(), SubscribeError>>>;

#[napi]
pub struct StreamCloser {
Expand All @@ -18,7 +18,7 @@ pub struct StreamCloser {

impl StreamCloser {
pub fn new(
handle: impl XmtpStreamHandle<StreamOutput = Result<(), ClientError>> + Send + Sync + 'static,
handle: impl XmtpStreamHandle<StreamOutput = Result<(), SubscribeError>> + Send + Sync + 'static,
) -> Self {
let abort = handle.abort_handle();
Self {
Expand Down
6 changes: 3 additions & 3 deletions common/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ pub fn logger() {
use tracing_subscriber::EnvFilter;

INIT.get_or_init(|| {
let filter = EnvFilter::builder()
.parse_lossy("xmtp_mls::subscriptions=TRACE,xmtp_api_http=TRACE,xmtp_common=TRACE,wasm_streams=TRACE,reqwest=TRACE");
// .with_default_directive(tracing::metadata::LevelFilter::DEBUG.into())
let filter = EnvFilter::builder().parse_lossy("xmtp_mls::subscriptions=debug");
// .parse_lossy("xmtp_mls::subscriptions=TRACE,xmtp_api_http=TRACE,xmtp_common=TRACE,wasm_streams=TRACE,reqwest=TRACE");
// .with_default_directive(tracing::metadata::LevelFilter::TRACE.into());

tracing_subscriber::registry()
.with(tracing_wasm::WASMLayer::default())
Expand Down
25 changes: 6 additions & 19 deletions common/src/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{pin::Pin, task::Poll, future::Future};
use futures::{Stream, FutureExt, StreamExt};
use futures::{FutureExt, Stream, StreamExt};
use std::{future::Future, pin::Pin, task::Poll};

#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;
Expand All @@ -23,7 +23,10 @@ pub struct StreamWrapper<'a, I> {
impl<'a, I> Stream for StreamWrapper<'a, I> {
type Item = I;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let inner = &mut self.inner;
futures::pin_mut!(inner);
inner.as_mut().poll_next(cx)
Expand Down Expand Up @@ -106,19 +109,3 @@ pub async fn yield_() {
pub async fn yield_() {
crate::time::sleep(crate::time::Duration::from_millis(100)).await;
}

#[cfg(target_arch = "wasm32")]
mod inner {
use super::*;

#[wasm_bindgen]
extern "C" {
#[wasm_bindgen (extends = js_sys::Object, js_name = Scheduler, typescript_type = "Scheduler")]
pub type Scheduler;

#[wasm_bindgen(method, structural, js_class = "Scheduler", js_name = yield)]
pub fn r#yield(this: &Scheduler) -> js_sys::Promise;
}
}
#[cfg(target_arch = "wasm32")]
use inner::*;
2 changes: 1 addition & 1 deletion dev/test-wasm-interactive
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ WASM_BINDGEN_SPLIT_LINKED_MODULES=1 \
WASM_BINDGEN_TEST_ONLY_WEB=1 \
NO_HEADLESS=1 \
cargo test --target wasm32-unknown-unknown --release \
-p $PACKAGE -- subscriptions::stream_conversations::test::test_stream_welcomes
-p $PACKAGE -- subscriptions::
55 changes: 27 additions & 28 deletions xmtp_api_http/src/http_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use serde_json::Deserializer;
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll, ready},
task::{ready, Context, Poll},
};
use xmtp_common::StreamWrapper;
use xmtp_proto::{Error, ErrorKind};
Expand Down Expand Up @@ -49,15 +49,11 @@ where
use Poll::*;
let this = self.as_mut().project();
let response = ready!(this.inner.poll(cx));
tracing::info!("ESTABLISH READY");
let stream = response
.inspect_err(|e| {
tracing::error!(
"Error during http subscription with grpc http gateway {e}"
);
})
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
tracing::info!("Calling bytes stream!");
.inspect_err(|e| {
tracing::error!("Error during http subscription with grpc http gateway {e}");
})
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
Ready(Ok(StreamWrapper::new(stream.bytes_stream())))
}
}
Expand Down Expand Up @@ -89,13 +85,13 @@ where
.inspect_err(|e| tracing::error!("Error in http stream to grpc gateway {e}"))
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
let item = Self::on_bytes(bytes, this.remaining)?.pop();
if let None = item {
if item.is_none() {
self.poll_next(cx)
} else {
Ready(Some(Ok(item.expect("handled none;"))))
}
},
None => Ready(None)
}
None => Ready(None),
}
}
}
Expand All @@ -119,7 +115,6 @@ where
for<'de> R: Deserialize<'de> + DeserializeOwned + Send,
{
fn on_bytes(bytes: bytes::Bytes, remaining: &mut Vec<u8>) -> Result<Vec<R>, Error> {
tracing::info!("BYTES: {:x}", bytes);
let bytes = &[remaining.as_ref(), bytes.as_ref()].concat();
let de = Deserializer::from_slice(bytes);
let mut deser_stream = de.into_iter::<GrpcResponse<R>>();
Expand All @@ -138,7 +133,6 @@ where
Err(e) => {
if e.is_eof() {
*remaining = (&**bytes)[deser_stream.byte_offset()..].to_vec();
tracing::debug!("IS EOF");
break;
} else {
return Err(Error::new(ErrorKind::MlsError).with(e.to_string()));
Expand All @@ -147,6 +141,10 @@ where
Ok(GrpcResponse::Empty {}) => continue,
}
}

if items.len() > 1 {
tracing::warn!("more than one item deserialized from http stream");
}
Ok(items)
}
}
Expand Down Expand Up @@ -179,7 +177,9 @@ where
let id = xmtp_common::rand_string::<12>();
tracing::info!("new http stream id={}", &id);
Self {
state: HttpStreamState::NotStarted { future: HttpStreamEstablish::new(request) },
state: HttpStreamState::NotStarted {
future: HttpStreamEstablish::new(request),
},
id,
}
}
Expand All @@ -197,22 +197,21 @@ where
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use ProjectHttpStream::*;
tracing::info!("Polling http stream id={}", &self.id);
tracing::trace!("Polling http stream id={}", &self.id);
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
NotStarted { future } => {
let stream = ready!(future.poll(cx))?;
tracing::info!("Ready TOP LEVEL");
this.state.set(HttpStreamState::Started { stream: HttpPostStream::new(stream)});
tracing::info!("Stream {} ready, polling for the first time...", &self.id);
this.state.set(HttpStreamState::Started {
stream: HttpPostStream::new(stream),
});
tracing::debug!("Stream {} ready, polling for the first time...", &self.id);
self.poll_next(cx)
},
}
Started { stream } => {
let res = stream.poll_next(cx);
if let Poll::Ready(_) = res {
tracing::info!("stream id={} ready with item", &self.id);
}
res
let item = ready!(stream.poll_next(cx));
tracing::debug!("stream id={} ready with item", &self.id);
Poll::Ready(item)
}
}
}
Expand All @@ -221,8 +220,8 @@ where
impl<'a, F, R> std::fmt::Debug for HttpStream<'a, F, R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self.state {
HttpStreamState::NotStarted{..} => write!(f, "not started"),
HttpStreamState::Started{..} => write!(f, "started"),
HttpStreamState::NotStarted { .. } => write!(f, "not started"),
HttpStreamState::Started { .. } => write!(f, "started"),
}
}
}
Expand Down Expand Up @@ -256,7 +255,7 @@ where
for<'de> R: Deserialize<'de> + DeserializeOwned + Send + 'static,
{
async fn establish(&mut self) {
tracing::info!("Establishing...");
tracing::debug!("establishing new http stream {}...", self.id);
// we need to poll the future once to progress the future state &
// establish the initial POST request.
// It should always be pending
Expand Down
1 change: 1 addition & 0 deletions xmtp_api_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl XmtpMlsClient for XmtpHttpApiClient {
}

async fn send_group_messages(&self, request: SendGroupMessagesRequest) -> Result<(), Error> {
tracing::info!("SENDING GRP MSG");
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::SEND_GROUP_MESSAGES))
Expand Down
Loading

0 comments on commit bdf2694

Please sign in to comment.