Skip to content

Commit

Permalink
add result_state to ExecutionOutput
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Nov 29, 2024
1 parent dbadb09 commit 94a37b5
Show file tree
Hide file tree
Showing 34 changed files with 454 additions and 234 deletions.
43 changes: 31 additions & 12 deletions execution/executor-types/src/execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::state_store::{
state_delta::StateDelta, state_view::cached_state_view::StateCache,
state::State, state_view::cached_state_view::ShardedStateCache,
};
use aptos_types::{
contract_event::ContractEvent,
Expand All @@ -36,14 +36,23 @@ impl ExecutionOutput {
to_commit: TransactionsToKeep,
to_discard: TransactionsWithOutput,
to_retry: TransactionsWithOutput,
state_cache: StateCache,
last_checkpoint_state: Option<State>,
result_state: State,
state_reads: ShardedStateCache,
block_end_info: Option<BlockEndInfo>,
next_epoch_state: Option<EpochState>,
subscribable_events: Planned<Vec<ContractEvent>>,
) -> Self {
let next_version = first_version + to_commit.len() as Version;
assert_eq!(next_version, result_state.next_version());
if is_block {
// If it's a block, ensure it ends with state checkpoint.
assert!(to_commit.is_empty() || to_commit.ends_with_sole_checkpoint());
assert!(last_checkpoint_state.is_some());
assert!(last_checkpoint_state
.as_ref()
.unwrap()
.is_the_same(&result_state));
} else {
// If it's not, there shouldn't be any transaction to be discarded or retried.
assert!(to_discard.is_empty() && to_retry.is_empty());
Expand All @@ -56,22 +65,26 @@ impl ExecutionOutput {
to_commit,
to_discard,
to_retry,
state_cache,
last_checkpoint_state,
result_state,
state_reads,
block_end_info,
next_epoch_state,
subscribable_events,
})
}

pub fn new_empty(state: Arc<StateDelta>) -> Self {
pub fn new_empty(parent_state: State) -> Self {
Self::new_impl(Inner {
is_block: false,
first_version: state.next_version(),
first_version: parent_state.next_version(),
statuses_for_input_txns: vec![],
to_commit: TransactionsToKeep::new_empty(),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_empty(state.current.clone()),
last_checkpoint_state: None,
result_state: parent_state,
state_reads: ShardedStateCache::default(),
block_end_info: None,
next_epoch_state: None,
subscribable_events: Planned::ready(vec![]),
Expand All @@ -88,7 +101,9 @@ impl ExecutionOutput {
to_commit: TransactionsToKeep::new_dummy_success(txns),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_dummy(),
last_checkpoint_state: None,
result_state: State::new_empty(),
state_reads: ShardedStateCache::default(),
block_end_info: None,
next_epoch_state: None,
subscribable_events: Planned::ready(vec![]),
Expand All @@ -107,7 +122,9 @@ impl ExecutionOutput {
to_commit: TransactionsToKeep::new_empty(),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_dummy(),
last_checkpoint_state: None,
result_state: self.result_state.clone(),
state_reads: ShardedStateCache::default(),
block_end_info: None,
next_epoch_state: self.next_epoch_state.clone(),
subscribable_events: Planned::ready(vec![]),
Expand Down Expand Up @@ -146,10 +163,12 @@ pub struct Inner {
pub to_discard: TransactionsWithOutput,
pub to_retry: TransactionsWithOutput,

/// Carries the frozen base state view, so all in-mem nodes involved won't drop before the
/// execution result is processed; as well as all the accounts touched during execution, together
/// with their proofs.
pub state_cache: StateCache,
pub last_checkpoint_state: Option<State>,
pub result_state: State,
/// State items read during execution, useful for calculating the state storge usage and
/// indices used by the db pruner.
pub state_reads: ShardedStateCache,

/// Optional StateCheckpoint payload
pub block_end_info: Option<BlockEndInfo>,
/// Optional EpochState payload.
Expand Down
11 changes: 9 additions & 2 deletions execution/executor-types/src/state_checkpoint_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use aptos_crypto::HashValue;
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::state_store::{
sharded_state_updates::ShardedStateUpdates, state_delta::StateDelta,
state_summary::StateSummary,
};
use derive_more::Deref;
use std::sync::Arc;
Expand All @@ -32,17 +33,20 @@ impl StateCheckpointOutput {
})
}

pub fn new_empty(state: Arc<StateDelta>) -> Self {
pub fn new_empty(_state_summary: StateSummary) -> Self {
todo!()
/* FIXME(aldenhu)
Self::new_impl(Inner {
parent_state: state.clone(),
result_state: state,
state_updates_before_last_checkpoint: None,
state_checkpoint_hashes: vec![],
})
*/
}

pub fn new_dummy() -> Self {
Self::new_empty(Arc::new(StateDelta::new_empty()))
Self::new_empty(StateSummary::new_empty())
}

fn new_impl(inner: Inner) -> Self {
Expand All @@ -52,7 +56,10 @@ impl StateCheckpointOutput {
}

pub fn reconfig_suffix(&self) -> Self {
/* FIXME(aldenhu)
Self::new_empty(self.result_state.clone())
*/
todo!()
}
}

Expand Down
6 changes: 5 additions & 1 deletion execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ impl StateComputeResult {
}

pub fn as_chunk_to_commit(&self) -> ChunkToCommit {
todo!()

/* FIXME(aldenhu): sharded_state_cache
ChunkToCommit {
first_version: self.ledger_update_output.first_version(),
transactions: &self.execution_output.to_commit.transactions,
Expand All @@ -168,8 +171,9 @@ impl StateComputeResult {
.state_checkpoint_output
.state_updates_before_last_checkpoint
.as_ref(),
sharded_state_cache: Some(&self.execution_output.state_cache.sharded_state_cache),
sharded_state_cache,
is_reconfig: self.execution_output.next_epoch_state.is_some(),
}
*/
}
}
7 changes: 3 additions & 4 deletions execution/executor/src/block_executor/block_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,7 @@ impl BlockTree {
ledger_info.consensus_block_id()
};

let output = PartialStateComputeResult::new_empty(
ledger_summary.state().clone(),
ledger_summary.txn_accumulator().clone(),
);
let output = PartialStateComputeResult::new_empty(ledger_summary);

block_lookup.fetch_or_add_block(id, output, None)
}
Expand Down Expand Up @@ -259,10 +256,12 @@ impl BlockTree {
);
last_committed_block
};
/* FIXME(aldenhu)
root.output
.expect_result_state()
.current
.log_generation("block_tree_base");
*/
let old_root = std::mem::replace(&mut *self.root.lock(), root);

// send old root to async task to drop it
Expand Down
7 changes: 5 additions & 2 deletions execution/executor/src/block_executor/block_tree/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::{
};
use aptos_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue};
use aptos_infallible::Mutex;
use aptos_storage_interface::LedgerSummary;
use aptos_types::{block_info::BlockInfo, epoch_state::EpochState, ledger_info::LedgerInfo};
use std::sync::Arc;

Expand Down Expand Up @@ -39,11 +38,15 @@ fn id(index: u64) -> HashValue {
}

fn empty_block() -> PartialStateComputeResult {
todo!()
/* FIXME(aldenhu)
let result_view = LedgerSummary::new_empty();
PartialStateComputeResult::new_empty(
result_view.state().clone(),
result_view.state.clone(),
result_view.transaction_accumulator.clone(),
)
*/
}

fn gen_ledger_info(block_id: HashValue, reconfig: bool) -> LedgerInfo {
Expand Down
11 changes: 3 additions & 8 deletions execution/executor/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ use aptos_infallible::RwLock;
use aptos_logger::prelude::*;
use aptos_metrics_core::{IntGaugeHelper, TimerHelper};
use aptos_storage_interface::{
state_store::state_view::{
async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView,
},
DbReaderWriter,
state_store::state_view::cached_state_view::CachedStateView, DbReaderWriter,
};
use aptos_types::{
block_executor::{
Expand Down Expand Up @@ -224,9 +221,7 @@ where
CachedStateView::new(
StateViewId::BlockExecution { block_id },
Arc::clone(&self.db.reader),
parent_output.execution_output.next_version(),
parent_output.expect_result_state().current.clone(),
Arc::new(AsyncProofFetcher::new(self.db.reader.clone())),
parent_output.execution_output.result_state.clone(),
)?
};

Expand Down Expand Up @@ -255,7 +250,7 @@ where
});
DoStateCheckpoint::run(
&execution_output,
parent_output.expect_result_state(),
parent_output.expect_result_state_summary().clone(),
Option::<Vec<_>>::None,
)
})?;
Expand Down
22 changes: 19 additions & 3 deletions execution/executor/src/chunk_executor/chunk_commit_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use crate::{
},
};
use anyhow::{anyhow, ensure, Result};
use aptos_storage_interface::{state_store::state_delta::StateDelta, DbReader, LedgerSummary};
use aptos_storage_interface::{
state_store::{state_delta::StateDelta, state_summary::StateSummary},
DbReader,
};
use aptos_types::{proof::accumulator::InMemoryTransactionAccumulator, transaction::Version};
use std::{collections::VecDeque, sync::Arc};

Expand Down Expand Up @@ -40,35 +43,48 @@ pub struct ChunkCommitQueue {
}

impl ChunkCommitQueue {
pub(crate) fn new_from_db(db: &Arc<dyn DbReader>) -> Result<Self> {
pub(crate) fn new_from_db(_db: &Arc<dyn DbReader>) -> Result<Self> {
todo!()
/* FIXME(aldenhu)
let LedgerSummary {
state,
state_summary,
transaction_accumulator,
} = db.get_pre_committed_ledger_summary()?;
Ok(Self {
latest_state: state,
latest_txn_accumulator: transaction_accumulator,
to_commit: VecDeque::new(),
to_update_ledger: VecDeque::new(),
})
*/
}

pub(crate) fn latest_state(&self) -> Arc<StateDelta> {
self.latest_state.clone()
}

pub(crate) fn latest_state_summary(&self) -> StateSummary {
// FIXME(aldenhu)
todo!()
}

pub(crate) fn expecting_version(&self) -> Version {
self.latest_state.next_version()
}

pub(crate) fn enqueue_for_ledger_update(
&mut self,
chunk_to_update_ledger: ChunkToUpdateLedger,
_chunk_to_update_ledger: ChunkToUpdateLedger,
) -> Result<()> {
/* FIXME(aldenhu)
self.latest_state = chunk_to_update_ledger.output.expect_result_state().clone();
self.to_update_ledger
.push_back(Some(chunk_to_update_ledger));
Ok(())
*/
todo!()
}

pub(crate) fn next_chunk_to_update_ledger(
Expand Down
10 changes: 3 additions & 7 deletions execution/executor/src/chunk_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ use aptos_infallible::{Mutex, RwLock};
use aptos_logger::prelude::*;
use aptos_metrics_core::{IntGaugeHelper, TimerHelper};
use aptos_storage_interface::{
state_store::{
state_delta::StateDelta,
state_view::{async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView},
},
state_store::{state_delta::StateDelta, state_view::cached_state_view::CachedStateView},
DbReaderWriter,
};
use aptos_types::{
Expand Down Expand Up @@ -244,13 +241,12 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
}

fn latest_state_view(&self, latest_state: &StateDelta) -> Result<CachedStateView> {
// FIXME(aldenhu): check
let first_version = latest_state.next_version();
Ok(CachedStateView::new(
StateViewId::ChunkExecution { first_version },
self.db.reader.clone(),
first_version,
latest_state.current.clone(),
Arc::new(AsyncProofFetcher::new(self.db.reader.clone())),
)?)
}

Expand Down Expand Up @@ -312,7 +308,7 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
// Calculate state snapshot
let state_checkpoint_output = DoStateCheckpoint::run(
&execution_output,
&self.commit_queue.lock().latest_state(),
self.commit_queue.lock().latest_state_summary(),
Some(
chunk_verifier
.transaction_infos()
Expand Down
6 changes: 2 additions & 4 deletions execution/executor/src/db_bootstrapper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,15 @@ pub fn calculate_genesis<V: VMBlockExecutor>(
"Genesis txn didn't output reconfig event."
);

let output = ApplyExecutionOutput::run(execution_output, &ledger_summary)?;
let output = ApplyExecutionOutput::run(execution_output, ledger_summary)?;
let timestamp_usecs = if genesis_version == 0 {
// TODO(aldenhu): fix existing tests before using real timestamp and check on-chain epoch.
GENESIS_TIMESTAMP_USECS
} else {
let state_view = CachedStateView::new(
StateViewId::Miscellaneous,
Arc::clone(&db.reader),
output.execution_output.next_version(),
output.expect_result_state().current.clone(),
Arc::new(AsyncProofFetcher::new(db.reader.clone())),
output.execution_output.result_state.clone(),
)?;
let next_epoch = epoch
.checked_add(1)
Expand Down
4 changes: 2 additions & 2 deletions execution/executor/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ fn apply_transaction_by_writeset(
let chunk_output =
DoGetExecutionOutput::by_transaction_output(txns, txn_outs, state_view).unwrap();

let output = ApplyExecutionOutput::run(chunk_output, &ledger_summary).unwrap();
let output = ApplyExecutionOutput::run(chunk_output, ledger_summary).unwrap();

db.writer
.save_transactions(
Expand Down Expand Up @@ -696,7 +696,7 @@ fn run_transactions_naive(
TransactionSliceMetadata::unknown(),
)
.unwrap();
let output = ApplyExecutionOutput::run(out, &ledger_summary).unwrap();
let output = ApplyExecutionOutput::run(out, ledger_summary).unwrap();
db.writer
.save_transactions(
output.expect_complete_result().as_chunk_to_commit(),
Expand Down
Loading

0 comments on commit 94a37b5

Please sign in to comment.