Skip to content

Commit

Permalink
cleanup logs
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Jan 28, 2025
1 parent acf4395 commit 38688bf
Show file tree
Hide file tree
Showing 17 changed files with 93 additions and 119 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-webassembly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- name: test with chrome
run: |
cargo test --release --target wasm32-unknown-unknown -p xmtp_mls -p xmtp_id -p xmtp_api_http -p xmtp_cryptography -- \
-skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
--skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
--skip xmtp_mls::groups::tests::process_messages_abort_on_retryable_error \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_find_groups \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_installations_last_checked_is_updated
Expand Down
7 changes: 3 additions & 4 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3122,10 +3122,9 @@ mod tests {
.await
.unwrap();
message_callbacks.wait_for_delivery(None).await.unwrap();
message_callbacks.wait_for_delivery(None).await.unwrap();
assert_eq!(bo_provider.conn_ref().intents_published(), 4);

assert_eq!(message_callbacks.message_count(), 6);
assert_eq!(message_callbacks.message_count(), 5);

stream_messages.end_and_wait().await.unwrap();

Expand Down Expand Up @@ -4780,7 +4779,7 @@ mod tests {
// Verify the settings were applied
let group_from_db = alix_provider
.conn_ref()
.find_group(alix_group.id())
.find_group(&alix_group.id())
.unwrap();
assert_eq!(
group_from_db
Expand Down Expand Up @@ -4825,7 +4824,7 @@ mod tests {
// Verify disappearing settings are disabled
let group_from_db = alix_provider
.conn_ref()
.find_group(alix_group.id())
.find_group(&alix_group.id())
.unwrap();
assert_eq!(
group_from_db
Expand Down
4 changes: 2 additions & 2 deletions common/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct StreamWrapper<'a, I> {
inner: Pin<Box<dyn Stream<Item = I> + 'a>>,
}

impl<'a, I> Stream for StreamWrapper<'a, I> {
impl<I> Stream for StreamWrapper<'_, I> {
type Item = I;

fn poll_next(
Expand Down Expand Up @@ -66,7 +66,7 @@ pub struct FutureWrapper<'a, O> {
inner: Pin<Box<dyn Future<Output = O> + 'a>>,
}

impl<'a, O> Future for FutureWrapper<'a, O> {
impl<O> Future for FutureWrapper<'_, O> {
type Output = O;

fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Expand Down
6 changes: 5 additions & 1 deletion dev/test-wasm
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ WASM_BINDGEN_SPLIT_LINKED_MODULES=1 \
WASM_BINDGEN_TEST_TIMEOUT=120 \
CHROMEDRIVER="chromedriver" \
cargo test --target wasm32-unknown-unknown --release \
-p xmtp_mls -- xmtp_mls::subscriptions
-p xmtp_mls -p xmtp_id -p xmtp_api_http -p xmtp_cryptography -- \
--skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
--skip xmtp_mls::groups::tests::process_messages_abort_on_retryable_error \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_find_groups \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_installations_last_checked_is_updated
6 changes: 5 additions & 1 deletion dev/test-wasm-interactive
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ WASM_BINDGEN_SPLIT_LINKED_MODULES=1 \
WASM_BINDGEN_TEST_ONLY_WEB=1 \
NO_HEADLESS=1 \
cargo test --target wasm32-unknown-unknown --release \
-p $PACKAGE -- subscriptions::
-p $PACKAGE -- subscriptions:: \
--skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
--skip xmtp_mls::groups::tests::process_messages_abort_on_retryable_error \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_find_groups \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_installations_last_checked_is_updated
27 changes: 13 additions & 14 deletions xmtp_api_http/src/http_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pin_project! {
}
}

impl<'a, F> HttpStreamEstablish<'a, F> {
impl<F> HttpStreamEstablish<'_, F> {
fn new(inner: F) -> Self {
Self {
inner,
Expand Down Expand Up @@ -66,7 +66,7 @@ pin_project! {
}
}

impl<'a, R> Stream for HttpPostStream<'a, R>
impl<R> Stream for HttpPostStream<'_, R>
where
for<'de> R: Send + Deserialize<'de>,
{
Expand All @@ -85,10 +85,10 @@ where
.inspect_err(|e| tracing::error!("Error in http stream to grpc gateway {e}"))
.map_err(|_| Error::new(ErrorKind::SubscribeError))?;
let item = Self::on_bytes(bytes, this.remaining)?.pop();
if item.is_none() {
self.poll_next(cx)
if let Some(item) = item {
Ready(Some(Ok(item)))
} else {
Ready(Some(Ok(item.expect("handled none;"))))
self.poll_next(cx)
}
}
None => Ready(None),
Expand All @@ -101,7 +101,6 @@ where
R: Send + 'static,
{
pub fn new(establish: StreamWrapper<'a, Result<bytes::Bytes, reqwest::Error>>) -> Self {
tracing::info!("New post stream");
Self {
http: establish,
remaining: Vec::new(),
Expand Down Expand Up @@ -169,7 +168,7 @@ pin_project! {
}
}

impl<'a, F, R> HttpStream<'a, F, R>
impl<F, R> HttpStream<'_, F, R>
where
F: Future<Output = Result<Response, reqwest::Error>>,
{
Expand All @@ -185,7 +184,7 @@ where
}
}

impl<'a, F, R> Stream for HttpStream<'a, F, R>
impl<F, R> Stream for HttpStream<'_, F, R>
where
F: Future<Output = Result<Response, reqwest::Error>>,
for<'de> R: Send + Deserialize<'de> + 'static,
Expand All @@ -205,19 +204,19 @@ where
this.state.set(HttpStreamState::Started {
stream: HttpPostStream::new(stream),
});
tracing::debug!("Stream {} ready, polling for the first time...", &self.id);
tracing::trace!("Stream {} ready, polling for the first time...", &self.id);
self.poll_next(cx)
}
Started { stream } => {
let item = ready!(stream.poll_next(cx));
tracing::debug!("stream id={} ready with item", &self.id);
tracing::trace!("stream id={} ready with item", &self.id);
Poll::Ready(item)
}
}
}
}

impl<'a, F, R> std::fmt::Debug for HttpStream<'a, F, R> {
impl<F, R> std::fmt::Debug for HttpStream<'_, F, R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self.state {
HttpStreamState::NotStarted { .. } => write!(f, "not started"),
Expand All @@ -227,7 +226,7 @@ impl<'a, F, R> std::fmt::Debug for HttpStream<'a, F, R> {
}

#[cfg(not(target_arch = "wasm32"))]
impl<'a, F, R> HttpStream<'a, F, R>
impl<F, R> HttpStream<'_, F, R>
where
F: Future<Output = Result<Response, reqwest::Error>> + Unpin,
for<'de> R: Deserialize<'de> + DeserializeOwned + Send + 'static,
Expand All @@ -241,7 +240,7 @@ where
let mut cx = std::task::Context::from_waker(&noop_waker);
// let mut this = Pin::new(self);
let mut this = Pin::new(self);
if let Poll::Ready(_) = this.poll_next_unpin(&mut cx) {
if this.poll_next_unpin(&mut cx).is_ready() {
tracing::error!("Stream ready before established");
unreachable!()
}
Expand All @@ -263,7 +262,7 @@ where
let mut cx = std::task::Context::from_waker(&noop_waker);
let mut this = unsafe { Pin::new_unchecked(self) };
if let Poll::Ready(_) = this.as_mut().poll_next(&mut cx) {
tracing::info!("stream ready before established...");
tracing::error!("stream ready before established...");
unreachable!()
}
}
Expand Down
1 change: 0 additions & 1 deletion xmtp_api_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ impl XmtpMlsClient for XmtpHttpApiClient {
}

async fn send_group_messages(&self, request: SendGroupMessagesRequest) -> Result<(), Error> {
tracing::info!("SENDING GRP MSG");
let res = self
.http_client
.post(self.endpoint(ApiEndpoints::SEND_GROUP_MESSAGES))
Expand Down
5 changes: 0 additions & 5 deletions xmtp_mls/logs.json

This file was deleted.

14 changes: 7 additions & 7 deletions xmtp_mls/src/api/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ where
group_id: Id,
) -> Result<Option<GroupMessage>, ApiError> {
tracing::debug!(
group_id = hex::encode(&group_id),
group_id = hex::encode(group_id),
inbox_id = self.inbox_id,
"query latest group message"
);
Expand Down Expand Up @@ -304,10 +304,10 @@ where
Ok(())
}

pub(crate) async fn subscribe_group_messages<'a>(
&'a self,
pub(crate) async fn subscribe_group_messages(
&self,
filters: Vec<GroupFilter>,
) -> Result<<ApiClient as XmtpMlsStreams>::GroupMessageStream<'a>, ApiError>
) -> Result<<ApiClient as XmtpMlsStreams>::GroupMessageStream<'_>, ApiError>
where
ApiClient: XmtpMlsStreams,
{
Expand All @@ -319,11 +319,11 @@ where
.await
}

pub(crate) async fn subscribe_welcome_messages<'a>(
&'a self,
pub(crate) async fn subscribe_welcome_messages(
&self,
installation_key: &[u8],
id_cursor: Option<u64>,
) -> Result<<ApiClient as XmtpMlsStreams>::WelcomeMessageStream<'a>, ApiError>
) -> Result<<ApiClient as XmtpMlsStreams>::WelcomeMessageStream<'_>, ApiError>
where
ApiClient: XmtpMlsStreams,
{
Expand Down
2 changes: 1 addition & 1 deletion xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ where
installation_id = %self.client.installation_id(),
group_id = hex::encode(&self.group_id),
msg_id = envelope.id,
"Processing envelope with hash {:?}, id = {}",
"Processing envelope with hash {}, id = {}",
hex::encode(sha256(envelope.data.as_slice())),
envelope.id
);
Expand Down
4 changes: 2 additions & 2 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
))
} else {
tracing::error!("Failed to validate existence of group");
return Err(NotFound::GroupById(group_id).into());
Err(NotFound::GroupById(group_id).into())
}
}

Expand Down Expand Up @@ -615,7 +615,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
dm_members,
),
ConversationType::Dm => {
validate_dm_group(&client, &mls_group, &added_by_inbox)?;
validate_dm_group(client, &mls_group, &added_by_inbox)?;
StoredGroup::new_from_welcome(
group_id.clone(),
now_ns(),
Expand Down
2 changes: 1 addition & 1 deletion xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
where
<ScopedClient as ScopedGroupClient>::ApiClient: XmtpMlsStreams + 'a,
{
Ok(StreamGroupMessages::new(&self.client, vec![self.group_id.clone().into()]).await?)
StreamGroupMessages::new(&self.client, vec![self.group_id.clone().into()]).await
}

pub fn stream_with_callback(
Expand Down
2 changes: 1 addition & 1 deletion xmtp_mls/src/groups/validated_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl std::fmt::Debug for CommitParticipant {
} = self;
write!(f, "CommitParticipant {{ inbox_id={}, installation_id={}, is_creator={}, is_admin={}, is_super_admin={} }}",
inbox_id,
hex::encode(&installation_id),
hex::encode(installation_id),
is_creator,
is_admin,
is_super_admin,
Expand Down
15 changes: 6 additions & 9 deletions xmtp_mls/src/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,15 @@ where
) -> Result<MlsGroup<Self>> {
let provider = self.mls_provider()?;
let conn = provider.conn_ref();
let envelope = WelcomeMessage::decode(envelope_bytes.as_slice())
.map_err(|e| SubscribeError::from(e))?;
let envelope =
WelcomeMessage::decode(envelope_bytes.as_slice()).map_err(SubscribeError::from)?;
let known_welcomes = HashSet::from_iter(conn.group_welcome_ids()?.into_iter());
let future = ProcessWelcomeFuture::new(
known_welcomes,
self.clone(),
WelcomeOrGroup::Welcome(envelope),
None,
)?;
// this should never happen, b/c a conversation type into the process
// future, but is here defensively regardless.
future
.process()
.await?
Expand All @@ -273,11 +271,11 @@ where
})
}

// #[tracing::instrument(level = "debug", skip_all)]
pub async fn stream_conversations<'a>(
&'a self,
#[tracing::instrument(level = "debug", skip_all)]
pub async fn stream_conversations(
&self,
conversation_type: Option<ConversationType>,
) -> Result<impl Stream<Item = Result<MlsGroup<Self>>> + 'a>
) -> Result<impl Stream<Item = Result<MlsGroup<Self>>> + use<'_, ApiClient, V>>
where
ApiClient: XmtpMlsStreams,
{
Expand All @@ -302,7 +300,6 @@ where
futures::pin_mut!(stream);
let _ = tx.send(());
while let Some(convo) = stream.next().await {
tracing::info!("Trigger conversation callback");
convo_callback(convo)
}
tracing::debug!("`stream_conversations` stream ended, dropping stream");
Expand Down
Loading

0 comments on commit 38688bf

Please sign in to comment.