Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crypto: Don't deep copy the OlmMachine when creating a notification client #3992

Merged
merged 2 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,42 @@ impl BaseClient {
/// Clones the current base client to use the same crypto store but a
/// different, in-memory store config, and resets transient state.
#[cfg(feature = "e2e-encryption")]
pub fn clone_with_in_memory_state_store(&self) -> Self {
pub async fn clone_with_in_memory_state_store(&self) -> Result<Self> {
let config = StoreConfig::new().state_store(MemoryStore::new());
let config = config.crypto_store(self.crypto_store.clone());

let mut result = Self::with_store_config(config);
result.room_key_recipient_strategy = self.room_key_recipient_strategy.clone();
result
let copy = Self {
store: Store::new(config.state_store),
event_cache_store: config.event_cache_store,
// We copy the crypto store as well as the `OlmMachine` for two reasons:
// 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
// 2. We need to ensure that the parent and child use the same data and caches inside
// the `OlmMachine` so the various ratchets and places where new randomness gets
// introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
// or Olm sessions when they encrypt or decrypt messages.
crypto_store: self.crypto_store.clone(),
olm_machine: self.olm_machine.clone(),
poljar marked this conversation as resolved.
Show resolved Hide resolved
ignore_user_list_changes: Default::default(),
room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
};

if let Some(session_meta) = self.session_meta().cloned() {
copy.store
.set_session_meta(session_meta, &copy.room_info_notable_update_sender)
.await?;
}

Ok(copy)
}

/// Clones the current base client to use the same crypto store but a
/// different, in-memory store config, and resets transient state.
#[cfg(not(feature = "e2e-encryption"))]
pub fn clone_with_in_memory_state_store(&self) -> Self {
#[allow(clippy::unused_async)]
pub async fn clone_with_in_memory_state_store(&self) -> Result<Self> {
let config = StoreConfig::new().state_store(MemoryStore::new());
Self::with_store_config(config)
Ok(Self::with_store_config(config))
}

/// Get the session meta information.
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ assert-json-diff = { workspace = true }
assert_matches = { workspace = true }
assert_matches2 = { workspace = true }
eyeball-im-util = { workspace = true }
matrix-sdk = { workspace = true, features = ["testing"] }
matrix-sdk = { workspace = true, features = ["testing", "sqlite"] }
matrix-sdk-test = { workspace = true }
stream_assert = { workspace = true }
tempfile = "3.3.0"
Expand Down
252 changes: 249 additions & 3 deletions crates/matrix-sdk-ui/tests/integration/encryption_sync_service.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,35 @@
use std::sync::{Arc, Mutex};
use std::{
collections::{BTreeMap, HashSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};

use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk::test_utils::logged_in_client_with_server;
use matrix_sdk::{
config::RequestConfig,
matrix_auth::{MatrixSession, MatrixSessionTokens},
test_utils::{logged_in_client_with_server, test_client_builder_with_server},
SessionMeta,
};
use matrix_sdk_base::crypto::store::Changes;
use matrix_sdk_test::async_test;
use matrix_sdk_ui::encryption_sync_service::{
EncryptionSyncPermit, EncryptionSyncService, WithLocking,
};
use ruma::{device_id, user_id};
use serde::Deserialize;
use serde_json::json;
use tokio::sync::Mutex as AsyncMutex;
use wiremock::{Mock, MockGuard, MockServer, Request, ResponseTemplate};
use tracing::{error, info, trace, warn};
use wiremock::{
matchers::{method, path},
Mock, MockGuard, MockServer, Request, ResponseTemplate,
};

use crate::{
mock_sync,
sliding_sync::{check_requests, PartialSlidingSyncRequest, SlidingSyncMatcher},
sliding_sync_then_assert_request_and_fake_response,
};
Expand Down Expand Up @@ -320,3 +338,231 @@ async fn test_encryption_sync_always_reloads_todevice_token() -> anyhow::Result<

Ok(())
}

#[async_test]
async fn test_notification_client_does_not_upload_duplicate_one_time_keys() -> anyhow::Result<()> {
use tempfile::tempdir;

let dir = tempdir().unwrap();
let user_id = user_id!("@example:morpheus.localhost");

let (builder, server) = test_client_builder_with_server().await;
let client = builder
.request_config(RequestConfig::new().disable_retry())
.sqlite_store(dir.path(), None)
.build()
.await
.unwrap();

let session = MatrixSession {
meta: SessionMeta { user_id: user_id.into(), device_id: device_id!("DEVICEID").to_owned() },
tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None },
};

client.restore_session(session.to_owned()).await.unwrap();

info!("Creating the notification client");
let notification_client = client
.notification_client()
.await
.expect("We should be able to build a notification client");

let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new_for_testing()));
let sync_permit_guard = sync_permit.lock_owned().await;
let encryption_sync =
EncryptionSyncService::new("tests".to_owned(), client.clone(), None, WithLocking::Yes)
.await?;

let stream = encryption_sync.sync(sync_permit_guard);
pin_mut!(stream);

Mock::given(method("POST"))
.and(path("/_matrix/client/r0/keys/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.mount(&server)
.await;

info!("First sync, uploading 50 one-time keys");

sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
"conn_id": "encryption",
"extensions": {
"e2ee": {
"enabled": true
},
"to_device": {
"enabled": true
}
}
},
respond with = {
"pos": "0",
"extensions": {
"to_device": {
"next_batch": "nb0"
},
}
},
};

#[derive(Debug, Deserialize)]
struct UploadRequest {
one_time_keys: BTreeMap<String, serde_json::Value>,
}

let found_duplicate = Arc::new(AtomicBool::new(false));
let uploaded_key_ids = Arc::new(Mutex::new(HashSet::new()));

Mock::given(method("POST"))
.and(path("/_matrix/client/r0/keys/upload"))
.respond_with({
let found_duplicate = found_duplicate.clone();
let uploaded_key_ids = uploaded_key_ids.clone();

move |request: &Request| {
let request: UploadRequest = request
.body_json()
.expect("The /keys/upload request should contain one-time keys");

let mut uploaded_key_ids = uploaded_key_ids.lock().unwrap();

let new_key_ids: HashSet<String> = request.one_time_keys.into_keys().collect();

warn!(?new_key_ids, "Got a new /keys/upload request");

let duplicates: HashSet<_> = uploaded_key_ids.intersection(&new_key_ids).collect();

if let Some(duplicate) = duplicates.into_iter().next() {
error!("Duplicate one-time keys were uploaded.");

found_duplicate.store(true, Ordering::SeqCst);

ResponseTemplate::new(400).set_body_json(json!({
"errcode": "M_WAT",
"error:": format!("One time key {duplicate} already exists!")
poljar marked this conversation as resolved.
Show resolved Hide resolved
}))
} else {
trace!("No duplicate one-time keys found.");
uploaded_key_ids.extend(new_key_ids);

ResponseTemplate::new(200).set_body_json(json!({
"one_time_key_counts": {
"signed_curve25519": 50
}
}))
}
}
})
.expect(4)
.mount(&server)
.await;

info!("Main sync now gets told that a one-time key has been used up.");

sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
"conn_id": "encryption",
"extensions": {
"to_device": {
"since": "nb0",
},
}
},
respond with = {
"pos": "2",
"extensions": {
"to_device": {
"next_batch": "nb2"
},
"e2ee": {
"device_one_time_keys_count": {
"signed_curve25519": 49
}
}
}
},
};

assert!(
!found_duplicate.load(Ordering::SeqCst),
"The main sync should not have caused a duplicate one-time key"
);

mock_sync(
&server,
json!({
"next_batch": "foo",
"device_one_time_keys_count": {
"signed_curve25519": 49
}
}),
None,
)
.await;

info!("The notification client now syncs and tries to upload some one-time keys");

notification_client
.sync_once(Default::default())
.await
.expect("The notification client should be able to sync successfully");

info!("Back to the main sync");

sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
"conn_id": "encryption",
"extensions": {
"to_device": {
"since": "foo",
},
}
},
respond with = {
"pos": "2",
"extensions": {
"to_device": {
"next_batch": "nb4"
},
"e2ee": {
"device_one_time_keys_count": {
"signed_curve25519": 49
}
}
}
},
};

sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
"conn_id": "encryption",
"extensions": {
"to_device": {
"since": "nb4",
},
}
},
respond with = {
"pos": "2",
"extensions": {
"to_device": {
"next_batch": "nb5"
},
}
},
};

assert!(
!found_duplicate.load(Ordering::SeqCst),
"Duplicate one-time keys should not have been created"
);

server.verify().await;

Ok(())
}
19 changes: 1 addition & 18 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2204,7 +2204,7 @@ impl Client {
#[cfg(feature = "experimental-sliding-sync")]
self.sliding_sync_version(),
self.inner.http_client.clone(),
self.inner.base_client.clone_with_in_memory_state_store(),
self.inner.base_client.clone_with_in_memory_state_store().await?,
self.inner.server_capabilities.read().await.clone(),
self.inner.respect_login_well_known,
self.inner.event_cache.clone(),
Expand All @@ -2215,23 +2215,6 @@ impl Client {
.await,
};

// Copy the parent's session meta into the child. This initializes the in-memory
// state store of the child client with `SessionMeta`, and regenerates
// the `OlmMachine` if needs be.
//
// Note: we don't need to do a full `restore_session`, because this would
// overwrite the session information shared with the parent too, and it
// must be initialized at most once.
if let Some(session) = self.session() {
client
.set_session_meta(
session.into_meta(),
#[cfg(feature = "e2e-encryption")]
None,
)
.await?;
}

Ok(client)
}

Expand Down
Loading