Skip to content

Commit

Permalink
runtime: validate transactions in parallel (#12654)
Browse files Browse the repository at this point in the history
In the old flow, validation (including signature checks) happened inside
`verify_and_charge_transaction()`. In the new flow, we first run
`parallel_validate_transactions()` to validate and compute the
transaction cost (including signature checks) in parallel. Only after a
transaction passes validation do we call
`verify_and_charge_transaction()` with the known `cost`.


```mermaid
graph TD;
    subgraph Old_Flow
    A[Runtime::apply]
    A-->B[self.process_transactions]
    B-->C[process_transaction]
    C-->D[verify_and_charge_transaction]
    D-->E[validate_transaction incl. signature]
    E-->|Valid|F[charge fees & finalize]
    E-->|Invalid|G[reject tx]
    end

    subgraph New_Flow
    A2[Runtime::apply]
    A2-->H[self.process_transactions]
    H-->|parallel|I[parallel_validate_transactions incl. signature]
    I-->|Valid, get cost|J[process_transaction]
    J-->K[verify_and_charge_transaction]
    K-->L[charge fees & finalize]
    I-->|Invalid|M[reject tx early]
    classDef threaded fill:#3240a8,stroke:#333,stroke-width:2px;
    class I threaded;
    end
```

(blue = running in parallel)

## Testing
The change brings ~20% throughput improvement on `n2d-standard-16`
```sh
1006/1006 blocks applied in 2m at a rate of 7.5129/s. 0s remaining. Skipped 0 blocks. Over last 100 blocks to height 1073: 0 empty blocks, averaging 413.00 Tgas per non-empty block
```
```sh
1007/1007 blocks applied in 2m at a rate of 8.9972/s. 0s remaining. Skipped 0 blocks. Over last 100 blocks to height 1073: 0 empty blocks, averaging 413.00 Tgas per non-empty block
```
Additionally, profiles before and after the change are available:
[before](https://share.firefox.dev/4a5W9tO),
[after](https://share.firefox.dev/40pd4oe)
  • Loading branch information
miloserdow authored Jan 15, 2025
1 parent cf45cc5 commit abe4a60
Show file tree
Hide file tree
Showing 4 changed files with 445 additions and 342 deletions.
56 changes: 33 additions & 23 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,17 @@ impl RuntimeAdapter for NightshadeRuntime {
}
}

let cost = match validate_transaction(
runtime_config,
gas_price,
transaction,
verify_signature,
current_protocol_version,
) {
Ok(cost) => cost,
Err(e) => return Ok(Some(e)),
};

if let Some(state_root) = state_root {
let shard_uid =
self.account_id_to_shard_uid(transaction.transaction.signer_id(), epoch_id)?;
Expand All @@ -574,9 +585,8 @@ impl RuntimeAdapter for NightshadeRuntime {
match verify_and_charge_transaction(
runtime_config,
&mut state_update,
gas_price,
transaction,
verify_signature,
&cost,
// here we do not know which block the transaction will be included
// and therefore skip the check on the nonce upper bound.
None,
Expand All @@ -586,17 +596,8 @@ impl RuntimeAdapter for NightshadeRuntime {
Err(e) => Ok(Some(e)),
}
} else {
// Doing basic validation without a state root
match validate_transaction(
runtime_config,
gas_price,
transaction,
verify_signature,
current_protocol_version,
) {
Ok(_) => Ok(None),
Err(e) => Ok(Some(e)),
}
// Without a state root, verification is skipped
Ok(None)
}
}

Expand Down Expand Up @@ -773,27 +774,36 @@ impl RuntimeAdapter for NightshadeRuntime {
continue;
}

// Verifying the validity of the transaction based on the current state.
match verify_and_charge_transaction(
let verify_result = validate_transaction(
runtime_config,
&mut state_update,
prev_block.next_gas_price,
&tx,
false,
Some(next_block_height),
true,
protocol_version,
) {
Ok(verification_result) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "including transaction that passed validation");
)
.and_then(|cost| {
verify_and_charge_transaction(
runtime_config,
&mut state_update,
&tx,
&cost,
Some(next_block_height),
protocol_version,
)
});

match verify_result {
Ok(cost) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "including transaction that passed validation and verification");
state_update.commit(StateChangeCause::NotWritableToDisk);
total_gas_burnt += verification_result.gas_burnt;
total_gas_burnt += cost.gas_burnt;
total_size += tx.get_size();
result.transactions.push(tx);
// Take one transaction from this group, no more.
break;
}
Err(err) => {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), ?err, "discarding transaction that is invalid");
tracing::trace!(target: "runtime", tx=?tx.get_hash(), ?err, "discarding transaction that failed verification or verification");
rejected_invalid_tx += 1;
state_update.rollback();
}
Expand Down
11 changes: 9 additions & 2 deletions runtime/runtime-params-estimator/src/estimator_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,19 @@ impl Testbed<'_> {
let verify_signature = true;

let clock = GasCost::measure(metric);
node_runtime::verify_and_charge_transaction(
let cost = node_runtime::validate_transaction(
&self.apply_state.config,
&mut state_update,
gas_price,
tx,
verify_signature,
PROTOCOL_VERSION,
)
.expect("expected no validation error");
node_runtime::verify_and_charge_transaction(
&self.apply_state.config,
&mut state_update,
tx,
&cost,
block_height,
PROTOCOL_VERSION,
)
Expand Down
101 changes: 81 additions & 20 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use crate::verifier::{
validate_transaction, verify_and_charge_transaction, ZERO_BALANCE_ACCOUNT_STORAGE_LIMIT,
};
use bandwidth_scheduler::{run_bandwidth_scheduler, BandwidthSchedulerOutput};
use config::total_prepaid_send_fees;
use config::{total_prepaid_send_fees, TransactionCost};
pub use congestion_control::bootstrap_congestion_info;
use congestion_control::ReceiptSink;
use itertools::Itertools;
Expand Down Expand Up @@ -70,6 +70,7 @@ use near_vm_runner::ContractCode;
use near_vm_runner::ContractRuntimeCache;
use near_vm_runner::ProfileDataV3;
use pipelining::ReceiptPreparationPipeline;
use rayon::prelude::*;
use std::cmp::max;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
Expand Down Expand Up @@ -289,6 +290,28 @@ impl Runtime {
debug!(target: "runtime", "{}", log_str);
}

/// Validates all transactions in parallel and returns an iterator of
/// transactions paired with their validation results.
///
/// Returns an `Iterator` of `(&SignedTransaction, Result<TransactionCost, InvalidTxError>)`
fn parallel_validate_transactions<'a>(
config: &'a RuntimeConfig,
gas_price: Balance,
transactions: &'a [SignedTransaction],
current_protocol_version: ProtocolVersion,
) -> impl Iterator<Item = (&'a SignedTransaction, Result<TransactionCost, InvalidTxError>)>
{
let results: Vec<_> = transactions
.par_iter()
.map(move |tx| {
let cost_result =
validate_transaction(config, gas_price, tx, true, current_protocol_version);
(tx, cost_result)
})
.collect();
results.into_iter()
}

/// Takes one signed transaction, verifies it and converts it to a receipt.
///
/// Add the produced receipt either to the new local receipts if the signer is the same
Expand All @@ -312,6 +335,7 @@ impl Runtime {
state_update: &mut TrieUpdate,
apply_state: &ApplyState,
signed_transaction: &SignedTransaction,
transaction_cost: &TransactionCost,
stats: &mut ApplyStats,
) -> Result<(Receipt, ExecutionOutcomeWithId), InvalidTxError> {
let span = tracing::Span::current();
Expand All @@ -320,9 +344,8 @@ impl Runtime {
match verify_and_charge_transaction(
&apply_state.config,
state_update,
apply_state.gas_price,
signed_transaction,
true,
transaction_cost,
Some(apply_state.block_height),
apply_state.current_protocol_version,
) {
Expand Down Expand Up @@ -1573,6 +1596,32 @@ impl Runtime {
state_update.commit(StateChangeCause::Migration);
}

/// Helper function that checks `RelaxedChunkValidation`. If it is enabled, we log a debug
/// message and return `Ok(())` to skip the transaction. Otherwise, we return `Err(e.into())`.
fn handle_invalid_transaction<E: std::fmt::Debug + Clone + Into<RuntimeError>>(
e: E,
tx_hash: &CryptoHash,
protocol_version: ProtocolVersion,
reason: &str,
) -> Result<(), RuntimeError> {
if checked_feature!(
"protocol_feature_relaxed_chunk_validation",
RelaxedChunkValidation,
protocol_version
) {
tracing::debug!(
target: "runtime",
"invalid transaction ignored ({}) => tx_hash: {}, error: {:?}",
reason,
tx_hash,
e
);
Ok(())
} else {
Err(e.into())
}
}

/// Processes a collection of transactions.
///
/// Fills the `processing_state` with local receipts generated during processing of the
Expand All @@ -1589,31 +1638,43 @@ impl Runtime {
let apply_state = &mut processing_state.apply_state;
let state_update = &mut processing_state.state_update;

for signed_transaction in processing_state.transactions.iter_nonexpired_transactions() {
for (signed_transaction, maybe_cost) in Self::parallel_validate_transactions(
&apply_state.config,
apply_state.gas_price,
&processing_state.transactions.transactions,
apply_state.current_protocol_version,
) {
let tx_hash = signed_transaction.get_hash();
let cost = match maybe_cost {
Ok(c) => c,
Err(e) => {
Self::handle_invalid_transaction(
e.clone(),
&tx_hash,
processing_state.protocol_version,
"parallel validation error",
)?;
continue;
}
};

let tx_result = self.process_transaction(
state_update,
apply_state,
signed_transaction,
&cost,
&mut processing_state.stats,
);
let (receipt, outcome_with_id) = match tx_result {
Ok(v) => v,
Ok(outcome) => outcome,
Err(e) => {
if checked_feature!(
"protocol_feature_relaxed_chunk_validation",
RelaxedChunkValidation,
processing_state.protocol_version
) {
// NB: number of invalid transactions are noted in metrics.
tracing::debug!(
target: "runtime",
message="invalid transaction ignored",
tx_hash=%signed_transaction.get_hash()
);
continue;
} else {
return Err(e.into());
}
Self::handle_invalid_transaction(
e,
&tx_hash,
processing_state.protocol_version,
"process_transaction error",
)?;
continue;
}
};
if receipt.receiver_id() == signed_transaction.transaction.signer_id() {
Expand Down
Loading

0 comments on commit abe4a60

Please sign in to comment.