Skip to content

Commit

Permalink
Move DoStateCheckpoint to the ledger_udpate stage for chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Nov 30, 2024
1 parent 67a6c42 commit 73892b5
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 62 deletions.
62 changes: 41 additions & 21 deletions execution/executor/src/chunk_executor/chunk_commit_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@

use crate::{
chunk_executor::chunk_result_verifier::ChunkResultVerifier,
metrics::CHUNK_OTHER_TIMERS,
types::{
executed_chunk::ExecutedChunk, partial_state_compute_result::PartialStateComputeResult,
},
};
use anyhow::{anyhow, ensure, Result};
use aptos_metrics_core::TimerHelper;
use aptos_storage_interface::{
state_store::{state_delta::StateDelta, state_summary::StateSummary},
DbReader,
state_store::{state::State, state_summary::StateSummary},
DbReader, LedgerSummary,
};
use aptos_types::{proof::accumulator::InMemoryTransactionAccumulator, transaction::Version};
use std::{collections::VecDeque, sync::Arc};
Expand All @@ -32,20 +34,20 @@ pub(crate) struct ChunkToUpdateLedger {
/// ... | to_commit | to_update_ledger | ---> (txn version increases)
/// \ \
/// \ latest_state
/// latest_state_summary
/// latest_txn_accumulator
///
pub struct ChunkCommitQueue {
/// Notice that latest_state and latest_txn_accumulator are at different versions.
latest_state: Arc<StateDelta>,
latest_state: State,
latest_state_summary: StateSummary,
latest_txn_accumulator: Arc<InMemoryTransactionAccumulator>,
to_commit: VecDeque<Option<ExecutedChunk>>,
to_update_ledger: VecDeque<Option<ChunkToUpdateLedger>>,
}

impl ChunkCommitQueue {
pub(crate) fn new_from_db(_db: &Arc<dyn DbReader>) -> Result<Self> {
todo!()
/* FIXME(aldenhu)
pub(crate) fn new_from_db(db: &Arc<dyn DbReader>) -> Result<Self> {
let LedgerSummary {
state,
state_summary,
Expand All @@ -54,20 +56,15 @@ impl ChunkCommitQueue {

Ok(Self {
latest_state: state,
latest_state_summary: state_summary,
latest_txn_accumulator: transaction_accumulator,
to_commit: VecDeque::new(),
to_update_ledger: VecDeque::new(),
})
*/
}

pub(crate) fn latest_state(&self) -> Arc<StateDelta> {
self.latest_state.clone()
}

pub(crate) fn latest_state_summary(&self) -> StateSummary {
// FIXME(aldenhu)
todo!()
pub(crate) fn latest_state(&self) -> &State {
&self.latest_state
}

pub(crate) fn expecting_version(&self) -> Version {
Expand All @@ -76,31 +73,47 @@ impl ChunkCommitQueue {

pub(crate) fn enqueue_for_ledger_update(
&mut self,
_chunk_to_update_ledger: ChunkToUpdateLedger,
chunk_to_update_ledger: ChunkToUpdateLedger,
) -> Result<()> {
/* FIXME(aldenhu)
self.latest_state = chunk_to_update_ledger.output.expect_result_state().clone();
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["enqueue_for_ledger_update"]);

ensure!(
self.to_update_ledger.is_empty() || self.to_update_ledger.back().unwrap().is_some(),
"Last chunk to update ledger has not been processed."
);

self.latest_state = chunk_to_update_ledger.output.result_state().clone();
self.to_update_ledger
.push_back(Some(chunk_to_update_ledger));
Ok(())
*/
todo!()
}

pub(crate) fn next_chunk_to_update_ledger(
&mut self,
) -> Result<(Arc<InMemoryTransactionAccumulator>, ChunkToUpdateLedger)> {
) -> Result<(
StateSummary,
Arc<InMemoryTransactionAccumulator>,
ChunkToUpdateLedger,
)> {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["next_chunk_to_update_ledger"]);

let chunk_opt = self
.to_update_ledger
.front_mut()
.ok_or_else(|| anyhow!("No chunk to update ledger."))?;
let chunk = chunk_opt
.take()
.ok_or_else(|| anyhow!("Next chunk to update ledger has already been processed."))?;
Ok((self.latest_txn_accumulator.clone(), chunk))
Ok((
self.latest_state_summary.clone(),
self.latest_txn_accumulator.clone(),
chunk,
))
}

pub(crate) fn save_ledger_update_output(&mut self, chunk: ExecutedChunk) -> Result<()> {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["save_ledger_update_output"]);

ensure!(
!self.to_update_ledger.is_empty(),
"to_update_ledger is empty."
Expand All @@ -109,6 +122,11 @@ impl ChunkCommitQueue {
self.to_update_ledger.front().unwrap().is_none(),
"Head of to_update_ledger has not been processed."
);
self.latest_state_summary = chunk
.output
.expect_state_checkpoint_output()
.result_state_summary
.clone();
self.latest_txn_accumulator = chunk
.output
.expect_ledger_update_output()
Expand All @@ -132,6 +150,8 @@ impl ChunkCommitQueue {
}

pub(crate) fn dequeue_committed(&mut self) -> Result<()> {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["deque_committed"]);

ensure!(!self.to_commit.is_empty(), "to_commit is empty.");
ensure!(
self.to_commit.front().unwrap().is_none(),
Expand Down
69 changes: 29 additions & 40 deletions execution/executor/src/chunk_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use aptos_infallible::{Mutex, RwLock};
use aptos_logger::prelude::*;
use aptos_metrics_core::{IntGaugeHelper, TimerHelper};
use aptos_storage_interface::{
state_store::{state_delta::StateDelta, state_view::cached_state_view::CachedStateView},
state_store::{state::State, state_view::cached_state_view::CachedStateView},
DbReaderWriter,
};
use aptos_types::{
Expand Down Expand Up @@ -240,13 +240,12 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
})
}

fn latest_state_view(&self, latest_state: &StateDelta) -> Result<CachedStateView> {
// FIXME(aldenhu): check
let first_version = latest_state.next_version();
fn state_view(&self, state: &State) -> Result<CachedStateView> {
let first_version = state.next_version();
Ok(CachedStateView::new(
StateViewId::ChunkExecution { first_version },
self.db.reader.clone(),
latest_state.current.clone(),
state.clone(),
)?)
}

Expand All @@ -262,10 +261,10 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
let num_txns = output.num_transactions_to_commit();
if chunk.ledger_info_opt.is_some() || num_txns != 0 {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__save_txns"]);
// TODO(aldenhu): remove since there's no practical strategy to recover from this error.
fail_point!("executor::commit_chunk", |_| {
Err(anyhow::anyhow!("Injected error in commit_chunk"))
});
let output = chunk.output.expect_complete_result();
self.db.writer.save_transactions(
output.as_chunk_to_commit(),
chunk.ledger_info_opt.as_ref(),
Expand All @@ -290,7 +289,7 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
chunk_verifier: Arc<dyn ChunkResultVerifier + Send + Sync>,
mode_for_log: &'static str,
) -> Result<()> {
let parent_state = self.commit_queue.lock().latest_state();
let parent_state = self.commit_queue.lock().latest_state().clone();

let first_version = parent_state.next_version();
ensure!(
Expand All @@ -302,22 +301,9 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {

let num_txns = chunk.len();

let state_view = self.latest_state_view(&parent_state)?;
let state_view = self.state_view(&parent_state)?;
let execution_output = chunk.into_output::<V>(state_view)?;

// Calculate state snapshot
let state_checkpoint_output = DoStateCheckpoint::run(
&execution_output,
self.commit_queue.lock().latest_state_summary(),
Some(
chunk_verifier
.transaction_infos()
.iter()
.map(|t| t.state_checkpoint_hash()),
),
)?;
let output = PartialStateComputeResult::new(execution_output);
output.set_state_checkpoint_output(state_checkpoint_output);

// Enqueue for next stage.
self.commit_queue
Expand All @@ -341,24 +327,29 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
pub fn update_ledger(&self) -> Result<()> {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger_total"]);

let (parent_accumulator, chunk) = {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__next_chunk"]);
self.commit_queue.lock().next_chunk_to_update_ledger()?
};
let (parent_state_summary, parent_accumulator, chunk) =
self.commit_queue.lock().next_chunk_to_update_ledger()?;
let ChunkToUpdateLedger {
output,
chunk_verifier,
} = chunk;

let first_version = parent_accumulator.num_leaves();
let ledger_update_output = {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__calculate"]);
DoLedgerUpdate::run(
&output.execution_output,
output.expect_state_checkpoint_output(),
parent_accumulator.clone(),
)?
};
output.set_state_checkpoint_output(DoStateCheckpoint::run(
&output.execution_output,
parent_state_summary,
Some(
chunk_verifier
.transaction_infos()
.iter()
.map(|t| t.state_checkpoint_hash()),
),
)?);

let ledger_update_output = DoLedgerUpdate::run(
&output.execution_output,
output.expect_state_checkpoint_output(),
parent_accumulator.clone(),
)?;

chunk_verifier.verify_chunk_result(&parent_accumulator, &ledger_update_output)?;

Expand All @@ -368,19 +359,17 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
)?;
output.set_ledger_update_output(ledger_update_output);

let first_version = output.execution_output.first_version;
let num_txns = output.execution_output.num_transactions_to_commit();
let executed_chunk = ExecutedChunk {
output,
ledger_info_opt,
};
let num_txns = executed_chunk
.output
.expect_complete_result()
.num_transactions_to_commit();

let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__save"]);
self.commit_queue
.lock()
.save_ledger_update_output(executed_chunk)?;

info!(
LogSchema::new(LogEntry::ChunkExecutor)
.first_version_in_request(Some(first_version))
Expand Down Expand Up @@ -591,7 +580,7 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
verify_execution_mode: &VerifyExecutionMode,
) -> Result<Version> {
// Execute transactions.
let state_view = self.latest_state_view(&self.commit_queue.lock().latest_state())?;
let state_view = self.state_view(self.commit_queue.lock().latest_state())?;
let txns = transactions
.iter()
.take((end_version - begin_version) as usize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl PartialStateComputeResult {
.expect("StateCheckpointOutput not set")
}

pub fn expect_result_state(&self) -> &State {
pub fn result_state(&self) -> &State {
&self.execution_output.result_state
}

Expand Down

0 comments on commit 73892b5

Please sign in to comment.