Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StateSummary #15411

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl ExecutionPipeline {
});
let start = Instant::now();
executor
.execute_and_state_checkpoint(
.execute_and_update_state(
block,
parent_block_id,
block_executor_onchain_config,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ impl PipelineBuilder {
let start = Instant::now();
tokio::task::spawn_blocking(move || {
executor
.execute_and_state_checkpoint(
.execute_and_update_state(
(block.id(), txns).into(),
block.parent_id(),
onchain_execution_config,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ async fn test_commit_sync_race() {
Ok(StateComputeResult::new_dummy())
}

fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
_block: ExecutableBlock,
_parent_block_id: HashValue,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/state_computer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl BlockExecutorTrait for DummyBlockExecutor {
Ok(())
}

fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
block: ExecutableBlock,
_parent_block_id: HashValue,
Expand Down
4 changes: 2 additions & 2 deletions execution/executor-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ mod tests {
let parent_block_id = vm_executor.committed_block_id();
let block_id = HashValue::random();
vm_executor
.execute_and_state_checkpoint(
.execute_and_update_state(
(block_id, vec![txn.clone()]).into(),
parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
Expand All @@ -932,7 +932,7 @@ mod tests {
let parent_block_id = other_executor.committed_block_id();
let block_id = HashValue::random();
other_executor
.execute_and_state_checkpoint(
.execute_and_update_state(
(block_id, vec![txn]).into(),
parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
Expand Down
2 changes: 1 addition & 1 deletion execution/executor-benchmark/src/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
{
let _timer = TIMER.with_label_values(&["execute"]).start_timer();
self.executor
.execute_and_state_checkpoint(
.execute_and_update_state(
executable_block,
self.parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
Expand Down
43 changes: 31 additions & 12 deletions execution/executor-types/src/execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::state_store::{
state_delta::StateDelta, state_view::cached_state_view::StateCache,
state::State, state_view::cached_state_view::ShardedStateCache,
};
use aptos_types::{
contract_event::ContractEvent,
Expand All @@ -36,14 +36,23 @@ impl ExecutionOutput {
to_commit: TransactionsToKeep,
to_discard: TransactionsWithOutput,
to_retry: TransactionsWithOutput,
state_cache: StateCache,
last_checkpoint_state: Option<State>,
result_state: State,
state_reads: ShardedStateCache,
block_end_info: Option<BlockEndInfo>,
next_epoch_state: Option<EpochState>,
subscribable_events: Planned<Vec<ContractEvent>>,
) -> Self {
let next_version = first_version + to_commit.len() as Version;
assert_eq!(next_version, result_state.next_version());
if is_block {
// If it's a block, ensure it ends with state checkpoint.
assert!(to_commit.is_empty() || to_commit.ends_with_sole_checkpoint());
assert!(last_checkpoint_state.is_some());
assert!(last_checkpoint_state
.as_ref()
.unwrap()
.is_the_same(&result_state));
} else {
// If it's not, there shouldn't be any transaction to be discarded or retried.
assert!(to_discard.is_empty() && to_retry.is_empty());
Expand All @@ -56,22 +65,26 @@ impl ExecutionOutput {
to_commit,
to_discard,
to_retry,
state_cache,
last_checkpoint_state,
result_state,
state_reads,
block_end_info,
next_epoch_state,
subscribable_events,
})
}

pub fn new_empty(state: Arc<StateDelta>) -> Self {
pub fn new_empty(parent_state: State) -> Self {
Self::new_impl(Inner {
is_block: false,
first_version: state.next_version(),
first_version: parent_state.next_version(),
statuses_for_input_txns: vec![],
to_commit: TransactionsToKeep::new_empty(),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_empty(state.current.clone()),
last_checkpoint_state: None,
result_state: parent_state,
state_reads: ShardedStateCache::default(),
block_end_info: None,
next_epoch_state: None,
subscribable_events: Planned::ready(vec![]),
Expand All @@ -88,7 +101,9 @@ impl ExecutionOutput {
to_commit: TransactionsToKeep::new_dummy_success(txns),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_dummy(),
last_checkpoint_state: None,
result_state: State::new_empty(),
state_reads: ShardedStateCache::default(),
block_end_info: None,
next_epoch_state: None,
subscribable_events: Planned::ready(vec![]),
Expand All @@ -107,7 +122,9 @@ impl ExecutionOutput {
to_commit: TransactionsToKeep::new_empty(),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_dummy(),
last_checkpoint_state: None,
result_state: self.result_state.clone(),
state_reads: ShardedStateCache::default(),
block_end_info: None,
next_epoch_state: self.next_epoch_state.clone(),
subscribable_events: Planned::ready(vec![]),
Expand Down Expand Up @@ -146,10 +163,12 @@ pub struct Inner {
pub to_discard: TransactionsWithOutput,
pub to_retry: TransactionsWithOutput,

/// Carries the frozen base state view, so all in-mem nodes involved won't drop before the
/// execution result is processed; as well as all the accounts touched during execution, together
/// with their proofs.
pub state_cache: StateCache,
pub last_checkpoint_state: Option<State>,
pub result_state: State,
/// State items read during execution, useful for calculating the state storge usage and
/// indices used by the db pruner.
pub state_reads: ShardedStateCache,

/// Optional StateCheckpoint payload
pub block_end_info: Option<BlockEndInfo>,
/// Optional EpochState payload.
Expand Down
6 changes: 4 additions & 2 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0
#![forbid(unsafe_code)]
// FIXME(aldenhu): remove
#![allow(dead_code)]

use anyhow::Result;
use aptos_crypto::HashValue;
Expand Down Expand Up @@ -135,12 +137,12 @@ pub trait BlockExecutorTrait: Send + Sync {
onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateComputeResult> {
let block_id = block.block_id;
self.execute_and_state_checkpoint(block, parent_block_id, onchain_config)?;
self.execute_and_update_state(block, parent_block_id, onchain_config)?;
self.ledger_update(block_id, parent_block_id)
}

/// Executes a block and returns the state checkpoint output.
fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
block: ExecutableBlock,
parent_block_id: HashValue,
Expand Down
34 changes: 14 additions & 20 deletions execution/executor-types/src/state_checkpoint_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,39 @@

use aptos_crypto::HashValue;
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::state_store::{
sharded_state_updates::ShardedStateUpdates, state_delta::StateDelta,
};
use aptos_storage_interface::state_store::state_summary::StateSummary;
use derive_more::Deref;
use std::sync::Arc;

#[derive(Clone, Debug, Default, Deref)]
#[derive(Clone, Debug, Deref)]
pub struct StateCheckpointOutput {
#[deref]
inner: Arc<DropHelper<Inner>>,
}

impl StateCheckpointOutput {
pub fn new(
parent_state: Arc<StateDelta>,
result_state: Arc<StateDelta>,
state_updates_before_last_checkpoint: Option<ShardedStateUpdates>,
last_state_checkpoint_summary: Option<StateSummary>,
result_state_summary: StateSummary,
state_checkpoint_hashes: Vec<Option<HashValue>>,
) -> Self {
Self::new_impl(Inner {
parent_state,
result_state,
state_updates_before_last_checkpoint,
last_state_checkpoint_summary,
result_state_summary,
state_checkpoint_hashes,
})
}

pub fn new_empty(state: Arc<StateDelta>) -> Self {
pub fn new_empty(parent_state_summary: StateSummary) -> Self {
Self::new_impl(Inner {
parent_state: state.clone(),
result_state: state,
state_updates_before_last_checkpoint: None,
last_state_checkpoint_summary: None,
result_state_summary: parent_state_summary,
state_checkpoint_hashes: vec![],
})
}

pub fn new_dummy() -> Self {
Self::new_empty(Arc::new(StateDelta::new_empty()))
Self::new_empty(StateSummary::new_empty())
}

fn new_impl(inner: Inner) -> Self {
Expand All @@ -52,14 +47,13 @@ impl StateCheckpointOutput {
}

pub fn reconfig_suffix(&self) -> Self {
Self::new_empty(self.result_state.clone())
Self::new_empty(self.result_state_summary.clone())
}
}

#[derive(Debug, Default)]
#[derive(Debug)]
pub struct Inner {
pub parent_state: Arc<StateDelta>,
pub result_state: Arc<StateDelta>,
pub state_updates_before_last_checkpoint: Option<ShardedStateUpdates>,
pub last_state_checkpoint_summary: Option<StateSummary>,
pub result_state_summary: StateSummary,
pub state_checkpoint_hashes: Vec<Option<HashValue>>,
}
6 changes: 5 additions & 1 deletion execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ impl StateComputeResult {
}

pub fn as_chunk_to_commit(&self) -> ChunkToCommit {
todo!()

/* FIXME(aldenhu): sharded_state_cache
ChunkToCommit {
first_version: self.ledger_update_output.first_version(),
transactions: &self.execution_output.to_commit.transactions,
Expand All @@ -168,8 +171,9 @@ impl StateComputeResult {
.state_checkpoint_output
.state_updates_before_last_checkpoint
.as_ref(),
sharded_state_cache: Some(&self.execution_output.state_cache.sharded_state_cache),
sharded_state_cache,
is_reconfig: self.execution_output.next_epoch_state.is_some(),
}
*/
}
}
13 changes: 6 additions & 7 deletions execution/executor/src/block_executor/block_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ impl BlockTree {
fn root_from_db(block_lookup: &Arc<BlockLookup>, db: &Arc<dyn DbReader>) -> Result<Arc<Block>> {
let ledger_info_with_sigs = db.get_latest_ledger_info()?;
let ledger_info = ledger_info_with_sigs.ledger_info();
let ledger_view = db.get_latest_executed_trees()?;
let ledger_summary = db.get_pre_committed_ledger_summary()?;

ensure!(
ledger_view.version() == Some(ledger_info.version()),
ledger_summary.version() == Some(ledger_info.version()),
"Missing ledger info at the end of the ledger. latest version {:?}, LI version {}",
ledger_view.version(),
ledger_summary.version(),
ledger_info.version(),
);

Expand All @@ -222,10 +222,7 @@ impl BlockTree {
ledger_info.consensus_block_id()
};

let output = PartialStateComputeResult::new_empty(
ledger_view.state().clone(),
ledger_view.txn_accumulator().clone(),
);
let output = PartialStateComputeResult::new_empty(ledger_summary);

block_lookup.fetch_or_add_block(id, output, None)
}
Expand Down Expand Up @@ -259,10 +256,12 @@ impl BlockTree {
);
last_committed_block
};
/* FIXME(aldenhu)
root.output
.expect_result_state()
.current
.log_generation("block_tree_base");
*/
let old_root = std::mem::replace(&mut *self.root.lock(), root);

// send old root to async task to drop it
Expand Down
9 changes: 6 additions & 3 deletions execution/executor/src/block_executor/block_tree/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::{
};
use aptos_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue};
use aptos_infallible::Mutex;
use aptos_storage_interface::ExecutedTrees;
use aptos_types::{block_info::BlockInfo, epoch_state::EpochState, ledger_info::LedgerInfo};
use std::sync::Arc;

Expand Down Expand Up @@ -39,11 +38,15 @@ fn id(index: u64) -> HashValue {
}

fn empty_block() -> PartialStateComputeResult {
let result_view = ExecutedTrees::new_empty();
todo!()
/* FIXME(aldenhu)
let result_view = LedgerSummary::new_empty();
PartialStateComputeResult::new_empty(
result_view.state().clone(),
result_view.state.clone(),
result_view.transaction_accumulator.clone(),
)

*/
}

fn gen_ledger_info(block_id: HashValue, reconfig: bool) -> LedgerInfo {
Expand Down
Loading
Loading