Skip to content

Commit

Permalink
restructure shared state between StateStore and BufferedState
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Dec 2, 2024
1 parent a2b8c87 commit e86870f
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 123 deletions.
5 changes: 3 additions & 2 deletions storage/aptosdb/src/backup/restore_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,9 @@ pub(crate) fn save_transactions(

ledger_db.write_schemas(ledger_db_batch)?;

*state_store.current_state().lock() =
StateDelta::new_empty_with_version(Some(last_version));
state_store
.current_state()
.set(StateDelta::new_empty_with_version(Some(last_version)));
}

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions storage/aptosdb/src/db/include/aptosdb_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl DbReader for AptosDB {

fn get_pre_committed_version(&self) -> Result<Option<Version>> {
gauged_api("get_pre_committed_version", || {
Ok(self.state_store.current_state().lock().current_version)
Ok(self.state_store.current_state().current_version)
})
}

Expand Down Expand Up @@ -641,7 +641,6 @@ impl DbReader for AptosDB {
Ok(self
.state_store
.current_state()
.lock()
.base_version
)
})
Expand Down
5 changes: 2 additions & 3 deletions storage/aptosdb/src/db/include/aptosdb_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ impl AptosDB {
);

{
let current_state_guard = self.state_store.current_state();
let current_state = current_state_guard.lock();
let current_state = self.state_store.current_state();
ensure!(
chunk.base_state_version == current_state.base_version,
"base_state_version {:?} does not equal to the base_version {:?} in buffered state with current version {:?}",
Expand Down Expand Up @@ -555,7 +554,7 @@ impl AptosDB {
version_to_commit: Version,
) -> Result<Option<Version>> {
let old_committed_ver = self.ledger_db.metadata_db().get_synced_version()?;
let pre_committed_ver = self.state_store.current_state().lock().current_version;
let pre_committed_ver = self.state_store.current_state().current_version;
ensure!(
old_committed_ver.is_none() || version_to_commit >= old_committed_ver.unwrap(),
"Version too old to commit. Committed: {:?}; Trying to commit with LI: {}",
Expand Down
46 changes: 28 additions & 18 deletions storage/aptosdb/src/state_store/buffered_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
use crate::{
metrics::{LATEST_CHECKPOINT_VERSION, OTHER_TIMERS_SECONDS},
state_store::{state_snapshot_committer::StateSnapshotCommitter, CurrentState, StateDb},
state_store::{
persisted_state::PersistedState, state_snapshot_committer::StateSnapshotCommitter,
CurrentState, StateDb,
},
};
use aptos_infallible::Mutex;
use aptos_logger::info;
use aptos_metrics_core::TimerHelper;
use aptos_scratchpad::SmtAncestors;
use aptos_storage_interface::{
db_ensure as ensure,
state_store::{sharded_state_updates::ShardedStateUpdates, state_delta::StateDelta},
AptosDbError, Result,
};
use aptos_types::state_store::state_value::StateValue;
use std::{
sync::{
mpsc,
Expand All @@ -41,7 +43,7 @@ pub struct BufferedState {
/// state after the latest checkpoint. The `current` is the latest speculative state.
/// n.b. this is an `Arc` shared with the StateStore so that merely querying the latest state
/// does not require locking the buffered state.
state_after_checkpoint: CurrentState,
state_after_checkpoint: Arc<Mutex<CurrentState>>,
state_commit_sender: SyncSender<CommitMessage<Arc<StateDelta>>>,
target_items: usize,
join_handle: Option<JoinHandle<()>>,
Expand All @@ -58,25 +60,29 @@ impl BufferedState {
state_db: &Arc<StateDb>,
state_after_checkpoint: StateDelta,
target_items: usize,
) -> (Self, SmtAncestors<StateValue>, CurrentState) {
current_state: Arc<Mutex<CurrentState>>,
persisted_state: Arc<Mutex<PersistedState>>,
) -> Self {
let (state_commit_sender, state_commit_receiver) =
mpsc::sync_channel(ASYNC_COMMIT_CHANNEL_BUFFER_SIZE as usize);
let arc_state_db = Arc::clone(state_db);
let smt_ancestors = SmtAncestors::new(state_after_checkpoint.base.clone());
let smt_ancestors_clone = smt_ancestors.clone();
persisted_state
.lock()
.set(state_after_checkpoint.base.clone());
let persisted_state_clone = persisted_state.clone();
// Create a new thread with receiver subscribing to state commit changes
let join_handle = std::thread::Builder::new()
.name("state-committer".to_string())
.spawn(move || {
let committer = StateSnapshotCommitter::new(
arc_state_db,
state_commit_receiver,
smt_ancestors_clone,
persisted_state_clone,
);
committer.run();
})
.expect("Failed to spawn state committer thread.");
let current_state = CurrentState::new(state_after_checkpoint.clone());
current_state.lock().set(state_after_checkpoint.clone());
let myself = Self {
state_until_checkpoint: None,
state_after_checkpoint: current_state.clone(),
Expand All @@ -86,7 +92,7 @@ impl BufferedState {
join_handle: Some(join_handle),
};
myself.report_latest_committed_version();
(myself, smt_ancestors, current_state)
myself
}

/// This method checks whether a commit is needed based on the target_items value and the number of items in state_until_checkpoint.
Expand Down Expand Up @@ -194,7 +200,7 @@ impl BufferedState {
new_state_after_checkpoint.base_version == state_after_checkpoint.base_version,
"Diff between base and latest checkpoints not provided.",
);
*state_after_checkpoint = new_state_after_checkpoint.clone();
state_after_checkpoint.set(new_state_after_checkpoint.clone());
}
}

Expand All @@ -205,16 +211,20 @@ impl BufferedState {
self.report_latest_committed_version();
Ok(())
}

pub(crate) fn drain(&mut self) {
self.sync_commit();
self.state_commit_sender.send(CommitMessage::Exit).unwrap();
if let Some(handle) = self.join_handle.take() {
handle
.join()
.expect("snapshot commit thread should join peacefully.");
}
}
}

impl Drop for BufferedState {
fn drop(&mut self) {
self.sync_commit();
self.state_commit_sender.send(CommitMessage::Exit).unwrap();
self.join_handle
.take()
.expect("snapshot commit thread must exist.")
.join()
.expect("snapshot commit thread should join peacefully.");
self.drain()
}
}
28 changes: 28 additions & 0 deletions storage/aptosdb/src/state_store/current_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_storage_interface::state_store::state_delta::StateDelta;
use derive_more::{Deref, DerefMut};

#[derive(Clone, Debug, Deref, DerefMut)]
pub(crate) struct CurrentState {
#[deref]
#[deref_mut]
from_latest_checkpoint_to_current: StateDelta,
}

impl CurrentState {
pub fn new_dummy() -> Self {
Self {
from_latest_checkpoint_to_current: StateDelta::new_empty(),
}
}

pub fn set(&mut self, from_latest_checkpoint_to_current: StateDelta) {
self.from_latest_checkpoint_to_current = from_latest_checkpoint_to_current;
}

pub fn get(&self) -> &StateDelta {
&self.from_latest_checkpoint_to_current
}
}
Loading

0 comments on commit e86870f

Please sign in to comment.