Skip to content

Commit

Permalink
fix missed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Jan 27, 2025
1 parent bae85e7 commit c03b9f4
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 792 deletions.
22 changes: 13 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,4 @@ diesel = { git = "https://github.com/diesel-rs/diesel", branch = "master" }
diesel_derives = { git = "https://github.com/diesel-rs/diesel", branch = "master" }
diesel_migrations = { git = "https://github.com/diesel-rs/diesel", branch = "master" }
sqlite-web = { git = "https://github.com/xmtp/sqlite-web-rs", branch = "main" }
reqwest = { path = "/Users/insipx/Projects/seanmonster/reqwest" }
wasm-streams = { path = "/Users/insipx/Projects/wasm-streams" }

6 changes: 3 additions & 3 deletions common/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ pub fn logger() {
use tracing_subscriber::EnvFilter;

INIT.get_or_init(|| {
let filter = EnvFilter::builder()
.parse_lossy("xmtp_mls::subscriptions=TRACE,xmtp_api_http=TRACE,xmtp_common=TRACE,wasm_streams=TRACE,reqwest=TRACE");
// .with_default_directive(tracing::metadata::LevelFilter::DEBUG.into())
let filter = EnvFilter::builder().parse_lossy("xmtp_mls::subscriptions=debug");
// .parse_lossy("xmtp_mls::subscriptions=TRACE,xmtp_api_http=TRACE,xmtp_common=TRACE,wasm_streams=TRACE,reqwest=TRACE");
// .with_default_directive(tracing::metadata::LevelFilter::TRACE.into());

tracing_subscriber::registry()
.with(tracing_wasm::WASMLayer::default())
Expand Down
25 changes: 6 additions & 19 deletions common/src/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{pin::Pin, task::Poll, future::Future};
use futures::{Stream, FutureExt, StreamExt};
use futures::{FutureExt, Stream, StreamExt};
use std::{future::Future, pin::Pin, task::Poll};

#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;
Expand All @@ -23,7 +23,10 @@ pub struct StreamWrapper<'a, I> {
impl<'a, I> Stream for StreamWrapper<'a, I> {
type Item = I;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let inner = &mut self.inner;
futures::pin_mut!(inner);
inner.as_mut().poll_next(cx)
Expand Down Expand Up @@ -106,19 +109,3 @@ pub async fn yield_() {
pub async fn yield_() {
crate::time::sleep(crate::time::Duration::from_millis(100)).await;
}

#[cfg(target_arch = "wasm32")]
mod inner {
use super::*;

#[wasm_bindgen]
extern "C" {
#[wasm_bindgen (extends = js_sys::Object, js_name = Scheduler, typescript_type = "Scheduler")]
pub type Scheduler;

#[wasm_bindgen(method, structural, js_class = "Scheduler", js_name = yield)]
pub fn r#yield(this: &Scheduler) -> js_sys::Promise;
}
}
#[cfg(target_arch = "wasm32")]
use inner::*;
2 changes: 1 addition & 1 deletion dev/test-wasm-interactive
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ WASM_BINDGEN_SPLIT_LINKED_MODULES=1 \
WASM_BINDGEN_TEST_ONLY_WEB=1 \
NO_HEADLESS=1 \
cargo test --target wasm32-unknown-unknown --release \
-p $PACKAGE -- subscriptions::stream_conversations::test::test_stream_welcomes
-p $PACKAGE -- subscriptions::
53 changes: 26 additions & 27 deletions xmtp_api_http/src/http_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use serde_json::Deserializer;
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll, ready},
task::{ready, Context, Poll},
};
use xmtp_common::StreamWrapper;
use xmtp_proto::{Error, ErrorKind};
Expand Down Expand Up @@ -49,15 +49,11 @@ where
use Poll::*;
let this = self.as_mut().project();
let response = ready!(this.inner.poll(cx));
tracing::info!("ESTABLISH READY");
let stream = response
.inspect_err(|e| {
tracing::error!(
"Error during http subscription with grpc http gateway {e}"
);
})
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
tracing::info!("Calling bytes stream!");
.inspect_err(|e| {
tracing::error!("Error during http subscription with grpc http gateway {e}");
})
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
Ready(Ok(StreamWrapper::new(stream.bytes_stream())))
}
}
Expand Down Expand Up @@ -89,13 +85,13 @@ where
.inspect_err(|e| tracing::error!("Error in http stream to grpc gateway {e}"))
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
let item = Self::on_bytes(bytes, this.remaining)?.pop();
if let None = item {
if item.is_none() {
self.poll_next(cx)
} else {
Ready(Some(Ok(item.expect("handled none;"))))
}
},
None => Ready(None)
}
None => Ready(None),
}
}
}
Expand All @@ -119,7 +115,6 @@ where
for<'de> R: Deserialize<'de> + DeserializeOwned + Send,
{
fn on_bytes(bytes: bytes::Bytes, remaining: &mut Vec<u8>) -> Result<Vec<R>, Error> {
tracing::info!("BYTES: {:x}", bytes);
let bytes = &[remaining.as_ref(), bytes.as_ref()].concat();
let de = Deserializer::from_slice(bytes);
let mut deser_stream = de.into_iter::<GrpcResponse<R>>();
Expand All @@ -138,7 +133,6 @@ where
Err(e) => {
if e.is_eof() {
*remaining = (&**bytes)[deser_stream.byte_offset()..].to_vec();
tracing::debug!("IS EOF");
break;
} else {
return Err(Error::new(ErrorKind::MlsError).with(e.to_string()));
Expand All @@ -147,6 +141,10 @@ where
Ok(GrpcResponse::Empty {}) => continue,
}
}

if items.len() > 1 {
tracing::warn!("more than one item deserialized from http stream");
}
Ok(items)
}
}
Expand Down Expand Up @@ -179,7 +177,9 @@ where
let id = xmtp_common::rand_string::<12>();
tracing::info!("new http stream id={}", &id);
Self {
state: HttpStreamState::NotStarted { future: HttpStreamEstablish::new(request) },
state: HttpStreamState::NotStarted {
future: HttpStreamEstablish::new(request),
},
id,
}
}
Expand All @@ -197,22 +197,21 @@ where
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use ProjectHttpStream::*;
tracing::info!("Polling http stream id={}", &self.id);
tracing::trace!("Polling http stream id={}", &self.id);
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
NotStarted { future } => {
let stream = ready!(future.poll(cx))?;
tracing::info!("Ready TOP LEVEL");
this.state.set(HttpStreamState::Started { stream: HttpPostStream::new(stream)});
tracing::info!("Stream {} ready, polling for the first time...", &self.id);
this.state.set(HttpStreamState::Started {
stream: HttpPostStream::new(stream),
});
tracing::debug!("Stream {} ready, polling for the first time...", &self.id);
self.poll_next(cx)
},
}
Started { stream } => {
let res = stream.poll_next(cx);
if let Poll::Ready(_) = res {
tracing::info!("stream id={} ready with item", &self.id);
}
res
let item = ready!(stream.poll_next(cx));
tracing::debug!("stream id={} ready with item", &self.id);
Poll::Ready(item)
}
}
}
Expand All @@ -221,8 +220,8 @@ where
impl<'a, F, R> std::fmt::Debug for HttpStream<'a, F, R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self.state {
HttpStreamState::NotStarted{..} => write!(f, "not started"),
HttpStreamState::Started{..} => write!(f, "started"),
HttpStreamState::NotStarted { .. } => write!(f, "not started"),
HttpStreamState::Started { .. } => write!(f, "started"),
}
}
}
Expand Down
Loading

0 comments on commit c03b9f4

Please sign in to comment.