Skip to content

Commit

Permalink
Benchmark sync_worker (#1360)
Browse files Browse the repository at this point in the history
* benchmark syncs

* fix missing dependency
  • Loading branch information
insipx authored Dec 2, 2024
1 parent 0a078b0 commit 85dd6d3
Show file tree
Hide file tree
Showing 17 changed files with 287 additions and 169 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ url = "2.5.0"
zeroize = "1.8"
bincode = "1.3"
console_error_panic_hook = "0.1"
fdlimit = "0.3"
const_format = "0.2"

# Internal Crate Dependencies
xmtp_cryptography = { path = "xmtp_cryptography" }
Expand Down
6 changes: 4 additions & 2 deletions dev/bench
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
set -eou pipefail

if [[ -z "${1-}" ]]; then
cargo bench --no-fail-fast --features bench
cargo bench --no-fail-fast --features bench -p xmtp_mls
else
cargo bench --no-fail-fast --features bench -- "$1"
cargo bench --no-fail-fast --features bench -p xmtp_mls -- "$1"
fi

echo "Open benchmarks at target/criterion/report.html"
4 changes: 2 additions & 2 deletions dev/flamegraph
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ if [[ "${OSTYPE}" == "darwin"* ]]; then
fi

if [[ -z "${1-}" ]]; then
XMTP_FLAMEGRAPH=trace cargo bench --no-fail-fast --features bench
XMTP_FLAMEGRAPH=trace cargo bench --no-fail-fast --features bench -p xmtp_mls
else
XMTP_FLAMEGRAPH=trace cargo bench --no-fail-fast --features bench -- "$1"
XMTP_FLAMEGRAPH=trace cargo bench --no-fail-fast --features bench -p xmtp_mls -- "$1"
fi

cat xmtp_mls/tracing.foldeed | inferno-flamegraph > tracing-flamegraph.svg
4 changes: 2 additions & 2 deletions xmtp_debug/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ owo-colors = "4.1"
url.workspace = true
redb = "2.2"
directories = "5.0"
const_format = "0.2"
const_format.workspace = true
speedy = "0.8"
hex.workspace = true
prost.workspace = true
Expand All @@ -40,5 +40,5 @@ rand.workspace = true
fs_extra = "1.3"
ethers.workspace = true
miniserde = "0.1"
fdlimit = "0.3"
fdlimit.workspace = true
lipsum = "0.9"
1 change: 1 addition & 0 deletions xmtp_debug/src/app/clients.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Different ways to create a [`crate::DbgClient`]
use super::*;
use crate::app::types::*;

Expand Down
13 changes: 13 additions & 0 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ bench = [
"once_cell",
"dep:xmtp_api_grpc",
"criterion",
"dep:fdlimit",
"dep:ethers",
"dep:const_format"
]
default = ["grpc-api"]
grpc-api = ["dep:xmtp_api_grpc"]
Expand All @@ -35,6 +38,7 @@ test-utils = [
"xmtp_proto/test-utils",
"xmtp_api_http/test-utils",
"xmtp_api_grpc/test-utils",
"dep:const_format"
]
update-schema = ["toml"]

Expand Down Expand Up @@ -74,6 +78,9 @@ console_error_panic_hook = { workspace = true, optional = true }
toml = { version = "0.8.4", optional = true }
tracing-wasm = { version = "0.2", optional = true }
xmtp_api_http = { path = "../xmtp_api_http", optional = true }
fdlimit = { workspace = true, optional = true }
ethers = { workspace = true, features = ["openssl"], optional = true }
const_format = { workspace = true, optional = true }

# Test/Bench Utils
anyhow = { workspace = true, optional = true }
Expand Down Expand Up @@ -132,6 +139,7 @@ mockall = "0.13.1"
openmls_basic_credential.workspace = true
xmtp_id = { path = "../xmtp_id", features = ["test-utils"] }
xmtp_proto = { workspace = true, features = ["test-utils"] }
const_format.workspace = true

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
ctor.workspace = true
Expand Down Expand Up @@ -181,3 +189,8 @@ harness = false
name = "identity"
required-features = ["bench"]

[[bench]]
harness = false
name = "sync"
required-features = ["bench"]

12 changes: 3 additions & 9 deletions xmtp_mls/benches/group_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,12 @@ use tracing::{trace_span, Instrument};
use xmtp_mls::{
builder::ClientBuilder,
groups::GroupMetadataOptions,
utils::{
bench::{
bench_async_setup, create_identities_if_dont_exist, init_logging, Identity,
BENCH_ROOT_SPAN,
},
test::TestClient,
utils::bench::{
bench_async_setup, create_identities_if_dont_exist, init_logging, BenchClient, Identity,
BENCH_ROOT_SPAN,
},
Client,
};

pub type BenchClient = Client<TestClient>;

pub const IDENTITY_SAMPLES: [usize; 9] = [10, 20, 40, 80, 100, 200, 300, 400, 450];
pub const MAX_IDENTITIES: usize = 1_000;
pub const SAMPLE_SIZE: usize = 10;
Expand Down
49 changes: 3 additions & 46 deletions xmtp_mls/benches/identity.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
use crate::tracing::Instrument;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use ethers::signers::LocalWallet;
use tokio::runtime::{Builder, Runtime};
use xmtp_id::{
associations::{
builder::SignatureRequest,
generate_inbox_id,
unverified::{UnverifiedRecoverableEcdsaSignature, UnverifiedSignature},
},
InboxOwner,
};
use xmtp_mls::utils::{bench::init_logging, test::TestClient as TestApiClient};
use xmtp_mls::{
client::Client,
identity::IdentityStrategy,
utils::bench::{bench_async_setup, BENCH_ROOT_SPAN},
};
use xmtp_proto::api_client::XmtpTestClient;

type BenchClient = Client<TestApiClient>;
use xmtp_mls::utils::bench::{bench_async_setup, BenchClient, BENCH_ROOT_SPAN};
use xmtp_mls::utils::bench::{clients, init_logging};

#[macro_use]
extern crate tracing;
Expand All @@ -32,40 +23,6 @@ fn setup() -> Runtime {
.unwrap()
}

async fn new_client() -> (BenchClient, LocalWallet) {
let nonce = 1;
let wallet = xmtp_cryptography::utils::generate_local_wallet();
let inbox_id = generate_inbox_id(&wallet.get_address(), &nonce).unwrap();

let dev = std::env::var("DEV_GRPC");
let is_dev_network = matches!(dev, Ok(d) if d == "true" || d == "1");

let api_client = if is_dev_network {
tracing::info!("Using Dev GRPC");
<TestApiClient as XmtpTestClient>::create_dev().await
} else {
tracing::info!("Using Local GRPC");
<TestApiClient as XmtpTestClient>::create_local().await
};

let client = BenchClient::builder(IdentityStrategy::new(
inbox_id,
wallet.get_address(),
nonce,
None,
));

let client = client
.temp_store()
.await
.api_client(api_client)
.build()
.await
.unwrap();

(client, wallet)
}

async fn ecdsa_signature(client: &BenchClient, owner: impl InboxOwner) -> SignatureRequest {
let mut signature_request = client.context().signature_request().unwrap();
let signature_text = signature_request.signature_text();
Expand All @@ -92,7 +49,7 @@ fn register_identity_eoa(c: &mut Criterion) {
b.to_async(&runtime).iter_batched(
|| {
bench_async_setup(|| async {
let (client, wallet) = new_client().await;
let (client, wallet) = clients::new_unregistered_client(false).await;
let signature_request = ecdsa_signature(&client, wallet).await;

(client, signature_request, span.clone())
Expand Down
56 changes: 56 additions & 0 deletions xmtp_mls/benches/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//! Benchmarking for syncing functions
use crate::tracing::Instrument;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use tokio::runtime::{Builder, Runtime};
use xmtp_mls::utils::bench::{bench_async_setup, BENCH_ROOT_SPAN};
use xmtp_mls::utils::bench::{clients, init_logging};

#[macro_use]
extern crate tracing;

fn setup() -> Runtime {
Builder::new_multi_thread()
.enable_time()
.enable_io()
.thread_name("xmtp-bencher")
.build()
.unwrap()
}

fn start_sync_worker(c: &mut Criterion) {
init_logging();

let runtime = setup();
let mut benchmark_group = c.benchmark_group("start_sync_worker");
benchmark_group.sample_size(10);
benchmark_group.bench_function("start_sync_worker", |b| {
let span = trace_span!(BENCH_ROOT_SPAN);
b.to_async(&runtime).iter_batched(
|| {
bench_async_setup(|| async {
let client = clients::new_client(true).await;
let provider = client.mls_provider().unwrap();
// set history sync URL
(client, provider, span.clone())
})
},
|(client, provider, span)| async move {
client
.start_sync_worker(&provider)
.instrument(span)
.await
.unwrap()
},
BatchSize::SmallInput,
)
});

benchmark_group.finish();
}

criterion_group!(
name = sync;
config = Criterion::default().sample_size(10);
targets = start_sync_worker
);
criterion_main!(sync);
7 changes: 6 additions & 1 deletion xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use rand::{
use serde::{Deserialize, Serialize};
use std::time::Duration;
use thiserror::Error;
use tracing::warn;
use tracing::{instrument, warn};
use xmtp_cryptography::utils as crypto_utils;
use xmtp_id::scw_verifier::SmartContractSignatureVerifier;
use xmtp_proto::api_client::trait_impls::XmtpApi;
Expand Down Expand Up @@ -118,6 +118,7 @@ where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
#[instrument(level = "trace", skip_all)]
pub async fn start_sync_worker(
&self,
provider: &XmtpOpenMlsProvider,
Expand Down Expand Up @@ -231,6 +232,7 @@ where
* Ideally called when the client is registered.
* Will auto-send a sync request if sync group is created.
*/
#[instrument(level = "trace", skip_all)]
pub async fn sync_init(&self, provider: &XmtpOpenMlsProvider) -> Result<(), DeviceSyncError> {
tracing::info!(
"Initializing device sync... url: {:?}",
Expand All @@ -249,6 +251,7 @@ where
Ok(())
}

#[instrument(level = "trace", skip_all)]
async fn ensure_sync_group(
&self,
provider: &XmtpOpenMlsProvider,
Expand All @@ -265,6 +268,7 @@ where
Ok(sync_group)
}

#[instrument(level = "trace", skip_all)]
pub async fn send_sync_request(
&self,
provider: &XmtpOpenMlsProvider,
Expand Down Expand Up @@ -585,6 +589,7 @@ where
Ok(())
}

#[instrument(level = "trace", skip_all)]
pub fn get_sync_group(&self) -> Result<MlsGroup<Self>, GroupError> {
let conn = self.store().conn()?;
let sync_group_id = conn
Expand Down
19 changes: 8 additions & 11 deletions xmtp_mls/src/groups/device_sync/message_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,19 @@ pub(crate) mod tests {
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker);

const HISTORY_SERVER_HOST: &str = "localhost";
const HISTORY_SERVER_PORT: u16 = 5558;

use super::*;
use crate::{assert_ok, builder::ClientBuilder, groups::GroupMetadataOptions};
use crate::{
assert_ok, builder::ClientBuilder, groups::GroupMetadataOptions,
utils::test::HISTORY_SYNC_URL,
};
use std::time::{Duration, Instant};
use xmtp_cryptography::utils::generate_local_wallet;
use xmtp_id::InboxOwner;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_message_history_sync() {
let history_sync_url = format!("http://{}:{}", HISTORY_SERVER_HOST, HISTORY_SERVER_PORT);

let wallet = generate_local_wallet();
let amal_a = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await;
let amal_a = ClientBuilder::new_test_client_with_history(&wallet, HISTORY_SYNC_URL).await;

let amal_a_provider = amal_a.mls_provider().unwrap();
let amal_a_conn = amal_a_provider.conn_ref();
Expand All @@ -87,7 +85,7 @@ pub(crate) mod tests {
assert_eq!(syncable_messages.len(), 2); // welcome message, and message that was just sent

// Create a second installation for amal.
let amal_b = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await;
let amal_b = ClientBuilder::new_test_client_with_history(&wallet, HISTORY_SYNC_URL).await;
let amal_b_provider = amal_b.mls_provider().unwrap();
let amal_b_conn = amal_b_provider.conn_ref();

Expand Down Expand Up @@ -161,16 +159,15 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_externals_cant_join_sync_group() {
let history_sync_url = format!("http://{}:{}", HISTORY_SERVER_HOST, HISTORY_SERVER_PORT);
let wallet = generate_local_wallet();
let amal = ClientBuilder::new_test_client_with_history(&wallet, &history_sync_url).await;
let amal = ClientBuilder::new_test_client_with_history(&wallet, HISTORY_SYNC_URL).await;
amal.sync_welcomes(&amal.store().conn().unwrap())
.await
.expect("sync welcomes");

let external_wallet = generate_local_wallet();
let external_client =
ClientBuilder::new_test_client_with_history(&external_wallet, &history_sync_url).await;
ClientBuilder::new_test_client_with_history(&external_wallet, HISTORY_SYNC_URL).await;

external_client
.sync_welcomes(&external_client.store().conn().unwrap())
Expand Down
2 changes: 2 additions & 0 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::{
task::JoinHandle,
};
use tokio_stream::wrappers::BroadcastStream;
use tracing::instrument;
use xmtp_id::scw_verifier::SmartContractSignatureVerifier;
use xmtp_proto::{api_client::XmtpMlsStreams, xmtp::mls::api::v1::WelcomeMessage};

Expand Down Expand Up @@ -104,6 +105,7 @@ impl<C> StreamMessages<C> for broadcast::Receiver<LocalEvents<C>>
where
C: Clone + Send + Sync + 'static,
{
#[instrument(level = "trace", skip_all)]
fn stream_sync_messages(self) -> impl Stream<Item = Result<LocalEvents<C>, SubscribeError>> {
BroadcastStream::new(self).filter_map(|event| async {
crate::optify!(event, "Missed message due to event queue lag")
Expand Down
Loading

0 comments on commit 85dd6d3

Please sign in to comment.