Skip to content

Commit

Permalink
finish stream all, fix missing messages for good
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Jan 27, 2025
1 parent 1afc29b commit fbf4e3d
Show file tree
Hide file tree
Showing 11 changed files with 680 additions and 550 deletions.
115 changes: 115 additions & 0 deletions .lnav-json-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
{
"$schema": "https://lnav.org/schemas/format-v1.schema.json",
"libxmtp_json_log": {
"title": "Libxmtp Log",
"url": "https://github.com/xmtp/libxmtp",
"json": true,
"file-type": "json",
"hide-extra": true,
"level-field": "level",
"timestamp-field": "timestamp",
"timestamp-format": "%a, %b %d, %Y %I:%M:%S %p %Z",
"body-field": "message",
"level": {
"error": "ERROR",
"warning": "WARN",
"info": "INFO",
"debug": "DEBUG",
"trace": "TRACE"
},
"line-format": [
{
"field": "__timestamp__",
"align": "right"
},
" ",
{
"field": "__level__",
"min-width": 4,
"max-width": 4,
"align": "right",
"text-transform": "uppercase",
"suffix": ":"
},
" ",
{
"prefix": "target=",
"field": "target",
"align": "left",
"default-value": ""
},
" ",
{
"field": "message"
},
" ",
{
"prefix": "span: ",
"field": "span",
"default-value": ""
},
" ",
{
"prefix": "spans: ",
"field": "spans",
"default-value": ""
},
" ",
{
"prefix": "signer=",
"field": "signer",
"default-value": ""
},
" ",
{
"prefix": "missing_signatures=",
"field": "missing_signatures",
"default-value": ""
},
" ",
{
"prefix": "inbox_id=",
"field": "inbox_id",
"default-value": ""
},
" ",
{
"prefix": "sender_inbox_id=",
"field": "sender_inbox_id",
"default-value": ""
},
" ",
{
"prefix": "installation_id=",
"field": "installation_id",
"default-value": ""
},
" ",
{
"prefix": "sender_installation_id=",
"field": "sender_installation_id",
"default-value": ""
},
" ",
{
"prefix": "group_id=",
"field": "group_id",
"default-value": ""
}
],
"value": {
"message": { "kind": "string" },
"target": { "kind": "string" },
"signer": { "kind": "string" },
"message_id": { "kind": "string" },
"installation_id": { "kind": "string" },
"sender_installation_id": { "kind": "string" },
"inbox_id": { "kind": "string" },
"sender_inbox_id": { "kind": "string" },
"group_id": { "kind": "string" },
"missing_signatures": { "kind": "string" },
"span": { "kind": "string", "hidden": true },
"spans": { "kind": "string", "hidden": true }
}
}
}
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ sqlite-web = "0.0.1"
tonic = { version = "0.12", default-features = false }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false }
tracing-logfmt = "0.3"
trait-variant = "0.1.2"
url = "2.5.0"
wasm-bindgen = "=0.2.100"
Expand Down
2 changes: 1 addition & 1 deletion xmtp_debug/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ xmtp_proto.workspace = true
openmls.workspace = true
indicatif = "0.17"
color-eyre = "0.6"
tracing-logfmt = "0.3"
tracing-logfmt.workspace = true
owo-colors = "4.1"
url.workspace = true
redb = "2.4"
Expand Down
1 change: 0 additions & 1 deletion xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ update-schema = ["toml"]

[dependencies]
aes-gcm = { version = "0.10.3", features = ["std"] }
async-stream.workspace = true
async-trait.workspace = true
bincode.workspace = true
diesel_migrations.workspace = true
Expand Down
4 changes: 1 addition & 3 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,9 +735,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {

/// Send a message on this users XMTP [`Client`].
pub async fn send_message(&self, message: &[u8]) -> Result<Vec<u8>, GroupError> {
tracing::debug!(inbox_id = self.client.inbox_id(), "sending message");
let conn = self.context().store().conn()?;
let provider = XmtpOpenMlsProvider::from(conn);
let provider = self.mls_provider()?;
self.send_message_with_provider(message, &provider).await
}

Expand Down
34 changes: 18 additions & 16 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
groups::ScopedGroupClient,
storage::group_message::StoredGroupMessage,
subscriptions::{
stream_messages::{MessagesStreamInfo, ProcessMessageFuture, StreamGroupMessages},
SubscribeError,
stream_messages::{ProcessMessageFuture, StreamGroupMessages},
Result, SubscribeError,
},
};
use xmtp_proto::api_client::{trait_impls::XmtpApi, XmtpMlsStreams};
Expand All @@ -24,36 +24,34 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
pub async fn process_streamed_group_message(
&self,
envelope_bytes: Vec<u8>,
) -> Result<StoredGroupMessage, SubscribeError> {
) -> Result<StoredGroupMessage> {
let envelope = GroupMessage::decode(envelope_bytes.as_slice())?;
ProcessMessageFuture::new(&self.client, envelope)?
.process()
.await
.map(|(group, _)| group)
}

pub async fn stream<'a>(
&'a self,
) -> Result<
impl Stream<Item = Result<StoredGroupMessage, SubscribeError>> + use<'a, ScopedClient>,
SubscribeError,
>
) -> Result<impl Stream<Item = Result<StoredGroupMessage>> + use<'a, ScopedClient>>
where
<ScopedClient as ScopedGroupClient>::ApiClient: XmtpMlsStreams + 'a,
{
let group_list = HashMap::from([(self.group_id.clone(), MessagesStreamInfo { cursor: 0 })]);
Ok(StreamGroupMessages::new(&self.client, &group_list).await?)
let group_list = HashMap::from([(self.group_id.clone(), 0u64.into())]);
Ok(StreamGroupMessages::new(&self.client, group_list).await?)
}

pub fn stream_with_callback(
client: ScopedClient,
group_id: Vec<u8>,
callback: impl FnMut(Result<StoredGroupMessage, SubscribeError>) + Send + 'static,
) -> impl crate::StreamHandle<StreamOutput = Result<(), SubscribeError>>
callback: impl FnMut(Result<StoredGroupMessage>) + Send + 'static,
) -> impl crate::StreamHandle<StreamOutput = Result<()>>
where
ScopedClient: 'static,
<ScopedClient as ScopedGroupClient>::ApiClient: XmtpMlsStreams + 'static,
{
let group_list = HashMap::from([(group_id, MessagesStreamInfo { cursor: 0 })]);
let group_list = HashMap::from([(group_id, 0)]);
stream_messages_with_callback(client, group_list, callback)
}
}
Expand All @@ -62,9 +60,9 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
/// messages along to a callback.
pub(crate) fn stream_messages_with_callback<ScopedClient>(
client: ScopedClient,
group_id_to_info: HashMap<Vec<u8>, MessagesStreamInfo>,
mut callback: impl FnMut(Result<StoredGroupMessage, SubscribeError>) + Send + 'static,
) -> impl crate::StreamHandle<StreamOutput = Result<(), SubscribeError>>
active_conversations: HashMap<Vec<u8>, u64>,
mut callback: impl FnMut(Result<StoredGroupMessage>) + Send + 'static,
) -> impl crate::StreamHandle<StreamOutput = Result<()>>
where
ScopedClient: ScopedGroupClient + 'static,
<ScopedClient as ScopedGroupClient>::ApiClient: XmtpApi + XmtpMlsStreams + 'static,
Expand All @@ -73,7 +71,11 @@ where

crate::spawn(Some(rx), async move {
let client_ref = &client;
let stream = StreamGroupMessages::new(client_ref, &group_id_to_info).await?;
let active_conversations = active_conversations
.into_iter()
.map(|(g, c)| (g, c.into()))
.collect();
let stream = StreamGroupMessages::new(client_ref, active_conversations).await?;
futures::pin_mut!(stream);
let _ = tx.send(());
while let Some(message) = stream.next().await {
Expand Down
Loading

0 comments on commit fbf4e3d

Please sign in to comment.