Skip to content

Commit

Permalink
Live Consent Sync (#1234)
Browse files Browse the repository at this point in the history
* update proto

* process streamed updates

* first stream update location

* use local events to dispatch update

* optional buffer on the local events

* cleanup

* rename

* cleanup

* add warn

* lint

* logs

* wip

* cleanup

* remove comment

* syncable groups fix

* revert this file

* invert the approach

* add down

* wip

* wip tests

* ignore non sync msgs

* cleanup, test fix

* lint

* wasm

* wasm

* cleanup

* map an error

* lint

* lint

* small fix

* test

* revert

* do not drop channel

* lint

* alternate remedy

* wasm fix

* revert gen_protos

* fix potential race condition in test
  • Loading branch information
codabrink authored Nov 18, 2024
1 parent e9489ae commit 0b0bcde
Show file tree
Hide file tree
Showing 12 changed files with 1,984 additions and 1,350 deletions.
2 changes: 2 additions & 0 deletions xmtp_id/src/associations/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub enum DeserializationError {
InvalidAccountId,
#[error("Invalid hash (needs to be 32 bytes)")]
InvalidHash,
#[error("A required field is unspecified: {0}")]
Unspecified(&'static str),
#[error("Error creating public key from proto bytes")]
Ed25519(#[from] ed25519_dalek::ed25519::Error),
}
Expand Down
16 changes: 15 additions & 1 deletion xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use crate::{
refresh_state::EntityKind,
sql_key_store, EncryptedMessageStore, StorageError,
},
subscriptions::LocalEvents,
subscriptions::{LocalEventError, LocalEvents},
verified_key_package_v2::{KeyPackageVerificationError, VerifiedKeyPackageV2},
xmtp_openmls_provider::XmtpOpenMlsProvider,
Fetch, Store, XmtpApi,
Expand Down Expand Up @@ -106,6 +106,8 @@ pub enum ClientError {
// the box is to prevent infinite cycle between client and group errors
#[error(transparent)]
Group(Box<GroupError>),
#[error(transparent)]
LocalEvent(#[from] LocalEventError),
#[error("generic:{0}")]
Generic(String),
}
Expand Down Expand Up @@ -188,6 +190,10 @@ pub enum MessageProcessingError {
Generic(String),
#[error("intent is missing staged_commit field")]
IntentMissingStagedCommit,
#[error(transparent)]
Deserialization(#[from] xmtp_id::associations::DeserializationError),
#[error(transparent)]
LocalEvent(#[from] LocalEventError),
}

impl crate::retry::RetryableError for MessageProcessingError {
Expand Down Expand Up @@ -501,6 +507,14 @@ where
conn.insert_or_replace_consent_records(records)?;
conn.insert_or_replace_consent_records(&new_records)?;

if self.history_sync_url.is_some() {
let mut records = records.to_vec();
records.append(&mut new_records);
self.local_events
.send(LocalEvents::ConsentUpdate(records))
.map_err(|e| ClientError::Generic(e.to_string()))?;
}

Ok(())
}

Expand Down
Loading

0 comments on commit 0b0bcde

Please sign in to comment.