Skip to content

Commit

Permalink
Persist changes to bloom filter to data store
Browse files Browse the repository at this point in the history
  • Loading branch information
richvdh committed Jun 6, 2024
1 parent e046c0c commit 0df026c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 11 deletions.
25 changes: 19 additions & 6 deletions bindings/matrix-sdk-ffi/src/sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use matrix_sdk_ui::{
UnableToDecryptHook, UnableToDecryptInfo as SdkUnableToDecryptInfo, UtdHookManager,
},
};
use tracing::error;

use crate::{
error::ClientError, helpers::unwrap_or_clone_arc, room_list::RoomListService, TaskHandle,
Expand Down Expand Up @@ -117,18 +118,30 @@ impl SyncServiceBuilder {
Arc::new(Self { client: this.client, builder, utd_hook: this.utd_hook })
}

pub fn with_utd_hook(self: Arc<Self>, delegate: Box<dyn UnableToDecryptDelegate>) -> Arc<Self> {
pub async fn with_utd_hook(
self: Arc<Self>,
delegate: Box<dyn UnableToDecryptDelegate>,
) -> Arc<Self> {
// UTDs detected before this duration may be reclassified as "late decryption"
// events (or discarded, if they get decrypted fast enough).
const UTD_HOOK_GRACE_PERIOD: Duration = Duration::from_secs(60);

let this = unwrap_or_clone_arc(self);

let utd_hook = Some(Arc::new(
UtdHookManager::new(Arc::new(UtdHook { delegate }), this.client.clone())
.with_max_delay(UTD_HOOK_GRACE_PERIOD),
));
Arc::new(Self { client: this.client, builder: this.builder, utd_hook })
let mut utd_hook = UtdHookManager::new(Arc::new(UtdHook { delegate }), this.client.clone())
.with_max_delay(UTD_HOOK_GRACE_PERIOD);

if let Err(e) = utd_hook.reload_from_store().await {
error!("Unable to reload UTD hook data from data store: {}", e);
// Carry on with the setup anyway; we shouldn't fail setup just
// because the UTD hook failed to load its data.
}

Arc::new(Self {
client: this.client,
builder: this.builder,
utd_hook: Some(Arc::new(utd_hook)),
})
}

pub async fn finish(self: Arc<Self>) -> Result<Arc<SyncService>, ClientError> {
Expand Down
42 changes: 37 additions & 5 deletions crates/matrix-sdk-ui/src/unable_to_decrypt_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ use std::{

use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use matrix_sdk::{crypto::types::events::UtdCause, Client};
use matrix_sdk_base::{StateStoreDataKey, StateStoreDataValue, StoreError};
use ruma::{EventId, OwnedEventId};
use tokio::{
spawn,
sync::{Mutex as AsyncMutex, MutexGuard},
task::JoinHandle,
time::sleep,
};
use tracing::error;

/// A generic interface which methods get called whenever we observe a
/// unable-to-decrypt (UTD) event.
Expand Down Expand Up @@ -174,6 +176,24 @@ impl UtdHookManager {
self
}

/// Load the persistent data for the UTD hook from the store.
///
/// If the client previously used a UtdHookManager, and UTDs were
/// encountered, the data on the reported UTDs is loaded from the store.
/// Otherwise, there is no effect.
pub async fn reload_from_store(&mut self) -> Result<(), StoreError> {
let existing_data =
self.client.store().get_kv_data(StateStoreDataKey::UtdHookManagerData).await?;

if let Some(existing_data) = existing_data {
let bloom_filter = existing_data
.into_utd_hook_manager_data()
.expect("StateStore::get_kv_data should return data of the right type");
self.reported_utds = Arc::new(AsyncMutex::new(bloom_filter));
}
Ok(())
}

/// The function to call whenever a UTD is seen for the first time.
///
/// Pipe in any information that needs to be included in the final report.
Expand All @@ -198,14 +218,15 @@ impl UtdHookManager {

let Some(max_delay) = self.max_delay else {
// No delay: immediately report the event to the parent hook.
Self::report_utd(info, &self.parent, &mut reported_utds_lock);
Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
return;
};

// Clone data shared with the task below.
let pending_delayed = self.pending_delayed.clone();
let reported_utds = self.reported_utds.clone();
let parent = self.parent.clone();
let client = self.client.clone();

// Spawn a task that will wait for the given delay, and maybe call the parent
// hook then.
Expand All @@ -223,7 +244,7 @@ impl UtdHookManager {
// Remove the task from the outstanding set. But if it's already been removed,
// it's been decrypted since the task was added!
if pending_delayed.lock().unwrap().remove(&info.event_id).is_some() {
Self::report_utd(info, &parent, &mut reported_utds_lock);
Self::report_utd(info, &parent, &client, &mut reported_utds_lock).await;
}
});

Expand Down Expand Up @@ -260,7 +281,7 @@ impl UtdHookManager {
time_to_decrypt: Some(pending_utd_report.marked_utd_at.elapsed()),
cause,
};
Self::report_utd(info, &self.parent, &mut reported_utds_lock);
Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
}

/// Helper for [`UtdHookManager::on_utd`] and
Expand All @@ -269,14 +290,25 @@ impl UtdHookManager {
///
/// Must be called with the lock held on [`UtdHookManager::reported_utds`],
/// and takes a `MutexGuard` to enforce that.
fn report_utd(
async fn report_utd<'a>(
info: UnableToDecryptInfo,
parent_hook: &Arc<dyn UnableToDecryptHook>,
reported_utds_lock: &mut MutexGuard<GrowableBloom>,
client: &Client,
reported_utds_lock: &mut MutexGuard<'a, GrowableBloom>,
) {
let event_id = info.event_id.clone();
parent_hook.on_utd(info);
reported_utds_lock.insert(event_id);
if let Err(e) = client
.store()
.set_kv_data(
StateStoreDataKey::UtdHookManagerData,
StateStoreDataValue::UtdHookManagerData(reported_utds_lock.clone()),
)
.await
{
error!("Unable to persist UTD report data: {}", e);
}
}
}

Expand Down

0 comments on commit 0df026c

Please sign in to comment.