Skip to content

Commit

Permalink
reuse code to simplify and avoid speculation log warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos committed Nov 6, 2024
1 parent 4499818 commit 84feafd
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 113 deletions.
7 changes: 5 additions & 2 deletions aptos-move/aptos-debugger/src/aptos_debugger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use aptos_validator_interface::{
AptosValidatorInterface, DBDebuggerInterface, DebuggerStateView, RestDebuggerInterface,
};
use aptos_vm::{
block_executor::{AptosTransactionOutput, BlockAptosVM},
block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper},
data_cache::AsMoveResolver,
AptosVM,
};
Expand Down Expand Up @@ -428,7 +428,10 @@ fn execute_block_no_limit(
state_view: &DebuggerStateView,
concurrency_level: usize,
) -> Result<Vec<TransactionOutput>, VMStatus> {
BlockAptosVM::execute_block::<_, NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>>(
AptosVMBlockExecutorWrapper::execute_block::<
_,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
>(
sig_verified_txns,
state_view,
BlockExecutorConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use aptos_types::{
vm_status::VMStatus,
};
use aptos_vm::{
block_executor::{AptosTransactionOutput, BlockAptosVM},
block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper},
data_cache::AsMoveResolver,
sharded_block_executor::{
local_executor_shard::{LocalExecutorClient, LocalExecutorService},
Expand Down Expand Up @@ -212,7 +212,7 @@ where
) -> (Vec<TransactionOutput>, usize) {
let block_size = transactions.len();
let timer = Instant::now();
let output = BlockAptosVM::execute_block::<
let output = AptosVMBlockExecutorWrapper::execute_block::<
_,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
>(
Expand Down Expand Up @@ -260,7 +260,7 @@ where
) -> (Vec<TransactionOutput>, usize) {
let block_size = transactions.len();
let timer = Instant::now();
let output = BlockAptosVM::execute_block::<
let output = AptosVMBlockExecutorWrapper::execute_block::<
_,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
>(
Expand Down
10 changes: 5 additions & 5 deletions aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
block_executor::{AptosTransactionOutput, BlockAptosVM},
block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper},
counters::*,
data_cache::{AsMoveResolver, StorageAdapter},
errors::{discarded_output, expect_only_successful_execution},
Expand All @@ -22,8 +22,8 @@ use crate::{
sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor},
system_module_names::*,
transaction_metadata::TransactionMetadata,
transaction_validation, verifier,
verifier::randomness::get_randomness_annotation,
transaction_validation,
verifier::{self, randomness::get_randomness_annotation},
VMBlockExecutor, VMValidator,
};
use anyhow::anyhow;
Expand Down Expand Up @@ -2768,7 +2768,7 @@ impl AptosVM {

// TODO - move out from this file?

/// Production implementation of TransactionBlockExecutor.
/// Production implementation of VMBlockExecutor.
///
/// Transaction execution: AptosVM
/// Executing conflicts: in the input order, via BlockSTM,
Expand Down Expand Up @@ -2811,7 +2811,7 @@ impl VMBlockExecutor for AptosVMBlockExecutor {
);

let count = transactions.len();
let ret = BlockAptosVM::execute_block::<
let ret = AptosVMBlockExecutorWrapper::execute_block::<
_,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
>(
Expand Down
61 changes: 39 additions & 22 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

pub(crate) mod vm_wrapper;

use crate::{
block_executor::vm_wrapper::AptosExecutorTask,
counters::{BLOCK_EXECUTOR_CONCURRENCY, BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS},
};
use crate::counters::{BLOCK_EXECUTOR_CONCURRENCY, BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS};
use aptos_aggregator::{
delayed_change::DelayedChange, delta_change_set::DeltaOp, resolver::TAggregatorV1View,
};
use aptos_block_executor::{
code_cache_global::ImmutableModuleCache, errors::BlockExecutionError, executor::BlockExecutor,
task::TransactionOutput as BlockExecutorTransactionOutput,
txn_commit_hook::TransactionCommitHook, types::InputOutputKey,
code_cache_global::ImmutableModuleCache,
errors::BlockExecutionError,
executor::BlockExecutor,
task::{ExecutorTask, TransactionOutput as BlockExecutorTransactionOutput},
txn_commit_hook::TransactionCommitHook,
types::InputOutputKey,
};
use aptos_infallible::Mutex;
use aptos_types::{
Expand Down Expand Up @@ -49,9 +49,11 @@ use once_cell::sync::{Lazy, OnceCell};
use std::{
collections::{BTreeMap, HashSet},
hash::Hash,
marker::PhantomData,
ops::Deref,
sync::Arc,
};
use vm_wrapper::AptosExecutorTask;

static RAYON_EXEC_POOL: Lazy<Arc<rayon::ThreadPool>> = Lazy::new(|| {
Arc::new(
Expand Down Expand Up @@ -138,7 +140,7 @@ impl AptosTransactionOutput {
self.committed_output.get().unwrap()
}

pub fn take_output(mut self) -> TransactionOutput {
fn take_output(mut self) -> TransactionOutput {
match self.committed_output.take() {
Some(output) => output,
// TODO: revisit whether we should always get it via committed, or o.w. create a
Expand Down Expand Up @@ -445,9 +447,26 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput {
}
}

pub struct BlockAptosVM;
pub struct AptosBlockExecutorWrapper<
E: ExecutorTask<
Txn = SignatureVerifiedTransaction,
Error = VMStatus,
Output = AptosTransactionOutput,
Environment = AptosEnvironment,
>,
> {
_phantom: PhantomData<E>,
}

impl BlockAptosVM {
impl<
E: ExecutorTask<
Txn = SignatureVerifiedTransaction,
Error = VMStatus,
Output = AptosTransactionOutput,
Environment = AptosEnvironment,
>,
> AptosBlockExecutorWrapper<E>
{
fn execute_block_on_thread_pool<
S: StateView + Sync,
L: TransactionCommitHook<Output = AptosTransactionOutput>,
Expand Down Expand Up @@ -476,18 +495,13 @@ impl BlockAptosVM {
global_module_cache.as_ref(),
)?;

let executor = BlockExecutor::<
SignatureVerifiedTransaction,
AptosExecutorTask,
S,
L,
ExecutableTestType,
>::new(
config,
executor_thread_pool,
global_module_cache,
transaction_commit_listener,
);
let executor =
BlockExecutor::<SignatureVerifiedTransaction, E, S, L, ExecutableTestType>::new(
config,
executor_thread_pool,
global_module_cache,
transaction_commit_listener,
);

let ret = executor.execute_block(environment, signature_verified_block, state_view);
match ret {
Expand Down Expand Up @@ -561,6 +575,9 @@ impl BlockAptosVM {
}
}

// Same as AptosBlockExecutorWrapper with AptosExecutorTask
pub type AptosVMBlockExecutorWrapper = AptosBlockExecutorWrapper<AptosExecutorTask>;

#[cfg(test)]
mod test {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion aptos-move/aptos-vm/src/block_executor/vm_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use aptos_vm_types::{
use fail::fail_point;
use move_core_types::vm_status::{StatusCode, VMStatus};

pub(crate) struct AptosExecutorTask {
pub struct AptosExecutorTask {
vm: AptosVM,
id: StateViewId,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
block_executor::BlockAptosVM,
block_executor::AptosVMBlockExecutorWrapper,
sharded_block_executor::{
aggr_overridden_state_view::{AggregatorOverriddenStateView, TOTAL_SUPPLY_AGGR_BASE_VAL},
coordinator_client::CoordinatorClient,
Expand Down Expand Up @@ -135,7 +135,7 @@ impl<S: StateView + Sync + Send + 'static> ShardedExecutorService<S> {
);
});
s.spawn(move |_| {
let ret = BlockAptosVM::execute_block_on_thread_pool_without_global_module_cache(
let ret = AptosVMBlockExecutorWrapper::execute_block_on_thread_pool_without_global_module_cache(
executor_thread_pool,
&signature_verified_transactions,
aggr_overridden_state_view.as_ref(),
Expand Down
4 changes: 2 additions & 2 deletions aptos-move/e2e-tests/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use aptos_types::{
AptosCoinType, CoinType,
};
use aptos_vm::{
block_executor::{AptosTransactionOutput, BlockAptosVM},
block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper},
data_cache::AsMoveResolver,
gas::make_prod_gas_meter,
move_vm_ext::{MoveVmExt, SessionExt, SessionId},
Expand Down Expand Up @@ -636,7 +636,7 @@ impl FakeExecutor {
},
onchain: onchain_config,
};
BlockAptosVM::execute_block_on_thread_pool_without_global_module_cache::<
AptosVMBlockExecutorWrapper::execute_block_on_thread_pool_without_global_module_cache::<
_,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
>(
Expand Down
75 changes: 17 additions & 58 deletions execution/executor-benchmark/src/native/native_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use aptos_aggregator::{
delta_math::DeltaHistory,
};
use aptos_block_executor::{
code_cache_global::ImmutableModuleCache,
errors::BlockExecutionError,
task::{ExecutionStatus, ExecutorTask},
txn_commit_hook::NoOpTransactionCommitHook,
};
Expand All @@ -34,8 +32,6 @@ use aptos_types::{
BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig,
},
contract_event::ContractEvent,
error::PanicError,
executable::ExecutableTestType,
fee_statement::FeeStatement,
move_utils::move_event_v2::MoveEventV2Type,
on_chain_config::FeatureFlag,
Expand All @@ -47,7 +43,10 @@ use aptos_types::{
write_set::WriteOp,
AptosCoinType,
};
use aptos_vm::{block_executor::AptosTransactionOutput, VMBlockExecutor};
use aptos_vm::{
block_executor::{AptosBlockExecutorWrapper, AptosTransactionOutput},
VMBlockExecutor,
};
use aptos_vm_environment::environment::AptosEnvironment;
use aptos_vm_types::{
abstract_write_op::{
Expand All @@ -63,7 +62,7 @@ use bytes::Bytes;
use move_core_types::{
language_storage::StructTag,
value::{IdentifierMappingKind, MoveStructLayout, MoveTypeLayout},
vm_status::{StatusCode, VMStatus},
vm_status::VMStatus,
};
use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID;
use serde::{de::DeserializeOwned, Serialize};
Expand All @@ -88,60 +87,20 @@ impl VMBlockExecutor for NativeVMBlockExecutor {
state_view: &(impl StateView + Sync),
onchain_config: BlockExecutorConfigFromOnchain,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
Self::execute_block_impl(transactions, state_view, BlockExecutorConfig {
local: BlockExecutorLocalConfig {
concurrency_level: NativeConfig::get_concurrency_level(),
allow_fallback: false,
discard_failed_blocks: false,
},
onchain: onchain_config,
})
}
}

impl NativeVMBlockExecutor {
pub fn execute_block_impl<S: StateView + Sync>(
signature_verified_block: &[SignatureVerifiedTransaction],
state_view: &S,
config: BlockExecutorConfig,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
let executor = aptos_block_executor::executor::BlockExecutor::<
SignatureVerifiedTransaction,
NativeVMExecutorTask,
S,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
ExecutableTestType,
>::new(
config,
AptosBlockExecutorWrapper::<NativeVMExecutorTask>::execute_block_on_thread_pool_without_global_module_cache::<_, NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>>(
Arc::clone(&NATIVE_EXECUTOR_POOL),
Arc::new(ImmutableModuleCache::empty()),
None,
);

let environment = AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view);
let ret = executor.execute_block(environment, signature_verified_block, state_view);
match ret {
Ok(block_output) => {
let (transaction_outputs, block_end_info) = block_output.into_inner();
let output_vec: Vec<_> = transaction_outputs
.into_iter()
.map(|output| output.take_output())
.collect();

// Flush the speculative logs of the committed transactions.
// let pos = output_vec.partition_point(|o| !o.status().is_retry());

Ok(BlockOutput::new(output_vec, block_end_info))
transactions,
state_view,
BlockExecutorConfig {
local: BlockExecutorLocalConfig {
concurrency_level: NativeConfig::get_concurrency_level(),
allow_fallback: false,
discard_failed_blocks: false,
},
onchain: onchain_config,
},
Err(BlockExecutionError::FatalBlockExecutorError(PanicError::CodeInvariantError(
err_msg,
))) => Err(VMStatus::Error {
status_code: StatusCode::DELAYED_FIELD_OR_BLOCKSTM_CODE_INVARIANT_ERROR,
sub_status: None,
message: Some(err_msg),
}),
Err(BlockExecutionError::FatalVMError(err)) => Err(err),
}
None
)
}
}

Expand Down
Loading

0 comments on commit 84feafd

Please sign in to comment.