Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix http stream with stream all messages #1510

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/test-webassembly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ jobs:
- name: test with chrome
run: |
cargo test --release --target wasm32-unknown-unknown -p xmtp_mls -p xmtp_id -p xmtp_api_http -p xmtp_cryptography -- \
--skip xmtp_mls::subscriptions --skip xmtp_mls::groups::subscriptions \
--skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
-skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
--skip xmtp_mls::groups::tests::process_messages_abort_on_retryable_error \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_find_groups \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_installations_last_checked_is_updated
Expand Down
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
],
"rust-analyzer.procMacro.attributes.enable": true,
"rust-analyzer.procMacro.ignored": {
"tracing": ["instrument"],
"async-trait": ["async_trait"],
"napi-derive": ["napi"],
"async-recursion": ["async_recursion"],
Expand Down
25 changes: 14 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ wasm-bindgen-test = "0.3.50"
web-sys = "0.3"
zeroize = "1.8"
pin-project-lite = "0.2"
reqwest = { version = "0.12.12", features = ["json", "stream"] }
bytes = "1.9"

# Internal Crate Dependencies
xmtp_api_grpc = { path = "xmtp_api_grpc" }
Expand Down Expand Up @@ -156,3 +158,6 @@ diesel = { git = "https://github.com/diesel-rs/diesel", branch = "master" }
diesel_derives = { git = "https://github.com/diesel-rs/diesel", branch = "master" }
diesel_migrations = { git = "https://github.com/diesel-rs/diesel", branch = "master" }
sqlite-web = { git = "https://github.com/xmtp/sqlite-web-rs", branch = "main" }
reqwest = { path = "/Users/insipx/Projects/seanmonster/reqwest" }
wasm-streams = { path = "/Users/insipx/Projects/wasm-streams" }

2 changes: 2 additions & 0 deletions bindings_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub enum GenericError {
DeviceSync(#[from] xmtp_mls::groups::device_sync::DeviceSyncError),
#[error(transparent)]
Identity(#[from] xmtp_mls::identity::IdentityError),
#[error(transparent)]
Subscription(#[from] xmtp_mls::subscriptions::SubscribeError),
}

#[derive(uniffi::Error, thiserror::Error, Debug)]
Expand Down
26 changes: 14 additions & 12 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use xmtp_mls::storage::group_message::{SortDirection, StoredGroupMessageWithReac
use xmtp_mls::{
api::ApiClientWrapper,
builder::ClientBuilder,
client::{Client as MlsClient, ClientError},
client::Client as MlsClient,
groups::{
group_metadata::GroupMetadata,
group_mutable_metadata::MetadataField,
Expand All @@ -47,6 +47,7 @@ use xmtp_mls::{
group_message::{DeliveryStatus, GroupMessageKind, StoredGroupMessage},
EncryptedMessageStore, EncryptionKey, StorageOption,
},
subscriptions::SubscribeError,
AbortHandle, GenericStreamHandle, StreamHandle,
};
use xmtp_proto::xmtp::mls::message_contents::content_types::ReactionV2;
Expand Down Expand Up @@ -1770,15 +1771,13 @@ impl FfiConversation {
}

pub async fn stream(&self, message_callback: Arc<dyn FfiMessageCallback>) -> FfiStreamCloser {
let handle = MlsGroup::stream_with_callback(
self.inner.client.clone(),
self.id(),
self.inner.created_at_ns,
move |message| match message {
Ok(m) => message_callback.on_message(m.into()),
Err(e) => message_callback.on_error(e.into()),
},
);
let handle =
MlsGroup::stream_with_callback(self.inner.client.clone(), self.id(), move |message| {
match message {
Ok(m) => message_callback.on_message(m.into()),
Err(e) => message_callback.on_error(e.into()),
}
});

FfiStreamCloser::new(handle)
}
Expand Down Expand Up @@ -2065,7 +2064,7 @@ impl From<FfiConsent> for StoredConsentRecord {
}
}

type FfiHandle = Box<GenericStreamHandle<Result<(), ClientError>>>;
type FfiHandle = Box<GenericStreamHandle<Result<(), SubscribeError>>>;

#[derive(uniffi::Object, Clone)]
pub struct FfiStreamCloser {
Expand All @@ -2076,7 +2075,10 @@ pub struct FfiStreamCloser {

impl FfiStreamCloser {
pub fn new(
stream_handle: impl StreamHandle<StreamOutput = Result<(), ClientError>> + Send + Sync + 'static,
stream_handle: impl StreamHandle<StreamOutput = Result<(), SubscribeError>>
+ Send
+ Sync
+ 'static,
) -> Self {
Self {
abort_handle: Arc::new(stream_handle.abort_handle()),
Expand Down
1 change: 0 additions & 1 deletion bindings_node/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,6 @@ impl Conversation {
let stream_closer = MlsGroup::stream_with_callback(
self.inner_client.clone(),
self.group_id.clone(),
self.created_at_ns,
move |message| {
tsfn.call(
message
Expand Down
8 changes: 4 additions & 4 deletions bindings_node/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use napi::bindgen_prelude::Error;
use std::sync::Arc;
use tokio::sync::Mutex;
use xmtp_mls::{
client::ClientError, AbortHandle, GenericStreamHandle, StreamHandle as XmtpStreamHandle,
StreamHandleError,
subscriptions::SubscribeError, AbortHandle, GenericStreamHandle,
StreamHandle as XmtpStreamHandle, StreamHandleError,
};

use napi_derive::napi;

type StreamHandle = Box<GenericStreamHandle<Result<(), ClientError>>>;
type StreamHandle = Box<GenericStreamHandle<Result<(), SubscribeError>>>;

#[napi]
pub struct StreamCloser {
Expand All @@ -18,7 +18,7 @@ pub struct StreamCloser {

impl StreamCloser {
pub fn new(
handle: impl XmtpStreamHandle<StreamOutput = Result<(), ClientError>> + Send + Sync + 'static,
handle: impl XmtpStreamHandle<StreamOutput = Result<(), SubscribeError>> + Send + Sync + 'static,
) -> Self {
let abort = handle.abort_handle();
Self {
Expand Down
1 change: 1 addition & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ console_error_panic_hook = { version = "0.1", optional = true }
js-sys.workspace = true
web-sys = { workspace = true, features = ["Window"] }
wasm-bindgen-futures.workspace = true
wasm-bindgen.workspace = true

[dev-dependencies]
thiserror.workspace = true
Expand Down
19 changes: 2 additions & 17 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ pub mod bench;
pub mod retry;
pub use retry::*;

/// Global Marker trait for WebAssembly
#[cfg(target_arch = "wasm32")]
pub trait Wasm {}
#[cfg(target_arch = "wasm32")]
impl<T> Wasm for T {}
pub mod wasm;
pub use wasm::*;

pub mod time;

Expand All @@ -36,15 +33,3 @@ pub fn rand_array<const N: usize>() -> [u8; N] {
crypto_utils::rng().fill_bytes(&mut buffer);
buffer
}

/// Yield back control to the async runtime
#[cfg(not(target_arch = "wasm32"))]
pub async fn yield_() {
tokio::task::yield_now().await
}

/// Yield back control to the async runtime
#[cfg(target_arch = "wasm32")]
pub async fn yield_() {
time::sleep(crate::time::Duration::from_millis(1)).await;
}
6 changes: 3 additions & 3 deletions common/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ pub fn logger() {
use tracing_subscriber::EnvFilter;

INIT.get_or_init(|| {
let filter = EnvFilter::builder()
.with_default_directive(tracing::metadata::LevelFilter::DEBUG.into())
.from_env_lossy();
let filter = EnvFilter::builder().parse_lossy("xmtp_mls::subscriptions=debug");
// .parse_lossy("xmtp_mls::subscriptions=TRACE,xmtp_api_http=TRACE,xmtp_common=TRACE,wasm_streams=TRACE,reqwest=TRACE");
// .with_default_directive(tracing::metadata::LevelFilter::TRACE.into());

tracing_subscriber::registry()
.with(tracing_wasm::WASMLayer::default())
Expand Down
1 change: 0 additions & 1 deletion common/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub async fn sleep(duration: Duration) {
let worker = js_sys::global()
.dyn_into::<WorkerGlobalScope>()
.expect("xmtp_mls should always act in worker in browser");

worker
.set_timeout_with_callback_and_timeout_and_arguments_0(
&resolve,
Expand Down
Loading
Loading