Skip to content

Commit

Permalink
Add a benchmark command using the fungible application. (linera-io#1433)
Browse files Browse the repository at this point in the history
* WIP

* WIP

* Fix benchmarks build.

* Verify balances after benchmark.

* Add a seed argument for the PRNG.

* Minor cleanups.

* Remove n_apps; make each sender equally busy.

* Publish bytecode only once for benchmarks.

* Add --uniform option.

* Revert some unrelated changes.

* Address comments; print JSON.

* Improve error handling.

* Add an end-to-end test for the benchmark.

* Create node services sequentially to avoid TCP port conflicts.

---------

Co-authored-by: Christos Hadjiaslanis <[email protected]>
  • Loading branch information
afck and christos-h authored Jan 4, 2024
1 parent f72e5af commit 33673a1
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 3 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ jobs:
- name: Run some extra execution tests with wasmtime
run: |
cargo test --locked -p linera-execution --features wasmtime
- name: Run the benchmark test
run: |
cargo build --locked -p linera-service --bin linera-benchmark --features benchmark
cargo test --locked -p linera-service --features benchmark test_end_to_end_fungible_benchmark
- name: Build Wasm test runner
# use debug mode to avoid building wasmtime in release mode
run: |
Expand Down
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ linked-hash-map = "0.5.6"
once_cell = "1.18.0"
oneshot = "0.1.6"
parse_duration = "2.1.1"
port-selector = "0.1.6"
prettyplease = "0.2.15"
prometheus = "0.13.3"
proc-macro-error = "1.0.4"
Expand Down
9 changes: 8 additions & 1 deletion linera-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ edition = "2021"
[features]
default = ["wasmer", "rocksdb"]
test = ["linera-views/test"]
benchmark = ["linera-base/test"]
benchmark = ["linera-base/test", "dep:fungible", "dep:port-selector"]
wasmer = ["linera-execution/wasmer", "linera-storage/wasmer"]
wasmtime = ["linera-execution/wasmtime", "linera-storage/wasmtime"]
rocksdb = ["linera-views/rocksdb", "linera-core/rocksdb", "linera-storage/rocksdb"]
Expand All @@ -37,6 +37,7 @@ comfy-table = { workspace = true }
current_platform = "0.2.0"
dirs = { workspace = true }
file-lock = "2.1.10"
fungible = { workspace = true, optional = true }
futures = { workspace = true }
hex = { workspace = true }
http = { workspace = true }
Expand All @@ -51,6 +52,7 @@ linera-storage = { workspace = true }
linera-views = { workspace = true }
parse_duration = { workspace = true }
pathdiff = { workspace = true, optional = true }
port-selector = { workspace = true, optional = true }
prometheus = { workspace = true }
rand = { workspace = true }
rand07 = { workspace = true }
Expand Down Expand Up @@ -111,3 +113,8 @@ path = "src/proxy.rs"
[[bin]]
name = "linera-schema-export"
path = "src/schema_export.rs"

[[bin]]
name = "linera-benchmark"
path = "src/benchmark.rs"
required-features = ["benchmark"]
279 changes: 279 additions & 0 deletions linera-service/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::{bail, Context as _, Result};
use clap::Parser as _;
use fungible::{Account, AccountOwner, FungibleTokenAbi, InitialState, Parameters};
use futures::future::{join_all, try_join_all};
use linera_base::{
async_graphql::InputType,
data_types::Amount,
identifiers::{ApplicationId, ChainId, Owner},
};
use linera_execution::system::SystemChannel;
use linera_service::cli_wrappers::{ApplicationWrapper, ClientWrapper, Network};
use port_selector::random_free_tcp_port;
use rand::{Rng as _, SeedableRng};
use serde_json::Value;
use std::{collections::BTreeMap, path::Path, sync::Arc, time::Duration};
use tempfile::tempdir;
use tokio::time::Instant;
use tracing::info;

#[derive(clap::Parser)]
#[command(
name = "linera-benchmark",
version = clap::crate_version!(),
about = "Run benchmarks against a Linera network",
)]
enum Args {
Fungible {
/// The number of wallets in the test.
#[arg(long = "wallets", default_value = "4")]
wallets: usize,

/// The number of transactions being made per wallet.
#[arg(long = "transactions", default_value = "4")]
transactions: usize,

/// The faucet (which implicitly defines the network)
#[arg(long = "faucet", default_value = "http://faucet.devnet.linera.net")]
faucet: String,

/// The seed for the PRNG determining the pattern of transactions.
#[arg(long = "seed", default_value = "0")]
seed: u64,

#[arg(long = "uniform")]
/// If set, each chain receives the exact same amount of transfers.
uniform: bool,
},
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
match args {
Args::Fungible {
wallets,
transactions,
faucet,
seed,
uniform,
} => benchmark_with_fungible(wallets, transactions, faucet, seed, uniform).await,
}
}

async fn benchmark_with_fungible(
num_wallets: usize,
num_transactions: usize,
faucet: String,
seed: u64,
uniform: bool,
) -> Result<()> {
// Create the clients and initialize the wallets
let dir = Arc::new(tempdir().context("cannot create temp dir")?);
let publisher = ClientWrapper::new(dir, Network::Grpc, None, num_wallets);
publisher.wallet_init(&[], Some(&faucet)).await?;
let clients = (0..num_wallets)
.map(|n| {
let dir = Arc::new(tempdir().context("cannot create temp dir")?);
Ok(ClientWrapper::new(dir, Network::Grpc, None, n))
})
.collect::<Result<Vec<_>, anyhow::Error>>()?;
try_join_all(
clients
.iter()
.map(|client| client.wallet_init(&[], Some(&faucet))),
)
.await?;

// Sync their balances (sanity check)
try_join_all(clients.iter().map(|user| async move {
let chain = user.default_chain().context("missing default chain")?;
let balance = user.synchronize_balance(chain).await?;
info!("User {:?} has {}", user.get_owner(), balance);
Ok::<_, anyhow::Error>(())
}))
.await?;

// Start the node services and subscribe to the publisher chain.
let publisher_chain_id = publisher.default_chain().context("missing default chain")?;
let mut services = Vec::new();
for client in &clients {
let free_port = random_free_tcp_port().context("no free TCP port")?;
let chain_id = client.default_chain().context("missing default chain")?;
let node_service = client.run_node_service(free_port).await?;
let channel = SystemChannel::PublishedBytecodes;
node_service
.subscribe(chain_id, publisher_chain_id, channel)
.await?;
services.push(node_service);
}

// Publish the fungible application bytecode.
let path = Path::new("examples/fungible").canonicalize().unwrap();
let (contract, service) = publisher.build_application(&path, "fungible", true).await?;
let bytecode_id = publisher.publish_bytecode(contract, service, None).await?;

struct BenchmarkContext {
application_id: ApplicationId<FungibleTokenAbi>,
owner: Owner,
default_chain: ChainId,
}

// Create the fungible applications
let apps = try_join_all(clients.iter().zip(services).enumerate().map(
|(i, (client, node_service))| async move {
let owner = client.get_owner().context("missing owner")?;
let default_chain = client.default_chain().context("missing default chain")?;
let initial_state = InitialState {
accounts: BTreeMap::from([(
AccountOwner::User(owner),
Amount::from_tokens(num_transactions as u128),
)]),
};
let parameters = Parameters::new(format!("FUN{}", i).leak());
let application_id = node_service
.create_application::<FungibleTokenAbi>(
&default_chain,
&bytecode_id,
&parameters,
&initial_state,
&[],
)
.await?;
let context = BenchmarkContext {
application_id,
owner,
default_chain,
};
let app = FungibleApp(
node_service
.make_application(&context.default_chain, &context.application_id)
.await?,
);
Ok::<_, anyhow::Error>((app, context, node_service))
},
))
.await?;

// create transaction futures
let mut expected_balances = vec![vec![Amount::ZERO; apps.len()]; apps.len()];
let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);

let transaction_futures = (0..num_transactions).flat_map(|transaction_i| {
apps.iter()
.enumerate()
.map(|(sender_i, (sender_app, sender_context, _))| {
let receiver_i = if uniform {
(transaction_i + sender_i + 1) % apps.len()
} else {
rng.gen_range(0..apps.len())
};
let (_, receiver_context, _) = &apps[receiver_i];
expected_balances[receiver_i][sender_i]
.try_add_assign(Amount::ONE)
.unwrap();
sender_app.transfer(
AccountOwner::User(sender_context.owner),
Amount::ONE,
Account {
chain_id: receiver_context.default_chain,
owner: AccountOwner::User(receiver_context.owner),
},
)
})
.collect::<Vec<_>>()
});

info!("Making {} transactions", num_wallets * num_transactions);

let timer = Instant::now();

let results = join_all(transaction_futures).await;
let successes = results.into_iter().filter(Result::is_ok).count();

let tps: f64 = successes as f64 / timer.elapsed().as_secs_f64();

let failures = num_wallets * num_transactions - successes;
info!("Successes: {:?}", successes);
info!("Failures: {:?}", failures);
info!("TPS: {:.2}", tps);
println!(
"{{\
\"successes\": {successes},
\"failures\": {failures},
\"tps\": {tps}
}}"
);

try_join_all(apps.iter().zip(expected_balances).map(
|((_, context, node_service), expected_balances)| {
try_join_all(apps.iter().zip(expected_balances).map(
|((_, sender_context, _), expected_balance)| async move {
let app = FungibleApp(
node_service
.make_application(
&context.default_chain,
&sender_context.application_id,
)
.await?,
);
for i in 0.. {
tokio::time::sleep(Duration::from_secs(i)).await;
let actual_balance =
app.get_amount(&AccountOwner::User(context.owner)).await;
if actual_balance == expected_balance {
break;
}
if i == 4 {
bail!(
"Expected balance: {}, actual balance: {}",
expected_balance,
actual_balance
);
}
}
assert_eq!(
app.get_amount(&AccountOwner::User(context.owner)).await,
expected_balance
);
Ok(())
},
))
},
))
.await?;

Ok(())
}

struct FungibleApp(ApplicationWrapper<fungible::FungibleTokenAbi>);

impl FungibleApp {
async fn get_amount(&self, account_owner: &fungible::AccountOwner) -> Amount {
let query = format!(
"accounts {{ entry(key: {}) {{ value }} }}",
account_owner.to_value()
);
let response_body = self.0.query(&query).await.unwrap();
serde_json::from_value(response_body["accounts"]["entry"]["value"].clone())
.unwrap_or_default()
}

async fn transfer(
&self,
account_owner: fungible::AccountOwner,
amount_transfer: Amount,
destination: fungible::Account,
) -> Result<Value> {
let mutation = format!(
"transfer(owner: {}, amount: \"{}\", targetAccount: {})",
account_owner.to_value(),
amount_transfer,
destination.to_value(),
);
self.0.mutate(mutation).await
}
}
5 changes: 3 additions & 2 deletions linera-service/src/cli_wrappers/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,8 +829,9 @@ impl NodeService {
"mutation {{ subscribe(\
subscriberChainId: \"{subscriber_chain_id}\", \
publisherChainId: \"{publisher_chain_id}\", \
channel: \"{channel}\") \
}}"
channel: \"{}\") \
}}",
channel.to_value(),
);
self.query_node(query).await?;
Ok(())
Expand Down
Loading

0 comments on commit 33673a1

Please sign in to comment.