Skip to content

Commit

Permalink
Move DoStateCheckpoint to the ledger_udpate stage for blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Nov 30, 2024
1 parent 274b7e3 commit 67a6c42
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 100 deletions.
2 changes: 1 addition & 1 deletion consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl ExecutionPipeline {
});
let start = Instant::now();
executor
.execute_and_state_checkpoint(
.execute_and_update_state(
block,
parent_block_id,
block_executor_onchain_config,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ impl PipelineBuilder {
let start = Instant::now();
tokio::task::spawn_blocking(move || {
executor
.execute_and_state_checkpoint(
.execute_and_update_state(
(block.id(), txns).into(),
block.parent_id(),
onchain_execution_config,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ async fn test_commit_sync_race() {
Ok(StateComputeResult::new_dummy())
}

fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
_block: ExecutableBlock,
_parent_block_id: HashValue,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/state_computer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl BlockExecutorTrait for DummyBlockExecutor {
Ok(())
}

fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
block: ExecutableBlock,
_parent_block_id: HashValue,
Expand Down
4 changes: 2 additions & 2 deletions execution/executor-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ mod tests {
let parent_block_id = vm_executor.committed_block_id();
let block_id = HashValue::random();
vm_executor
.execute_and_state_checkpoint(
.execute_and_update_state(
(block_id, vec![txn.clone()]).into(),
parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
Expand All @@ -932,7 +932,7 @@ mod tests {
let parent_block_id = other_executor.committed_block_id();
let block_id = HashValue::random();
other_executor
.execute_and_state_checkpoint(
.execute_and_update_state(
(block_id, vec![txn]).into(),
parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
Expand Down
2 changes: 1 addition & 1 deletion execution/executor-benchmark/src/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
{
let _timer = TIMER.with_label_values(&["execute"]).start_timer();
self.executor
.execute_and_state_checkpoint(
.execute_and_update_state(
executable_block,
self.parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
Expand Down
6 changes: 4 additions & 2 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0
#![forbid(unsafe_code)]
// FIXME(aldenhu): remove
#![allow(dead_code)]

use anyhow::Result;
use aptos_crypto::HashValue;
Expand Down Expand Up @@ -135,12 +137,12 @@ pub trait BlockExecutorTrait: Send + Sync {
onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateComputeResult> {
let block_id = block.block_id;
self.execute_and_state_checkpoint(block, parent_block_id, onchain_config)?;
self.execute_and_update_state(block, parent_block_id, onchain_config)?;
self.ledger_update(block_id, parent_block_id)
}

/// Executes a block and returns the state checkpoint output.
fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
block: ExecutableBlock,
parent_block_id: HashValue,
Expand Down
39 changes: 13 additions & 26 deletions execution/executor-types/src/state_checkpoint_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,35 @@

use aptos_crypto::HashValue;
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::state_store::{
sharded_state_updates::ShardedStateUpdates, state_delta::StateDelta,
state_summary::StateSummary,
};
use aptos_storage_interface::state_store::state_summary::StateSummary;
use derive_more::Deref;
use std::sync::Arc;

#[derive(Clone, Debug, Default, Deref)]
#[derive(Clone, Debug, Deref)]
pub struct StateCheckpointOutput {
#[deref]
inner: Arc<DropHelper<Inner>>,
}

impl StateCheckpointOutput {
pub fn new(
parent_state: Arc<StateDelta>,
result_state: Arc<StateDelta>,
state_updates_before_last_checkpoint: Option<ShardedStateUpdates>,
last_state_checkpoint_summary: Option<StateSummary>,
result_state_summary: StateSummary,
state_checkpoint_hashes: Vec<Option<HashValue>>,
) -> Self {
Self::new_impl(Inner {
parent_state,
result_state,
state_updates_before_last_checkpoint,
last_state_checkpoint_summary,
result_state_summary,
state_checkpoint_hashes,
})
}

pub fn new_empty(_state_summary: StateSummary) -> Self {
todo!()
/* FIXME(aldenhu)
pub fn new_empty(parent_state_summary: StateSummary) -> Self {
Self::new_impl(Inner {
parent_state: state.clone(),
result_state: state,
state_updates_before_last_checkpoint: None,
last_state_checkpoint_summary: None,
result_state_summary: parent_state_summary,
state_checkpoint_hashes: vec![],
})
*/
}

pub fn new_dummy() -> Self {
Expand All @@ -56,17 +47,13 @@ impl StateCheckpointOutput {
}

pub fn reconfig_suffix(&self) -> Self {
/* FIXME(aldenhu)
Self::new_empty(self.result_state.clone())
*/
todo!()
Self::new_empty(self.result_state_summary.clone())
}
}

#[derive(Debug, Default)]
#[derive(Debug)]
pub struct Inner {
pub parent_state: Arc<StateDelta>,
pub result_state: Arc<StateDelta>,
pub state_updates_before_last_checkpoint: Option<ShardedStateUpdates>,
pub last_state_checkpoint_summary: Option<StateSummary>,
pub result_state_summary: StateSummary,
pub state_checkpoint_hashes: Vec<Option<HashValue>>,
}
124 changes: 59 additions & 65 deletions execution/executor/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ where
Ok(())
}

fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
block: ExecutableBlock,
parent_block_id: HashValue,
Expand All @@ -103,7 +103,7 @@ where
.read()
.as_ref()
.expect("BlockExecutor is not reset")
.execute_and_state_checkpoint(block, parent_block_id, onchain_config)
.execute_and_update_state(block, parent_block_id, onchain_config)
}

fn ledger_update(
Expand Down Expand Up @@ -176,7 +176,7 @@ where
self.block_tree.root_block().id
}

fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
block: ExecutableBlock,
parent_block_id: HashValue,
Expand All @@ -201,64 +201,41 @@ where
"execute_block"
);
let committed_block_id = self.committed_block_id();
let (execution_output, state_checkpoint_output) =
let execution_output =
if parent_block_id != committed_block_id && parent_output.has_reconfiguration() {
// ignore reconfiguration suffix, even if the block is non-empty
info!(
LogSchema::new(LogEntry::BlockExecutor).block_id(block_id),
"reconfig_descendant_block_received"
);
(
parent_output.execution_output.reconfig_suffix(),
parent_output
.expect_state_checkpoint_output()
.reconfig_suffix(),
)
parent_output.execution_output.reconfig_suffix()
} else {
let state_view = {
let _timer = OTHER_TIMERS.timer_with(&["verified_state_view"]);

let _timer = OTHER_TIMERS.timer_with(&["get_state_view"]);
CachedStateView::new(
StateViewId::BlockExecution { block_id },
Arc::clone(&self.db.reader),
parent_output.execution_output.result_state.clone(),
)?
};

let execution_output = {
let _timer = GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.start_timer();
fail_point!("executor::block_executor_execute_block", |_| {
Err(ExecutorError::from(anyhow::anyhow!(
"Injected error in block_executor_execute_block"
)))
});

DoGetExecutionOutput::by_transaction_execution(
&self.block_executor,
transactions,
state_view,
onchain_config.clone(),
TransactionSliceMetadata::block(parent_block_id, block_id),
)?
};

let _timer = OTHER_TIMERS.timer_with(&["state_checkpoint"]);

let state_checkpoint_output = THREAD_MANAGER.get_exe_cpu_pool().install(|| {
fail_point!("executor::block_state_checkpoint", |_| {
Err(anyhow::anyhow!("Injected error in block state checkpoint."))
});
DoStateCheckpoint::run(
&execution_output,
parent_output.expect_result_state_summary().clone(),
Option::<Vec<_>>::None,
)
})?;
(execution_output, state_checkpoint_output)
let _timer = GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.start_timer();
fail_point!("executor::block_executor_execute_block", |_| {
Err(ExecutorError::from(anyhow::anyhow!(
"Injected error in block_executor_execute_block"
)))
});

DoGetExecutionOutput::by_transaction_execution(
&self.block_executor,
transactions,
state_view,
onchain_config.clone(),
TransactionSliceMetadata::block(parent_block_id, block_id),
)?
};
let output = PartialStateComputeResult::new(execution_output);
output.set_state_checkpoint_output(state_checkpoint_output);

let output = PartialStateComputeResult::new(execution_output);
let _ = self
.block_tree
.add_block(parent_block_id, block_id, output)?;
Expand Down Expand Up @@ -286,33 +263,50 @@ where
// At this point of time two things must happen
// 1. The block tree must also have the current block id with or without the ledger update output.
// 2. We must have the ledger update output of the parent block.
let parent_output = parent_block.output.expect_ledger_update_output();
let parent_accumulator = parent_output.txn_accumulator();
let block = block_vec.pop().expect("Must exist").unwrap();
let output = &block.output;
parent_block.ensure_has_child(block_id)?;
let output = &block.output;
let parent_out = &parent_block.output;

// TODO(aldenhu): remove, assuming no retries.
if let Some(complete_result) = block.output.get_complete_result() {
info!(block_id = block_id, "ledger_update already done.");
return Ok(complete_result);
}

let output =
if parent_block_id != committed_block_id && parent_block.output.has_reconfiguration() {
info!(
LogSchema::new(LogEntry::BlockExecutor).block_id(block_id),
"reconfig_descendant_block_received"
);
parent_output.reconfig_suffix()
} else {
THREAD_MANAGER.get_non_exe_cpu_pool().install(|| {
DoLedgerUpdate::run(
&output.execution_output,
output.expect_state_checkpoint_output(),
parent_accumulator.clone(),
)
})?
};

block.output.set_ledger_update_output(output);
if parent_block_id != committed_block_id && parent_out.has_reconfiguration() {
info!(block_id = block_id, "ledger_update for reconfig suffix.");
// parent must have done all state checkpoint and ledger update
output.set_state_checkpoint_output(
parent_out
.expect_state_checkpoint_output()
.reconfig_suffix(),
);
output.set_ledger_update_output(
parent_out.expect_ledger_update_output().reconfig_suffix(),
);
} else {
THREAD_MANAGER.get_non_exe_cpu_pool().install(|| {
// TODO(aldenhu): remove? no known strategy to recover from this failure
fail_point!("executor::block_state_checkpoint", |_| {
Err(anyhow::anyhow!("Injected error in block state checkpoint."))
});
output.set_state_checkpoint_output(DoStateCheckpoint::run(
&output.execution_output,
parent_block.output.expect_result_state_summary().clone(),
Option::<Vec<_>>::None,
)?);
output.set_ledger_update_output(DoLedgerUpdate::run(
&output.execution_output,
output.expect_state_checkpoint_output(),
parent_out
.expect_ledger_update_output()
.transaction_accumulator
.clone(),
)?);
Ok(())
})?;
}

Ok(block.output.expect_complete_result())
}
Expand Down
4 changes: 4 additions & 0 deletions execution/executor/src/workflow/do_state_checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::OTHER_TIMERS;
use anyhow::Result;
use aptos_crypto::HashValue;
use aptos_executor_types::{
execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput,
};
use aptos_metrics_core::TimerHelper;
use aptos_storage_interface::state_store::state_summary::StateSummary;

pub struct DoStateCheckpoint;
Expand All @@ -16,6 +18,8 @@ impl DoStateCheckpoint {
_parent_state_summary: StateSummary,
_known_state_checkpoints: Option<impl IntoIterator<Item = Option<HashValue>>>,
) -> Result<StateCheckpointOutput> {
let _timer = OTHER_TIMERS.timer_with(&["do_state_checkpoint"]);

/* FIXME(aldenhu):
// Apply the write set, get the latest state.
InMemoryStateCalculatorV2::calculate_for_transactions(
Expand Down

0 comments on commit 67a6c42

Please sign in to comment.