Skip to content

Commit

Permalink
Filter conversations by consent (#1173)
Browse files Browse the repository at this point in the history
* Join groups table to consent_records in `find_groups`
* add `GroupQueryArgs` to simplify arguments to `find_groups`

---------

Co-authored-by: Andrew Plaza <[email protected]>
  • Loading branch information
nplasterer and insipx authored Oct 30, 2024
1 parent 2ce7979 commit fd34e68
Show file tree
Hide file tree
Showing 12 changed files with 446 additions and 272 deletions.
241 changes: 116 additions & 125 deletions Cargo.lock

Large diffs are not rendered by default.

44 changes: 17 additions & 27 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use xmtp_mls::storage::group_message::SortDirection;
use xmtp_mls::{
api::ApiClientWrapper,
builder::ClientBuilder,
client::{Client as MlsClient, ClientError, FindGroupParams},
client::{Client as MlsClient, ClientError},
groups::{
group_metadata::{ConversationType, GroupMetadata},
group_mutable_metadata::MetadataField,
Expand All @@ -38,12 +38,12 @@ use xmtp_mls::{
retry::Retry,
storage::{
consent_record::{ConsentState, ConsentType, StoredConsentRecord},
group::GroupQueryArgs,
group_message::{DeliveryStatus, GroupMessageKind, StoredGroupMessage},
EncryptedMessageStore, EncryptionKey, StorageOption,
},
AbortHandle, GenericStreamHandle, StreamHandle,
};

pub type RustXmtpClient = MlsClient<TonicApiClient>;

/// It returns a new client of the specified `inbox_id`.
Expand Down Expand Up @@ -529,6 +529,17 @@ pub struct FfiListConversationsOptions {
pub created_after_ns: Option<i64>,
pub created_before_ns: Option<i64>,
pub limit: Option<i64>,
pub consent_state: Option<FfiConsentState>,
}

impl From<FfiListConversationsOptions> for GroupQueryArgs {
fn from(opts: FfiListConversationsOptions) -> GroupQueryArgs {
GroupQueryArgs::default()
.maybe_created_before_ns(opts.created_before_ns)
.maybe_created_after_ns(opts.created_after_ns)
.maybe_limit(opts.limit)
.maybe_consent_state(opts.consent_state.map(Into::into))
}
}

#[derive(uniffi::Object)]
Expand Down Expand Up @@ -837,10 +848,7 @@ impl FfiConversations {

pub async fn sync_all_conversations(&self) -> Result<u32, GenericError> {
let inner = self.inner_client.as_ref();
let groups = inner.find_groups(FindGroupParams {
conversation_type: None,
..FindGroupParams::default()
})?;
let groups = inner.find_groups(GroupQueryArgs::default())?;

log::info!(
"groups for client inbox id {:?}: {:?}",
Expand All @@ -862,13 +870,7 @@ impl FfiConversations {
) -> Result<Vec<Arc<FfiConversation>>, GenericError> {
let inner = self.inner_client.as_ref();
let convo_list: Vec<Arc<FfiConversation>> = inner
.find_groups(FindGroupParams {
allowed_states: None,
created_after_ns: opts.created_after_ns,
created_before_ns: opts.created_before_ns,
limit: opts.limit,
conversation_type: None,
})?
.find_groups(opts.into())?
.into_iter()
.map(|group| Arc::new(group.into()))
.collect();
Expand All @@ -882,13 +884,7 @@ impl FfiConversations {
) -> Result<Vec<Arc<FfiConversation>>, GenericError> {
let inner = self.inner_client.as_ref();
let convo_list: Vec<Arc<FfiConversation>> = inner
.find_groups(FindGroupParams {
allowed_states: None,
created_after_ns: opts.created_after_ns,
created_before_ns: opts.created_before_ns,
limit: opts.limit,
conversation_type: Some(ConversationType::Group),
})?
.find_groups(GroupQueryArgs::from(opts).conversation_type(ConversationType::Group))?
.into_iter()
.map(|group| Arc::new(group.into()))
.collect();
Expand All @@ -902,13 +898,7 @@ impl FfiConversations {
) -> Result<Vec<Arc<FfiConversation>>, GenericError> {
let inner = self.inner_client.as_ref();
let convo_list: Vec<Arc<FfiConversation>> = inner
.find_groups(FindGroupParams {
allowed_states: None,
created_after_ns: opts.created_after_ns,
created_before_ns: opts.created_before_ns,
limit: opts.limit,
conversation_type: Some(ConversationType::Dm),
})?
.find_groups(GroupQueryArgs::from(opts).conversation_type(ConversationType::Dm))?
.into_iter()
.map(|group| Arc::new(group.into()))
.collect();
Expand Down
25 changes: 13 additions & 12 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use napi::bindgen_prelude::{Error, Result, Uint8Array};
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi::JsFunction;
use napi_derive::napi;
use xmtp_mls::client::FindGroupParams;
use xmtp_mls::groups::group_metadata::ConversationType;
use xmtp_mls::groups::{GroupMetadataOptions, PreconfiguredPolicies};
use xmtp_mls::storage::group::GroupMembershipState;
use xmtp_mls::storage::group::GroupQueryArgs;

use crate::messages::NapiMessage;
use crate::permissions::NapiGroupPermissionsOptions;
Expand Down Expand Up @@ -82,17 +82,18 @@ pub struct NapiListConversationsOptions {
pub conversation_type: Option<NapiConversationType>,
}

impl From<NapiListConversationsOptions> for FindGroupParams {
fn from(opts: NapiListConversationsOptions) -> Self {
FindGroupParams {
allowed_states: opts
.allowed_states
.map(|states| states.into_iter().map(From::from).collect()),
conversation_type: opts.conversation_type.map(|ct| ct.into()),
created_after_ns: opts.created_after_ns,
created_before_ns: opts.created_before_ns,
limit: opts.limit,
}
impl From<NapiListConversationsOptions> for GroupQueryArgs {
fn from(opts: NapiListConversationsOptions) -> GroupQueryArgs {
GroupQueryArgs::default()
.maybe_allowed_states(
opts
.allowed_states
.map(|states| states.into_iter().map(From::from).collect()),
)
.maybe_conversation_type(opts.conversation_type.map(|ct| ct.into()))
.maybe_created_after_ns(opts.created_after_ns)
.maybe_created_before_ns(opts.created_before_ns)
.maybe_limit(opts.limit)
}
}

Expand Down
23 changes: 16 additions & 7 deletions bindings_wasm/src/conversations.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;
use wasm_bindgen::prelude::wasm_bindgen;
use wasm_bindgen::{JsError, JsValue};
use xmtp_mls::client::FindGroupParams;
use xmtp_mls::groups::{GroupMetadataOptions, PreconfiguredPolicies};
use xmtp_mls::storage::group::GroupQueryArgs;

use crate::messages::WasmMessage;
use crate::permissions::WasmGroupPermissionsOptions;
Expand All @@ -15,6 +15,20 @@ pub struct WasmListConversationsOptions {
pub limit: Option<i64>,
}

impl From<WasmListConversationsOptions> for GroupQueryArgs {
fn from(opts: WasmListConversationsOptions) -> GroupQueryArgs {
let WasmListConversationsOptions {
created_after_ns,
created_before_ns,
limit,
} = opts;
GroupQueryArgs::default()
.maybe_created_after_ns(created_after_ns)
.maybe_created_before_ns(created_before_ns)
.maybe_limit(limit)
}
}

#[wasm_bindgen]
impl WasmListConversationsOptions {
#[wasm_bindgen(constructor)]
Expand Down Expand Up @@ -195,12 +209,7 @@ impl WasmConversations {
};
let convo_list: js_sys::Array = self
.inner_client
.find_groups(FindGroupParams {
created_after_ns: opts.created_after_ns,
created_before_ns: opts.created_before_ns,
limit: opts.limit,
..FindGroupParams::default()
})
.find_groups(opts.into())
.map_err(|e| JsError::new(format!("{}", e).as_str()))?
.into_iter()
.map(|group| {
Expand Down
5 changes: 2 additions & 3 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ use kv_log_macro::{error, info};
use prost::Message;
use xmtp_api_grpc::replication_client::ClientV4;
use xmtp_id::associations::unverified::{UnverifiedRecoverableEcdsaSignature, UnverifiedSignature};
use xmtp_mls::client::FindGroupParams;

use xmtp_mls::groups::device_sync::DeviceSyncContent;
use xmtp_mls::storage::group::GroupQueryArgs;
use xmtp_mls::storage::group_message::{GroupMessageKind, MsgQueryArgs};
use xmtp_mls::XmtpApi;

Expand Down Expand Up @@ -238,7 +237,7 @@ async fn main() {

// recv(&client).await.unwrap();
let group_list = client
.find_groups(FindGroupParams::default())
.find_groups(GroupQueryArgs::default())
.expect("failed to list groups");
for group in group_list.iter() {
group.sync().await.expect("error syncing group");
Expand Down
35 changes: 10 additions & 25 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ use xmtp_proto::xmtp::mls::api::v1::{
use crate::{
api::ApiClientWrapper,
groups::{
group_metadata::ConversationType, group_permissions::PolicySet,
validated_commit::CommitValidationError, GroupError, GroupMetadataOptions, IntentError,
MlsGroup,
group_permissions::PolicySet, validated_commit::CommitValidationError, GroupError,
GroupMetadataOptions, IntentError, MlsGroup,
},
identity::{parse_credential, Identity, IdentityError},
identity_updates::{load_identity_updates, IdentityUpdateError},
intents::Intents,
mutex_registry::MutexRegistry,
retry::Retry,
retry_async, retryable,
storage::group::GroupQueryArgs,
storage::{
consent_record::{ConsentState, ConsentType, StoredConsentRecord},
db_connection::DbConnection,
Expand Down Expand Up @@ -217,15 +217,6 @@ impl From<&str> for ClientError {
}
}

#[derive(Debug, Default)]
pub struct FindGroupParams {
pub allowed_states: Option<Vec<GroupMembershipState>>,
pub created_after_ns: Option<i64>,
pub created_before_ns: Option<i64>,
pub limit: Option<i64>,
pub conversation_type: Option<ConversationType>,
}

/// Clients manage access to the network, identity, and data store
pub struct Client<ApiClient, V = RemoteSignatureVerifier<ApiClient>> {
pub(crate) api_client: Arc<ApiClientWrapper<ApiClient>>,
Expand Down Expand Up @@ -671,17 +662,11 @@ where
/// - created_after_ns: only return groups created after the given timestamp (in nanoseconds)
/// - created_before_ns: only return groups created before the given timestamp (in nanoseconds)
/// - limit: only return the first `limit` groups
pub fn find_groups(&self, params: FindGroupParams) -> Result<Vec<MlsGroup<Self>>, ClientError> {
pub fn find_groups(&self, args: GroupQueryArgs) -> Result<Vec<MlsGroup<Self>>, ClientError> {
Ok(self
.store()
.conn()?
.find_groups(
params.allowed_states,
params.created_after_ns,
params.created_before_ns,
params.limit,
params.conversation_type,
)?
.find_groups(args)?
.into_iter()
.map(|stored_group| {
MlsGroup::new(self.clone(), stored_group.id, stored_group.created_at_ns)
Expand Down Expand Up @@ -972,12 +957,12 @@ pub(crate) mod tests {

use crate::{
builder::ClientBuilder,
client::FindGroupParams,
groups::GroupMetadataOptions,
hpke::{decrypt_welcome, encrypt_welcome},
identity::serialize_key_package_hash_ref,
storage::{
consent_record::{ConsentState, ConsentType, StoredConsentRecord},
group::GroupQueryArgs,
group_message::MsgQueryArgs,
schema::identity_updates,
},
Expand Down Expand Up @@ -1082,7 +1067,7 @@ pub(crate) mod tests {
.create_group(None, GroupMetadataOptions::default())
.unwrap();

let groups = client.find_groups(FindGroupParams::default()).unwrap();
let groups = client.find_groups(GroupQueryArgs::default()).unwrap();
assert_eq!(groups.len(), 2);
assert_eq!(groups[0].group_id, group_1.group_id);
assert_eq!(groups[1].group_id, group_2.group_id);
Expand Down Expand Up @@ -1163,7 +1148,7 @@ pub(crate) mod tests {
let bob_received_groups = bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap();
assert_eq!(bob_received_groups.len(), 2);

let bo_groups = bo.find_groups(FindGroupParams::default()).unwrap();
let bo_groups = bo.find_groups(GroupQueryArgs::default()).unwrap();
let bo_group1 = bo.group(alix_bo_group1.clone().group_id).unwrap();
let bo_messages1 = bo_group1.find_messages(&MsgQueryArgs::default()).unwrap();
assert_eq!(bo_messages1.len(), 0);
Expand Down Expand Up @@ -1239,7 +1224,7 @@ pub(crate) mod tests {
bola.sync_welcomes(&bola.store().conn().unwrap())
.await
.unwrap();
let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap();
let bola_groups = bola.find_groups(Default::default()).unwrap();
assert_eq!(bola_groups.len(), 1);
let bola_group = bola_groups.first().unwrap();
tracing::info!("Syncing bolas messages");
Expand Down Expand Up @@ -1377,7 +1362,7 @@ pub(crate) mod tests {
bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap();

// Bo should have two groups now
let bo_groups = bo.find_groups(FindGroupParams::default()).unwrap();
let bo_groups = bo.find_groups(GroupQueryArgs::default()).unwrap();
assert_eq!(bo_groups.len(), 2);

// Bo's original key should be deleted
Expand Down
7 changes: 5 additions & 2 deletions xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::group_metadata::ConversationType;
use super::{GroupError, MlsGroup};
use crate::configuration::NS_IN_HOUR;
use crate::storage::group::GroupQueryArgs;
use crate::storage::group_message::MsgQueryArgs;
use crate::storage::DbConnection;
use crate::utils::time::now_ns;
Expand Down Expand Up @@ -289,7 +290,8 @@ where

self.sync_welcomes(provider.conn_ref()).await?;

let groups = conn.find_groups(None, None, None, None, Some(ConversationType::Group))?;
let groups =
conn.find_groups(GroupQueryArgs::default().conversation_type(ConversationType::Group))?;
for crate::storage::group::StoredGroup { id, .. } in groups.into_iter() {
let group = self.group(id)?;
Box::pin(group.sync()).await?;
Expand All @@ -303,7 +305,8 @@ where
conn: &DbConnection,
inbox_id: &str,
) -> Result<(), GroupError> {
let groups = conn.find_groups(None, None, None, None, Some(ConversationType::Group))?;
let groups =
conn.find_groups(GroupQueryArgs::default().conversation_type(ConversationType::Group))?;
for group in groups {
let group = self.group(group.id)?;
Box::pin(group.add_members_by_inbox_id(&[inbox_id.to_string()])).await?;
Expand Down
6 changes: 4 additions & 2 deletions xmtp_mls/src/groups/device_sync/message_sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::storage::group::GroupQueryArgs;
use crate::storage::group_message::MsgQueryArgs;
use crate::XmtpApi;
use crate::{storage::group::StoredGroup, Client};
Expand Down Expand Up @@ -53,15 +54,16 @@ where

fn syncable_groups(&self, conn: &DbConnection) -> Result<Vec<Syncable>, DeviceSyncError> {
let groups = conn
.find_groups(None, None, None, None, Some(ConversationType::Group))?
.find_groups(GroupQueryArgs::default().conversation_type(ConversationType::Group))?
.into_iter()
.map(Syncable::Group)
.collect();
Ok(groups)
}

fn syncable_messages(&self, conn: &DbConnection) -> Result<Vec<Syncable>, DeviceSyncError> {
let groups = conn.find_groups(None, None, None, None, Some(ConversationType::Group))?;
let groups =
conn.find_groups(GroupQueryArgs::default().conversation_type(ConversationType::Group))?;

let mut all_messages = vec![];
for StoredGroup { id, .. } in groups.into_iter() {
Expand Down
Loading

0 comments on commit fd34e68

Please sign in to comment.