From 84feafd4d48757716d7d142c106fa71133d881e7 Mon Sep 17 00:00:00 2001 From: Igor Date: Wed, 6 Nov 2024 13:06:14 -0800 Subject: [PATCH] reuse code to simplify and avoid speculation log warnings --- .../aptos-debugger/src/aptos_debugger.rs | 7 +- .../src/transaction_bench_state.rs | 6 +- aptos-move/aptos-vm/src/aptos_vm.rs | 10 +-- aptos-move/aptos-vm/src/block_executor/mod.rs | 61 +++++++++------ .../aptos-vm/src/block_executor/vm_wrapper.rs | 2 +- .../sharded_executor_service.rs | 4 +- aptos-move/e2e-tests/src/executor.rs | 4 +- .../src/native/native_vm.rs | 75 +++++-------------- .../parallel_uncoordinated_block_executor.rs | 51 ++++++++----- 9 files changed, 107 insertions(+), 113 deletions(-) diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index 743bea9f096167..51ef028044f80c 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -23,7 +23,7 @@ use aptos_validator_interface::{ AptosValidatorInterface, DBDebuggerInterface, DebuggerStateView, RestDebuggerInterface, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, BlockAptosVM}, + block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, data_cache::AsMoveResolver, AptosVM, }; @@ -428,7 +428,10 @@ fn execute_block_no_limit( state_view: &DebuggerStateView, concurrency_level: usize, ) -> Result, VMStatus> { - BlockAptosVM::execute_block::<_, NoOpTransactionCommitHook>( + AptosVMBlockExecutorWrapper::execute_block::< + _, + NoOpTransactionCommitHook, + >( sig_verified_txns, state_view, BlockExecutorConfig { diff --git a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs index d3a71a1d228626..ca85ac3827c5a5 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs @@ -30,7 +30,7 @@ use aptos_types::{ vm_status::VMStatus, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, BlockAptosVM}, + block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, data_cache::AsMoveResolver, sharded_block_executor::{ local_executor_shard::{LocalExecutorClient, LocalExecutorService}, @@ -212,7 +212,7 @@ where ) -> (Vec, usize) { let block_size = transactions.len(); let timer = Instant::now(); - let output = BlockAptosVM::execute_block::< + let output = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, >( @@ -260,7 +260,7 @@ where ) -> (Vec, usize) { let block_size = transactions.len(); let timer = Instant::now(); - let output = BlockAptosVM::execute_block::< + let output = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, >( diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 5b653814459674..fe236a58481508 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - block_executor::{AptosTransactionOutput, BlockAptosVM}, + block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, counters::*, data_cache::{AsMoveResolver, StorageAdapter}, errors::{discarded_output, expect_only_successful_execution}, @@ -22,8 +22,8 @@ use crate::{ sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, system_module_names::*, transaction_metadata::TransactionMetadata, - transaction_validation, verifier, - verifier::randomness::get_randomness_annotation, + transaction_validation, + verifier::{self, randomness::get_randomness_annotation}, VMBlockExecutor, VMValidator, }; use anyhow::anyhow; @@ -2768,7 +2768,7 @@ impl AptosVM { // TODO - move out from this file? -/// Production implementation of TransactionBlockExecutor. +/// Production implementation of VMBlockExecutor. /// /// Transaction execution: AptosVM /// Executing conflicts: in the input order, via BlockSTM, @@ -2811,7 +2811,7 @@ impl VMBlockExecutor for AptosVMBlockExecutor { ); let count = transactions.len(); - let ret = BlockAptosVM::execute_block::< + let ret = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, >( diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index 25d5d97d7d5234..1797f9fb6ceab0 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -4,17 +4,17 @@ pub(crate) mod vm_wrapper; -use crate::{ - block_executor::vm_wrapper::AptosExecutorTask, - counters::{BLOCK_EXECUTOR_CONCURRENCY, BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS}, -}; +use crate::counters::{BLOCK_EXECUTOR_CONCURRENCY, BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS}; use aptos_aggregator::{ delayed_change::DelayedChange, delta_change_set::DeltaOp, resolver::TAggregatorV1View, }; use aptos_block_executor::{ - code_cache_global::ImmutableModuleCache, errors::BlockExecutionError, executor::BlockExecutor, - task::TransactionOutput as BlockExecutorTransactionOutput, - txn_commit_hook::TransactionCommitHook, types::InputOutputKey, + code_cache_global::ImmutableModuleCache, + errors::BlockExecutionError, + executor::BlockExecutor, + task::{ExecutorTask, TransactionOutput as BlockExecutorTransactionOutput}, + txn_commit_hook::TransactionCommitHook, + types::InputOutputKey, }; use aptos_infallible::Mutex; use aptos_types::{ @@ -49,9 +49,11 @@ use once_cell::sync::{Lazy, OnceCell}; use std::{ collections::{BTreeMap, HashSet}, hash::Hash, + marker::PhantomData, ops::Deref, sync::Arc, }; +use vm_wrapper::AptosExecutorTask; static RAYON_EXEC_POOL: Lazy> = Lazy::new(|| { Arc::new( @@ -138,7 +140,7 @@ impl AptosTransactionOutput { self.committed_output.get().unwrap() } - pub fn take_output(mut self) -> TransactionOutput { + fn take_output(mut self) -> TransactionOutput { match self.committed_output.take() { Some(output) => output, // TODO: revisit whether we should always get it via committed, or o.w. create a @@ -445,9 +447,26 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput { } } -pub struct BlockAptosVM; +pub struct AptosBlockExecutorWrapper< + E: ExecutorTask< + Txn = SignatureVerifiedTransaction, + Error = VMStatus, + Output = AptosTransactionOutput, + Environment = AptosEnvironment, + >, +> { + _phantom: PhantomData, +} -impl BlockAptosVM { +impl< + E: ExecutorTask< + Txn = SignatureVerifiedTransaction, + Error = VMStatus, + Output = AptosTransactionOutput, + Environment = AptosEnvironment, + >, + > AptosBlockExecutorWrapper +{ fn execute_block_on_thread_pool< S: StateView + Sync, L: TransactionCommitHook, @@ -476,18 +495,13 @@ impl BlockAptosVM { global_module_cache.as_ref(), )?; - let executor = BlockExecutor::< - SignatureVerifiedTransaction, - AptosExecutorTask, - S, - L, - ExecutableTestType, - >::new( - config, - executor_thread_pool, - global_module_cache, - transaction_commit_listener, - ); + let executor = + BlockExecutor::::new( + config, + executor_thread_pool, + global_module_cache, + transaction_commit_listener, + ); let ret = executor.execute_block(environment, signature_verified_block, state_view); match ret { @@ -561,6 +575,9 @@ impl BlockAptosVM { } } +// Same as AptosBlockExecutorWrapper with AptosExecutorTask +pub type AptosVMBlockExecutorWrapper = AptosBlockExecutorWrapper; + #[cfg(test)] mod test { use super::*; diff --git a/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs b/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs index 306e847eae6c7f..21b9a7bf2965e1 100644 --- a/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs +++ b/aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs @@ -21,7 +21,7 @@ use aptos_vm_types::{ use fail::fail_point; use move_core_types::vm_status::{StatusCode, VMStatus}; -pub(crate) struct AptosExecutorTask { +pub struct AptosExecutorTask { vm: AptosVM, id: StateViewId, } diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs index efe860c37103e4..35b89a4831b636 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - block_executor::BlockAptosVM, + block_executor::AptosVMBlockExecutorWrapper, sharded_block_executor::{ aggr_overridden_state_view::{AggregatorOverriddenStateView, TOTAL_SUPPLY_AGGR_BASE_VAL}, coordinator_client::CoordinatorClient, @@ -135,7 +135,7 @@ impl ShardedExecutorService { ); }); s.spawn(move |_| { - let ret = BlockAptosVM::execute_block_on_thread_pool_without_global_module_cache( + let ret = AptosVMBlockExecutorWrapper::execute_block_on_thread_pool_without_global_module_cache( executor_thread_pool, &signature_verified_transactions, aggr_overridden_state_view.as_ref(), diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs index b97961dad4efc8..7b3f04c26f73e7 100644 --- a/aptos-move/e2e-tests/src/executor.rs +++ b/aptos-move/e2e-tests/src/executor.rs @@ -47,7 +47,7 @@ use aptos_types::{ AptosCoinType, CoinType, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, BlockAptosVM}, + block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, data_cache::AsMoveResolver, gas::make_prod_gas_meter, move_vm_ext::{MoveVmExt, SessionExt, SessionId}, @@ -636,7 +636,7 @@ impl FakeExecutor { }, onchain: onchain_config, }; - BlockAptosVM::execute_block_on_thread_pool_without_global_module_cache::< + AptosVMBlockExecutorWrapper::execute_block_on_thread_pool_without_global_module_cache::< _, NoOpTransactionCommitHook, >( diff --git a/execution/executor-benchmark/src/native/native_vm.rs b/execution/executor-benchmark/src/native/native_vm.rs index 7d032db683a5c9..8691b3aea64895 100644 --- a/execution/executor-benchmark/src/native/native_vm.rs +++ b/execution/executor-benchmark/src/native/native_vm.rs @@ -16,8 +16,6 @@ use aptos_aggregator::{ delta_math::DeltaHistory, }; use aptos_block_executor::{ - code_cache_global::ImmutableModuleCache, - errors::BlockExecutionError, task::{ExecutionStatus, ExecutorTask}, txn_commit_hook::NoOpTransactionCommitHook, }; @@ -34,8 +32,6 @@ use aptos_types::{ BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, }, contract_event::ContractEvent, - error::PanicError, - executable::ExecutableTestType, fee_statement::FeeStatement, move_utils::move_event_v2::MoveEventV2Type, on_chain_config::FeatureFlag, @@ -47,7 +43,10 @@ use aptos_types::{ write_set::WriteOp, AptosCoinType, }; -use aptos_vm::{block_executor::AptosTransactionOutput, VMBlockExecutor}; +use aptos_vm::{ + block_executor::{AptosBlockExecutorWrapper, AptosTransactionOutput}, + VMBlockExecutor, +}; use aptos_vm_environment::environment::AptosEnvironment; use aptos_vm_types::{ abstract_write_op::{ @@ -63,7 +62,7 @@ use bytes::Bytes; use move_core_types::{ language_storage::StructTag, value::{IdentifierMappingKind, MoveStructLayout, MoveTypeLayout}, - vm_status::{StatusCode, VMStatus}, + vm_status::VMStatus, }; use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID; use serde::{de::DeserializeOwned, Serialize}; @@ -88,60 +87,20 @@ impl VMBlockExecutor for NativeVMBlockExecutor { state_view: &(impl StateView + Sync), onchain_config: BlockExecutorConfigFromOnchain, ) -> Result, VMStatus> { - Self::execute_block_impl(transactions, state_view, BlockExecutorConfig { - local: BlockExecutorLocalConfig { - concurrency_level: NativeConfig::get_concurrency_level(), - allow_fallback: false, - discard_failed_blocks: false, - }, - onchain: onchain_config, - }) - } -} - -impl NativeVMBlockExecutor { - pub fn execute_block_impl( - signature_verified_block: &[SignatureVerifiedTransaction], - state_view: &S, - config: BlockExecutorConfig, - ) -> Result, VMStatus> { - let executor = aptos_block_executor::executor::BlockExecutor::< - SignatureVerifiedTransaction, - NativeVMExecutorTask, - S, - NoOpTransactionCommitHook, - ExecutableTestType, - >::new( - config, + AptosBlockExecutorWrapper::::execute_block_on_thread_pool_without_global_module_cache::<_, NoOpTransactionCommitHook>( Arc::clone(&NATIVE_EXECUTOR_POOL), - Arc::new(ImmutableModuleCache::empty()), - None, - ); - - let environment = AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view); - let ret = executor.execute_block(environment, signature_verified_block, state_view); - match ret { - Ok(block_output) => { - let (transaction_outputs, block_end_info) = block_output.into_inner(); - let output_vec: Vec<_> = transaction_outputs - .into_iter() - .map(|output| output.take_output()) - .collect(); - - // Flush the speculative logs of the committed transactions. - // let pos = output_vec.partition_point(|o| !o.status().is_retry()); - - Ok(BlockOutput::new(output_vec, block_end_info)) + transactions, + state_view, + BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level: NativeConfig::get_concurrency_level(), + allow_fallback: false, + discard_failed_blocks: false, + }, + onchain: onchain_config, }, - Err(BlockExecutionError::FatalBlockExecutorError(PanicError::CodeInvariantError( - err_msg, - ))) => Err(VMStatus::Error { - status_code: StatusCode::DELAYED_FIELD_OR_BLOCKSTM_CODE_INVARIANT_ERROR, - sub_status: None, - message: Some(err_msg), - }), - Err(BlockExecutionError::FatalVMError(err)) => Err(err), - } + None + ) } } diff --git a/execution/executor-benchmark/src/native/parallel_uncoordinated_block_executor.rs b/execution/executor-benchmark/src/native/parallel_uncoordinated_block_executor.rs index 585539a51d36f0..f803fdf1a0e80c 100644 --- a/execution/executor-benchmark/src/native/parallel_uncoordinated_block_executor.rs +++ b/execution/executor-benchmark/src/native/parallel_uncoordinated_block_executor.rs @@ -30,13 +30,20 @@ use aptos_types::{ AptosCoinType, }; use aptos_vm::VMBlockExecutor; -use dashmap::{mapref::one::{Ref, RefMut}, DashMap}; +use dashmap::{ + mapref::one::{Ref, RefMut}, + DashMap, +}; use once_cell::sync::OnceCell; use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; -use thread_local::ThreadLocal; use std::{ - cell::Cell, collections::{BTreeMap, HashMap}, hash::RandomState, sync::atomic::{AtomicU64, Ordering}, u64 + cell::Cell, + collections::{BTreeMap, HashMap}, + hash::RandomState, + sync::atomic::{AtomicU64, Ordering}, + u64, }; +use thread_local::ThreadLocal; /// Executes transactions fully, and produces TransactionOutput (with final WriteSet) /// (unlike execution within BlockSTM that produces non-materialized VMChangeSet) @@ -794,6 +801,7 @@ struct CoinSupply { } struct SupplyWithDecrement { + #[allow(dead_code)] pub base: u128, pub decrement: ThreadLocal>, } @@ -882,7 +890,9 @@ impl CommonNativeRawTransactionExecutor for NativeValueCacheRawTransactionExecut state_view: &(impl StateView + Sync), _output: &mut IncrementalOutput, ) -> Result<()> { - let cache_key = StateKey::resource(&AccountAddress::TEN, &self.db_util.common.concurrent_supply).unwrap(); + let cache_key = + StateKey::resource(&AccountAddress::TEN, &self.db_util.common.concurrent_supply) + .unwrap(); if USE_THREAD_LOCAL_SUPPLY { let entry = self.cache_get_or_init(&cache_key, |_key| { @@ -946,7 +956,7 @@ impl CommonNativeRawTransactionExecutor for NativeValueCacheRawTransactionExecut base: DbAccessUtil::get_value::(key, state_view) .unwrap() .unwrap(), - decrement: ThreadLocal::new() + decrement: ThreadLocal::new(), }) }); match total_supply_entry.value() { @@ -957,13 +967,14 @@ impl CommonNativeRawTransactionExecutor for NativeValueCacheRawTransactionExecut _ => panic!("wrong type"), } } else { - let mut total_supply_entry = self.cache_get_mut_or_init(total_supply_state_key, |key| { - CachedResource::AptCoinSupply(CoinSupply { - total_supply: DbAccessUtil::get_value::(key, state_view) - .unwrap() - .unwrap(), - }) - }); + let mut total_supply_entry = + self.cache_get_mut_or_init(total_supply_state_key, |key| { + CachedResource::AptCoinSupply(CoinSupply { + total_supply: DbAccessUtil::get_value::(key, state_view) + .unwrap() + .unwrap(), + }) + }); match total_supply_entry.value_mut() { CachedResource::AptCoinSupply(coin_supply) => { @@ -973,7 +984,6 @@ impl CommonNativeRawTransactionExecutor for NativeValueCacheRawTransactionExecut }; } - Ok(()) } @@ -1039,7 +1049,10 @@ impl NativeValueCacheRawTransactionExecutor { return ref_mut; } - self.cache.entry(key.clone()).or_insert(init_value(key)).downgrade() + self.cache + .entry(key.clone()) + .or_insert(init_value(key)) + .downgrade() } fn cache_get_mut_or_init<'a>( @@ -1055,7 +1068,10 @@ impl NativeValueCacheRawTransactionExecutor { self.cache.entry(key.clone()).or_insert(init_value(key)) } - fn fetch_concurrent_supply(&self, state_view: &(impl StateView + Sync)) -> ConcurrentSupplyResource { + fn fetch_concurrent_supply( + &self, + state_view: &(impl StateView + Sync), + ) -> ConcurrentSupplyResource { let concurrent_supply_rg_tag = &self.db_util.common.concurrent_supply; let apt_metadata_object_state_key = self @@ -1067,13 +1083,12 @@ impl NativeValueCacheRawTransactionExecutor { .unwrap() .unwrap(); - let concurrent_supply = bcs::from_bytes::( + bcs::from_bytes::( &apt_metadata_object .remove(concurrent_supply_rg_tag) .unwrap(), ) - .unwrap(); - concurrent_supply + .unwrap() } fn update_fa_balance(