diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 6e934433204..b4fa0358f93 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -66,6 +66,10 @@ jobs: - name: Run some extra execution tests with wasmtime run: | cargo test --locked -p linera-execution --features wasmtime + - name: Run the benchmark test + run: | + cargo build --locked -p linera-service --bin linera-benchmark --features benchmark + cargo test --locked -p linera-service --features benchmark test_end_to_end_fungible_benchmark - name: Build Wasm test runner # use debug mode to avoid building wasmtime in release mode run: | diff --git a/Cargo.lock b/Cargo.lock index fddcf9ecf65..1b92f7aa9bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3441,6 +3441,7 @@ dependencies = [ "matching-engine", "parse_duration", "pathdiff", + "port-selector", "prometheus", "proptest", "rand 0.7.3", @@ -4446,6 +4447,15 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "port-selector" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd119ef551a50cd8939f0ff93bd062891f7b0dbb771b4a05df8a9c13aebaff68" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "powerfmt" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 5407d7f928a..ec4aefd3cb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,7 @@ linked-hash-map = "0.5.6" once_cell = "1.18.0" oneshot = "0.1.6" parse_duration = "2.1.1" +port-selector = "0.1.6" prettyplease = "0.2.15" prometheus = "0.13.3" proc-macro-error = "1.0.4" diff --git a/linera-service/Cargo.toml b/linera-service/Cargo.toml index 195fb46a593..424c5e40723 100644 --- a/linera-service/Cargo.toml +++ b/linera-service/Cargo.toml @@ -13,7 +13,7 @@ edition = "2021" [features] default = ["wasmer", "rocksdb"] test = ["linera-views/test"] -benchmark = ["linera-base/test"] +benchmark = ["linera-base/test", "dep:fungible", "dep:port-selector"] wasmer = ["linera-execution/wasmer", "linera-storage/wasmer"] wasmtime = ["linera-execution/wasmtime", "linera-storage/wasmtime"] rocksdb = ["linera-views/rocksdb", "linera-core/rocksdb", "linera-storage/rocksdb"] @@ -37,6 +37,7 @@ comfy-table = { workspace = true } current_platform = "0.2.0" dirs = { workspace = true } file-lock = "2.1.10" +fungible = { workspace = true, optional = true } futures = { workspace = true } hex = { workspace = true } http = { workspace = true } @@ -51,6 +52,7 @@ linera-storage = { workspace = true } linera-views = { workspace = true } parse_duration = { workspace = true } pathdiff = { workspace = true, optional = true } +port-selector = { workspace = true, optional = true } prometheus = { workspace = true } rand = { workspace = true } rand07 = { workspace = true } @@ -111,3 +113,8 @@ path = "src/proxy.rs" [[bin]] name = "linera-schema-export" path = "src/schema_export.rs" + +[[bin]] +name = "linera-benchmark" +path = "src/benchmark.rs" +required-features = ["benchmark"] diff --git a/linera-service/src/benchmark.rs b/linera-service/src/benchmark.rs new file mode 100644 index 00000000000..88469cc350d --- /dev/null +++ b/linera-service/src/benchmark.rs @@ -0,0 +1,279 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::{bail, Context as _, Result}; +use clap::Parser as _; +use fungible::{Account, AccountOwner, FungibleTokenAbi, InitialState, Parameters}; +use futures::future::{join_all, try_join_all}; +use linera_base::{ + async_graphql::InputType, + data_types::Amount, + identifiers::{ApplicationId, ChainId, Owner}, +}; +use linera_execution::system::SystemChannel; +use linera_service::cli_wrappers::{ApplicationWrapper, ClientWrapper, Network}; +use port_selector::random_free_tcp_port; +use rand::{Rng as _, SeedableRng}; +use serde_json::Value; +use std::{collections::BTreeMap, path::Path, sync::Arc, time::Duration}; +use tempfile::tempdir; +use tokio::time::Instant; +use tracing::info; + +#[derive(clap::Parser)] +#[command( + name = "linera-benchmark", + version = clap::crate_version!(), + about = "Run benchmarks against a Linera network", +)] +enum Args { + Fungible { + /// The number of wallets in the test. + #[arg(long = "wallets", default_value = "4")] + wallets: usize, + + /// The number of transactions being made per wallet. + #[arg(long = "transactions", default_value = "4")] + transactions: usize, + + /// The faucet (which implicitly defines the network) + #[arg(long = "faucet", default_value = "http://faucet.devnet.linera.net")] + faucet: String, + + /// The seed for the PRNG determining the pattern of transactions. + #[arg(long = "seed", default_value = "0")] + seed: u64, + + #[arg(long = "uniform")] + /// If set, each chain receives the exact same amount of transfers. + uniform: bool, + }, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + match args { + Args::Fungible { + wallets, + transactions, + faucet, + seed, + uniform, + } => benchmark_with_fungible(wallets, transactions, faucet, seed, uniform).await, + } +} + +async fn benchmark_with_fungible( + num_wallets: usize, + num_transactions: usize, + faucet: String, + seed: u64, + uniform: bool, +) -> Result<()> { + // Create the clients and initialize the wallets + let dir = Arc::new(tempdir().context("cannot create temp dir")?); + let publisher = ClientWrapper::new(dir, Network::Grpc, None, num_wallets); + publisher.wallet_init(&[], Some(&faucet)).await?; + let clients = (0..num_wallets) + .map(|n| { + let dir = Arc::new(tempdir().context("cannot create temp dir")?); + Ok(ClientWrapper::new(dir, Network::Grpc, None, n)) + }) + .collect::, anyhow::Error>>()?; + try_join_all( + clients + .iter() + .map(|client| client.wallet_init(&[], Some(&faucet))), + ) + .await?; + + // Sync their balances (sanity check) + try_join_all(clients.iter().map(|user| async move { + let chain = user.default_chain().context("missing default chain")?; + let balance = user.synchronize_balance(chain).await?; + info!("User {:?} has {}", user.get_owner(), balance); + Ok::<_, anyhow::Error>(()) + })) + .await?; + + // Start the node services and subscribe to the publisher chain. + let publisher_chain_id = publisher.default_chain().context("missing default chain")?; + let mut services = Vec::new(); + for client in &clients { + let free_port = random_free_tcp_port().context("no free TCP port")?; + let chain_id = client.default_chain().context("missing default chain")?; + let node_service = client.run_node_service(free_port).await?; + let channel = SystemChannel::PublishedBytecodes; + node_service + .subscribe(chain_id, publisher_chain_id, channel) + .await?; + services.push(node_service); + } + + // Publish the fungible application bytecode. + let path = Path::new("examples/fungible").canonicalize().unwrap(); + let (contract, service) = publisher.build_application(&path, "fungible", true).await?; + let bytecode_id = publisher.publish_bytecode(contract, service, None).await?; + + struct BenchmarkContext { + application_id: ApplicationId, + owner: Owner, + default_chain: ChainId, + } + + // Create the fungible applications + let apps = try_join_all(clients.iter().zip(services).enumerate().map( + |(i, (client, node_service))| async move { + let owner = client.get_owner().context("missing owner")?; + let default_chain = client.default_chain().context("missing default chain")?; + let initial_state = InitialState { + accounts: BTreeMap::from([( + AccountOwner::User(owner), + Amount::from_tokens(num_transactions as u128), + )]), + }; + let parameters = Parameters::new(format!("FUN{}", i).leak()); + let application_id = node_service + .create_application::( + &default_chain, + &bytecode_id, + ¶meters, + &initial_state, + &[], + ) + .await?; + let context = BenchmarkContext { + application_id, + owner, + default_chain, + }; + let app = FungibleApp( + node_service + .make_application(&context.default_chain, &context.application_id) + .await?, + ); + Ok::<_, anyhow::Error>((app, context, node_service)) + }, + )) + .await?; + + // create transaction futures + let mut expected_balances = vec![vec![Amount::ZERO; apps.len()]; apps.len()]; + let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); + + let transaction_futures = (0..num_transactions).flat_map(|transaction_i| { + apps.iter() + .enumerate() + .map(|(sender_i, (sender_app, sender_context, _))| { + let receiver_i = if uniform { + (transaction_i + sender_i + 1) % apps.len() + } else { + rng.gen_range(0..apps.len()) + }; + let (_, receiver_context, _) = &apps[receiver_i]; + expected_balances[receiver_i][sender_i] + .try_add_assign(Amount::ONE) + .unwrap(); + sender_app.transfer( + AccountOwner::User(sender_context.owner), + Amount::ONE, + Account { + chain_id: receiver_context.default_chain, + owner: AccountOwner::User(receiver_context.owner), + }, + ) + }) + .collect::>() + }); + + info!("Making {} transactions", num_wallets * num_transactions); + + let timer = Instant::now(); + + let results = join_all(transaction_futures).await; + let successes = results.into_iter().filter(Result::is_ok).count(); + + let tps: f64 = successes as f64 / timer.elapsed().as_secs_f64(); + + let failures = num_wallets * num_transactions - successes; + info!("Successes: {:?}", successes); + info!("Failures: {:?}", failures); + info!("TPS: {:.2}", tps); + println!( + "{{\ + \"successes\": {successes}, + \"failures\": {failures}, + \"tps\": {tps} + }}" + ); + + try_join_all(apps.iter().zip(expected_balances).map( + |((_, context, node_service), expected_balances)| { + try_join_all(apps.iter().zip(expected_balances).map( + |((_, sender_context, _), expected_balance)| async move { + let app = FungibleApp( + node_service + .make_application( + &context.default_chain, + &sender_context.application_id, + ) + .await?, + ); + for i in 0.. { + tokio::time::sleep(Duration::from_secs(i)).await; + let actual_balance = + app.get_amount(&AccountOwner::User(context.owner)).await; + if actual_balance == expected_balance { + break; + } + if i == 4 { + bail!( + "Expected balance: {}, actual balance: {}", + expected_balance, + actual_balance + ); + } + } + assert_eq!( + app.get_amount(&AccountOwner::User(context.owner)).await, + expected_balance + ); + Ok(()) + }, + )) + }, + )) + .await?; + + Ok(()) +} + +struct FungibleApp(ApplicationWrapper); + +impl FungibleApp { + async fn get_amount(&self, account_owner: &fungible::AccountOwner) -> Amount { + let query = format!( + "accounts {{ entry(key: {}) {{ value }} }}", + account_owner.to_value() + ); + let response_body = self.0.query(&query).await.unwrap(); + serde_json::from_value(response_body["accounts"]["entry"]["value"].clone()) + .unwrap_or_default() + } + + async fn transfer( + &self, + account_owner: fungible::AccountOwner, + amount_transfer: Amount, + destination: fungible::Account, + ) -> Result { + let mutation = format!( + "transfer(owner: {}, amount: \"{}\", targetAccount: {})", + account_owner.to_value(), + amount_transfer, + destination.to_value(), + ); + self.0.mutate(mutation).await + } +} diff --git a/linera-service/src/cli_wrappers/wallet.rs b/linera-service/src/cli_wrappers/wallet.rs index 99f77f667ad..1c52ccdeaab 100644 --- a/linera-service/src/cli_wrappers/wallet.rs +++ b/linera-service/src/cli_wrappers/wallet.rs @@ -829,8 +829,9 @@ impl NodeService { "mutation {{ subscribe(\ subscriberChainId: \"{subscriber_chain_id}\", \ publisherChainId: \"{publisher_chain_id}\", \ - channel: \"{channel}\") \ - }}" + channel: \"{}\") \ + }}", + channel.to_value(), ); self.query_node(query).await?; Ok(()) diff --git a/linera-service/tests/end_to_end_tests.rs b/linera-service/tests/end_to_end_tests.rs index 8d9866d7369..f13f9f3e0cd 100644 --- a/linera-service/tests/end_to_end_tests.rs +++ b/linera-service/tests/end_to_end_tests.rs @@ -2100,6 +2100,52 @@ async fn test_end_to_end_faucet(config: impl LineraNetConfig) { net.terminate().await.unwrap(); } +#[cfg(feature = "benchmark")] +#[cfg_attr(feature = "rocksdb", test_case(LocalNetTestingConfig::new(Database::RocksDb, Network::Grpc) ; "rocksdb_grpc"))] +#[cfg_attr(feature = "scylladb", test_case(LocalNetTestingConfig::new(Database::ScyllaDb, Network::Grpc) ; "scylladb_grpc"))] +#[cfg_attr(feature = "aws", test_case(LocalNetTestingConfig::new(Database::DynamoDb, Network::Grpc) ; "aws_grpc"))] +#[cfg_attr(feature = "kubernetes", test_case(SharedLocalKubernetesNetTestingConfig::new(Network::Grpc, None) ; "kubernetes_grpc"))] +#[test_log::test(tokio::test)] +async fn test_end_to_end_fungible_benchmark(config: impl LineraNetConfig) { + use linera_service::util::CommandExt as _; + use tokio::process::Command; + + let _guard = INTEGRATION_TEST_GUARD.lock().await; + + // Create runner and two clients. + let (mut net, client1) = config.instantiate().await.unwrap(); + + let chain1 = client1.get_wallet().unwrap().default_chain().unwrap(); + + let mut faucet = client1 + .run_faucet(None, chain1, Amount::from_tokens(1)) + .await + .unwrap(); + + let path = util::resolve_binary("linera-benchmark", env!("CARGO_PKG_NAME")) + .await + .unwrap(); + // The benchmark looks for examples/fungible, so it needs to run in the project root. + let current_dir = std::env::current_exe().unwrap(); + let dir = current_dir.ancestors().nth(4).unwrap(); + let mut command = Command::new(path); + command + .current_dir(dir) + .arg("fungible") + .args(["--wallets", "2"]) + .args(["--transactions", "1"]) + .arg("--uniform") + .args(["--faucet".to_string(), faucet.url()]); + let stdout = command.spawn_and_wait_for_stdout().await.unwrap(); + let json = serde_json::from_str::(&stdout).unwrap(); + assert_eq!(json["successes"], 2); + + faucet.ensure_is_running().unwrap(); + faucet.terminate().await.unwrap(); + net.ensure_is_running().await.unwrap(); + net.terminate().await.unwrap(); +} + #[cfg_attr(feature = "rocksdb", test_case(LocalNetTestingConfig::new(Database::RocksDb, Network::Grpc) ; "rocksdb_grpc"))] #[cfg_attr(feature = "scylladb", test_case(LocalNetTestingConfig::new(Database::ScyllaDb, Network::Grpc) ; "scylladb_grpc"))] #[cfg_attr(feature = "aws", test_case(LocalNetTestingConfig::new(Database::DynamoDb, Network::Grpc) ; "aws_grpc"))]