fix http stream with stream all messages #1510

Closed
wipo
insipx committed Jan 27, 2025
commit 48b8e094f1483fba35d18ea52908833dfe0619b3
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -101,6 +101,7 @@ web-sys = "0.3"
zeroize = "1.8"
pin-project-lite = "0.2"
reqwest = { version = "0.12.5", features = ["json", "stream"] }
bytes = "1.9"

# Internal Crate Dependencies
xmtp_api_grpc = { path = "xmtp_api_grpc" }
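Note: the new `bytes` dependency presumably backs the `bytes::Bytes` chunks that `reqwest`'s `bytes_stream()` yields and that the HTTP stream below buffers. A minimal sketch of that buffering step (the helper name is illustrative, not part of this PR):

```rust
use bytes::Bytes;

// Prepend any partial JSON frame left over from the previous chunk to the
// new chunk before handing the combined buffer to the deserializer.
fn join_with_remaining(remaining: &[u8], chunk: &Bytes) -> Vec<u8> {
    [remaining, chunk.as_ref()].concat()
}
```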
10 changes: 7 additions & 3 deletions common/src/wasm.rs
@@ -1,13 +1,13 @@
use std::{pin::Pin, task::Poll, future::Future, sync::atomic::{Ordering, AtomicU32, AtomicBool}};
use std::{pin::Pin, task::Poll, future::Future};
use futures::{Stream, FutureExt, StreamExt};

#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;

#[cfg(target_arch = "wasm32")]
static LOCAL_EXECUTOR_TICKS: AtomicU32 = AtomicU32::new(0u32);
const YIELD_AFTER: u32 = 10_000;
static NEEDS_YIELD: AtomicBool = AtomicBool::new(true);
// const YIELD_AFTER: u32 = 10_000;
// static NEEDS_YIELD: AtomicBool = AtomicBool::new(true);

/// Global Marker trait for WebAssembly
#[cfg(target_arch = "wasm32")]
@@ -158,6 +158,7 @@ pub fn ensure_browser_fairness() {
pub struct Fairness;

impl Fairness {
#[cfg(target_arch = "wasm32")]
pub fn wake() {
tracing::info!("NEEDS YIELD {}", NEEDS_YIELD.load(Ordering::SeqCst));
let ticks = LOCAL_EXECUTOR_TICKS.load(Ordering::SeqCst);
@@ -167,4 +168,7 @@ impl Fairness {
// wraps on overflow
LOCAL_EXECUTOR_TICKS.fetch_add(1, Ordering::SeqCst);
}

#[cfg(not(target_arch = "wasm32"))]
pub fn wake() {}
}
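The `wake` split above follows the usual cfg-gating pattern: call sites invoke `Fairness::wake()` unconditionally and only the wasm32 build does real work. A stripped-down sketch of the pattern (bodies are placeholders):

```rust
pub struct Fairness;

impl Fairness {
    #[cfg(target_arch = "wasm32")]
    pub fn wake() {
        // Browser builds: yield to the event loop / update tick counters here.
    }

    // Native builds compile this no-op instead, so callers need no cfg.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn wake() {}
}
```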
148 changes: 68 additions & 80 deletions xmtp_api_http/src/http_stream.rs
@@ -12,9 +12,9 @@ use serde_json::Deserializer;
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
task::{Context, Poll, ready},
};
use xmtp_common::{StreamWrapper, Fairness};
use xmtp_common::StreamWrapper;
use xmtp_proto::{Error, ErrorKind};

#[derive(Deserialize, Serialize, Debug)]
@@ -48,24 +48,17 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use Poll::*;
let this = self.as_mut().project();
match this.inner.poll(cx) {
Ready(response) => {
tracing::info!("ESTABLISH READY");
let stream = response
.inspect_err(|e| {
tracing::error!(
"Error during http subscription with grpc http gateway {e}"
);
})
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
tracing::info!("Calling bytes stream!");
Ready(Ok(StreamWrapper::new(stream.bytes_stream())))
}
Pending => {
Fairness::wake();
Pending
}
}
let response = ready!(this.inner.poll(cx));
tracing::info!("ESTABLISH READY");
let stream = response
.inspect_err(|e| {
tracing::error!(
"Error during http subscription with grpc http gateway {e}"
);
})
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
tracing::info!("Calling bytes stream!");
Ready(Ok(StreamWrapper::new(stream.bytes_stream())))
}
}
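The rewrite above leans on `std::task::ready!`, which collapses the explicit `Pending` arm: it evaluates to the `Ready` value or early-returns `Poll::Pending` from the enclosing function. A self-contained illustration:

```rust
use std::task::{ready, Poll};

fn add_one(inner: Poll<u32>) -> Poll<u32> {
    // Equivalent to: match inner { Ready(v) => v, Pending => return Pending }.
    let value = ready!(inner);
    Poll::Ready(value + 1)
}
```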

@@ -84,34 +77,25 @@ where
type Item = Result<R, Error>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
use Poll::*;
let this = self.project();
match this.http.poll_next(cx) {
Ready(Some(bytes)) => {
tracing::info!("READY THIS IS WHAT WE WANT");
let mut this = self.as_mut().project();
let item = ready!(this.http.as_mut().poll_next(cx));
match item {
Some(bytes) => {
let bytes = bytes
.inspect_err(|e| tracing::error!("Error in http stream to grpc gateway {e}"))
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
match Self::on_bytes(bytes, this.remaining)? {
None => {
tracing::info!("ON BYTES NONE PENDING");
xmtp_common::Fairness::wake();
Pending
},
Some(r) => {
tracing::info!("READY");
Ready(Some(Ok(r)))
},
let item = Self::on_bytes(bytes, this.remaining)?.pop();
if let None = item {
self.poll_next(cx)
} else {
Ready(Some(Ok(item.expect("handled none;"))))
}
}
Ready(None) => Ready(None),
Pending => {
xmtp_common::Fairness::wake();
Pending
},
None => Ready(None)
}
}
}
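When a chunk decodes to no complete message, the new branch re-polls itself instead of returning `Pending`, so already-buffered data keeps draining without waiting for another wakeup. A non-recursive sketch of the same idea (names are illustrative, not this PR's API):

```rust
/// Keep pulling chunks until one yields a decoded item or the source ends.
fn next_item<T>(mut pull_chunk: impl FnMut() -> Option<Vec<T>>) -> Option<T> {
    loop {
        match pull_chunk() {
            // The chunk produced at least one complete item: hand one back.
            Some(mut items) if !items.is_empty() => return items.pop(),
            // The chunk held only a partial frame: try the next one.
            Some(_) => continue,
            // The source is exhausted.
            None => return None,
        }
    }
}
```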
@@ -134,41 +118,50 @@ impl<R> HttpPostStream<'_, R>
where
for<'de> R: Deserialize<'de> + DeserializeOwned + Send,
{
fn on_bytes(bytes: bytes::Bytes, remaining: &mut Vec<u8>) -> Result<Option<R>, Error> {
fn on_bytes(bytes: bytes::Bytes, remaining: &mut Vec<u8>) -> Result<Vec<R>, Error> {
tracing::info!("BYTES: {:x}", bytes);
let bytes = &[remaining.as_ref(), bytes.as_ref()].concat();
let de = Deserializer::from_slice(bytes);
let mut deser_stream = de.into_iter::<GrpcResponse<R>>();
let mut items = Vec::new();
loop {
let item = deser_stream.next();
match item {
Some(Ok(GrpcResponse::Ok(response))) => return Ok(Some(response)),
Some(Ok(GrpcResponse::SubscriptionItem(item))) => return Ok(Some(item.result)),
Some(Ok(GrpcResponse::Err(e))) => {
if item.is_none() {
break;
}
match item.expect("checked for none;") {
Ok(GrpcResponse::Ok(response)) => items.push(response),
Ok(GrpcResponse::SubscriptionItem(item)) => items.push(item.result),
Ok(GrpcResponse::Err(e)) => {
return Err(Error::new(ErrorKind::MlsError).with(e.message));
}
Some(Err(e)) => {
Err(e) => {
if e.is_eof() {
*remaining = (&**bytes)[deser_stream.byte_offset()..].to_vec();
tracing::debug!("IS EOF");
return Ok(None);
break;
} else {
return Err(Error::new(ErrorKind::MlsError).with(e.to_string()));
}
}
Some(Ok(GrpcResponse::Empty {})) => continue,
None => {
tracing::debug!("IS NONE");
return Ok(None)
},
Ok(GrpcResponse::Empty {}) => continue,
}
}
Ok(items)
}
}

pin_project! {
struct HttpStream<'a, F, R> {
#[pin] state: HttpStreamState<'a, F, R>,
id: String
}
}

pin_project! {
/// The establish future for the http post stream
#[project = ProjectHttpStream]
enum HttpStream<'a, F, R> {
enum HttpStreamState<'a, F, R> {
NotStarted {
#[pin] future: HttpStreamEstablish<'a, F>,
},
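The reworked `on_bytes` above follows serde_json's documented `StreamDeserializer` pattern: parse as many complete values as the buffer holds, and when an EOF error is hit mid-value, use `byte_offset()` to stash the unread tail for the next chunk. A standalone sketch of that pattern (the `decode_chunk` helper is illustrative):

```rust
use serde::de::DeserializeOwned;
use serde_json::Deserializer;

fn decode_chunk<T: DeserializeOwned>(
    remaining: &mut Vec<u8>,
    chunk: &[u8],
) -> Result<Vec<T>, serde_json::Error> {
    // Join the leftover partial frame with the new bytes.
    let buf = [remaining.as_slice(), chunk].concat();
    let mut stream = Deserializer::from_slice(&buf).into_iter::<T>();
    let mut items = Vec::new();
    loop {
        match stream.next() {
            Some(Ok(value)) => items.push(value),
            // The buffer ends mid-value: keep the unread tail for next time.
            Some(Err(e)) if e.is_eof() => {
                *remaining = buf[stream.byte_offset()..].to_vec();
                break;
            }
            Some(Err(e)) => return Err(e),
            // Every value in the buffer parsed cleanly.
            None => {
                remaining.clear();
                break;
            }
        }
    }
    Ok(items)
}
```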
@@ -183,7 +176,12 @@ where
F: Future<Output = Result<Response, reqwest::Error>>,
{
fn new(request: F) -> Self {
Self::NotStarted{ future: HttpStreamEstablish::new(request) }
let id = xmtp_common::rand_string::<12>();
tracing::info!("new http stream id={}", &id);
Self {
state: HttpStreamState::NotStarted { future: HttpStreamEstablish::new(request) },
id,
}
}
}
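Wrapping the old enum in a struct lets the stream carry a tracing `id` next to the pinned state machine; `poll_next` then projects twice, first the struct and then the enum. The shape, reduced to its essentials (field types are placeholders):

```rust
use pin_project_lite::pin_project;

pin_project! {
    struct Wrapper<Fut> {
        #[pin]
        state: State<Fut>,
        id: String,
    }
}

pin_project! {
    #[project = StateProj]
    enum State<Fut> {
        NotStarted { #[pin] future: Fut },
        Started,
    }
}
```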

@@ -199,41 +197,32 @@ where
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use ProjectHttpStream::*;
use Poll::*;
let this = self.as_mut().project();
match this {
NotStarted { future } => match future.poll(cx) {
Ready(stream) => {
tracing::info!("READY TOP LEVEL");
self.set(Self::Started { stream: HttpPostStream::new(stream?) } );
// cx.waker().wake_by_ref();
// tracing::info!("POLLING STREAM NEXT");
self.poll_next(cx)
},
Pending => {
Fairness::wake();
cx.waker().wake_by_ref();
Pending
}
tracing::info!("Polling http stream id={}", &self.id);
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
NotStarted { future } => {
let stream = ready!(future.poll(cx))?;
tracing::info!("Ready TOP LEVEL");
this.state.set(HttpStreamState::Started { stream: HttpPostStream::new(stream)});
tracing::info!("Stream {} ready, polling for the first time...", &self.id);
self.poll_next(cx)
},
Started { stream } => {
Fairness::wake();
let p = stream.poll_next(cx);
if let Pending = p {
Fairness::wake();
cx.waker().wake_by_ref();
let res = stream.poll_next(cx);
if let Poll::Ready(_) = res {
tracing::info!("stream id={} ready with item", &self.id);
}
p
res
}
}
}
}

impl<'a, F, R> std::fmt::Debug for HttpStream<'a, F, R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
Self::NotStarted{..} => write!(f, "not started"),
Self::Started{..} => write!(f, "started"),
match self.state {
HttpStreamState::NotStarted{..} => write!(f, "not started"),
HttpStreamState::Started{..} => write!(f, "started"),
}
}
}
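Once the establish future resolves, the state is swapped to `Started` in place and the stream is polled again within the same call, so the readiness the inner future just signalled is not lost. Schematically (a placeholder stream, not this PR's types):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

struct Demo {
    started: bool,
}

impl Stream for Demo {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if !self.started {
            // Transition the state...
            self.started = true;
            // ...and poll again immediately instead of returning Pending.
            return self.poll_next(cx);
        }
        Poll::Ready(Some(1))
    }
}
```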
@@ -320,7 +309,6 @@ where
T: Serialize + 'static,
R: DeserializeOwned + Send + 'static,
{
tracing::info!("CREATING STREAM");
let request = http_client.post(endpoint).json(&request).send();
let mut http = HttpStream::new(request);
http.establish().await;
21 changes: 21 additions & 0 deletions xmtp_api_http/src/lib.rs
@@ -263,7 +263,28 @@ impl XmtpMlsStreams for XmtpHttpApiClient {
&self,
request: SubscribeGroupMessagesRequest,
) -> Result<Self::GroupMessageStream<'_>, Error> {
use xmtp_proto::xmtp::mls::api::v1::subscribe_group_messages_request::Filter;
use xmtp_proto::xmtp::mls::api::v1::PagingInfo;
use xmtp_proto::xmtp::mls::api::v1::SortDirection;
tracing::debug!("subscribe_group_messages");
tracing::info!("MESSAGES \n\n");
for Filter {
group_id,
id_cursor,
} in request.filters.iter()
{
// tracing::info!("Group {}, Cursor: {}", hex::encode(group_id), id_cursor);
let r = QueryGroupMessagesRequest {
group_id: group_id.clone(),
paging_info: Some(PagingInfo {
id_cursor: 3,
limit: 100,
direction: SortDirection::Ascending as i32,
}),
};
let messages = self.query_group_messages(r).await;
tracing::info!("\nQUERY GROUP MSG \n\n {:?}\n\n", messages);
}
Ok(create_grpc_stream::<_, GroupMessage>(
request,
self.endpoint(ApiEndpoints::SUBSCRIBE_GROUP_MESSAGES),
Loading