Skip to content

Commit

Permalink
feat: [MR-609] More eagerly remove replicated state at checkpoint hei…
Browse files Browse the repository at this point in the history
…ghts (#906)

Close [MR-609](https://dfinity.atlassian.net/browse/MR-609)

After several preparation PRs manage to keep extra in-memory states
required for pending CUP, `remove_inmemory_states_below` can now safely
remove in-memory states at previous checkpoint heights.

In order to do that, we separate `checkpoints_to_keep` and
`inmemory_states_keep` in `remove_states_below_impl`. As a result,
`snapshots` and `states_metadata` are not in sync anymore. We need to
adjust some logic when updating them in state sync and
`commit_and_certify`.

[MR-609]:
https://dfinity.atlassian.net/browse/MR-609?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
  • Loading branch information
ShuoWangNSL authored Dec 13, 2024
1 parent 91dd6b3 commit 52e0fac
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 144 deletions.
160 changes: 89 additions & 71 deletions rs/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,9 +898,6 @@ fn check_certifications_metadata_snapshots_and_states_metadata_are_consistent(
.filter(|h| h.get() != 0)
.collect::<Vec<_>>();
debug_assert_eq!(certification_heights, snapshot_heights);
for h in states.states_metadata.keys() {
debug_assert!(states.certifications_metadata.contains_key(h));
}
}

fn initialize_tip(
Expand Down Expand Up @@ -2155,53 +2152,61 @@ impl StateManagerImpl {
check_certifications_metadata_snapshots_and_states_metadata_are_consistent(&states);
states.disable_state_fetch_below(height);

if states
let is_snapshot_present = states
.snapshots
.iter()
.any(|snapshot| snapshot.height == height)
{
.any(|snapshot| snapshot.height == height);

let is_state_metadata_present = states.states_metadata.contains_key(&height);

// If both the snapshot and the state metadata are present, we can safely skip it.
if is_snapshot_present && is_state_metadata_present {
info!(
self.log,
"Completed StateSync for state {} that we already have locally", height
);
return;
}

states.snapshots.push_back(Snapshot {
height,
state: Arc::new(state),
});
states
.snapshots
.make_contiguous()
.sort_by_key(|snapshot| snapshot.height);
if !is_snapshot_present {
states.snapshots.push_back(Snapshot {
height,
state: Arc::new(state),
});
states
.snapshots
.make_contiguous()
.sort_by_key(|snapshot| snapshot.height);

self.metrics
.resident_state_count
.set(states.snapshots.len() as i64);
self.metrics
.resident_state_count
.set(states.snapshots.len() as i64);

states
.certifications_metadata
.insert(height, certification_metadata);
states
.certifications_metadata
.insert(height, certification_metadata);
}

let state_size_bytes: i64 = manifest
.file_table
.iter()
.map(|f| f.size_bytes as i64)
.sum();

states.states_metadata.insert(
height,
StateMetadata {
checkpoint_layout: Some(cp_layout),
bundled_manifest: Some(BundledManifest {
root_hash,
manifest,
meta_manifest,
}),
state_sync_file_group: None,
},
);
if !is_state_metadata_present {
states.states_metadata.insert(
height,
StateMetadata {
checkpoint_layout: Some(cp_layout),
bundled_manifest: Some(BundledManifest {
root_hash,
manifest,
meta_manifest,
}),
state_sync_file_group: None,
},
);
}

let latest_height = update_latest_height(&self.latest_state_height, height);
if latest_height == height.get() {
Expand Down Expand Up @@ -2253,6 +2258,18 @@ impl StateManagerImpl {
#[cfg(debug_assertions)]
let latest_state_height = self.latest_state_height();

// Practically, Consensus does not ask state manager to keep states which are already removed.
// However, in debug builds, we filter `extra_inmemory_heights_to_keep` and store `existing_extra_inmemory_heights_to_keep`
// so that we can verify later that they are all retained.
#[cfg(debug_assertions)]
let state_heights = self.list_state_heights(ic_interfaces_state_manager::CERT_ANY);
#[cfg(debug_assertions)]
let existing_extra_inmemory_heights_to_keep: Vec<Height> = extra_inmemory_heights_to_keep
.iter()
.filter(|h| state_heights.contains(h))
.copied()
.collect();

let heights_to_remove = std::ops::Range {
start: Height::new(1),
end: last_height_to_keep,
Expand All @@ -2273,31 +2290,25 @@ impl StateManagerImpl {
state_metadata.bundled_manifest.as_ref().map(|_| *height)
});

// The `extra_inmemory_heights_to_keep` is used for preserving in-memory states,
// but it can safely be included in the `heights_to_keep` set, which retains both
// in-memory states and checkpoints. This is safe because:
//
// 1. When called by `remove_inmemory_states_below`, checkpoints are never removed,
// regardless of the inclusion of `extra_inmemory_heights_to_keep`, so no harm
// or unnecessary preservation occurs.
//
// 2. When called by `remove_states_below`, `extra_inmemory_heights_to_keep` is always
// an empty set, having no effect on the outcome.
//
// In the future, separating these sets could clarify their distinct purposes and
// simplify reasoning about correctness without relying heavily on input behavior.
let heights_to_keep: BTreeSet<Height> = states
// We keep checkpoints at or above the `last_checkpoint_to_keep` height
// as well as the one with latest manifest for the purpose of incremental manifest computation and fast state sync.
let checkpoint_heights_to_keep: BTreeSet<Height> = states
.states_metadata
.keys()
.copied()
.filter(|height| {
*height == Self::INITIAL_STATE_HEIGHT || *height >= last_checkpoint_to_keep
})
.chain(std::iter::once(latest_certified_height))
.chain(latest_manifest_height)
.chain(extra_inmemory_heights_to_keep.iter().copied())
.collect();

// In addition, we retain the latest certified state and any extra states specified to keep.
// Note that `checkpoint_heights_to_keep` and `inmemory_heights_to_keep` are separate,
// as decisions to retain a checkpoint or an in-memory state are made independently.
let inmemory_heights_to_keep = std::iter::once(latest_certified_height)
.chain(extra_inmemory_heights_to_keep.iter().copied())
.collect::<BTreeSet<_>>();

// Send object to deallocation thread if it has capacity.
let deallocate = |x| {
if self.deallocation_sender.len() < DEALLOCATION_BACKLOG_THRESHOLD {
Expand All @@ -2311,7 +2322,7 @@ impl StateManagerImpl {

let (removed, retained) = states.snapshots.drain(0..).partition(|snapshot| {
heights_to_remove.contains(&snapshot.height)
&& !heights_to_keep.contains(&snapshot.height)
&& !inmemory_heights_to_keep.contains(&snapshot.height)
});
states.snapshots = retained;

Expand All @@ -2332,7 +2343,6 @@ impl StateManagerImpl {
.iter()
.map(|s| s.height)
.filter(|h| h.get() != 0)
.chain(states.states_metadata.keys().copied())
.min();
if let Some(min_resident_height) = min_resident_height {
self.metrics
Expand All @@ -2348,7 +2358,7 @@ impl StateManagerImpl {
deallocate(Box::new(removed));

for (height, metadata) in states.states_metadata.range(heights_to_remove) {
if heights_to_keep.contains(height) {
if checkpoint_heights_to_keep.contains(height) {
continue;
}
if let Some(ref checkpoint_layout) = metadata.checkpoint_layout {
Expand All @@ -2361,7 +2371,7 @@ impl StateManagerImpl {
.certifications_metadata
.split_off(&last_height_to_keep);

for h in heights_to_keep.iter() {
for h in inmemory_heights_to_keep.iter() {
if let Some(cert_metadata) = states.certifications_metadata.remove(h) {
certifications_metadata.insert(*h, cert_metadata);
}
Expand Down Expand Up @@ -2391,7 +2401,7 @@ impl StateManagerImpl {

let mut metadata_to_keep = states.states_metadata.split_off(&last_height_to_keep);

for h in heights_to_keep.iter() {
for h in checkpoint_heights_to_keep.iter() {
if let Some(metadata) = states.states_metadata.remove(h) {
metadata_to_keep.insert(*h, metadata);
}
Expand All @@ -2418,7 +2428,6 @@ impl StateManagerImpl {

#[cfg(debug_assertions)]
{
use ic_interfaces_state_manager::CERT_ANY;
let unfiltered_checkpoint_heights = self
.state_layout
.unfiltered_checkpoint_heights()
Expand All @@ -2430,13 +2439,17 @@ impl StateManagerImpl {
)
});

let state_heights = self.list_state_heights(CERT_ANY);
let state_heights = self.list_state_heights(ic_interfaces_state_manager::CERT_ANY);

// All checkpoints to keep should exist on disk.
debug_assert!(checkpoint_heights_to_keep
.iter()
.all(|h| unfiltered_checkpoint_heights.contains(h)));

debug_assert!(heights_to_keep
// If the in-memory states that Consensus ask to keep exist in the beginning, they should be all retained.
debug_assert!(existing_extra_inmemory_heights_to_keep
.iter()
.all(|h| unfiltered_checkpoint_heights.contains(h)
|| extra_inmemory_heights_to_keep.contains(h)
|| *h == latest_certified_height));
.all(|h| state_heights.contains(h)));

debug_assert!(state_heights.contains(&latest_state_height));
debug_assert!(state_heights.contains(&latest_certified_height));
Expand Down Expand Up @@ -3566,18 +3579,6 @@ impl StateManager for StateManagerImpl {
.certifications_metadata
.insert(height, certification_metadata);

if let Some((state_metadata, compute_manifest_request)) =
state_metadata_and_compute_manifest_request
{
states.states_metadata.insert(height, state_metadata);
debug_assert!(self.tip_channel.len() <= 1);
self.tip_channel
.send(compute_manifest_request)
.expect("failed to send ComputeManifestRequest message");
} else {
debug_assert!(scope != CertificationScope::Full);
}

let latest_height = update_latest_height(&self.latest_state_height, height);
self.metrics.max_resident_height.set(latest_height as i64);
{
Expand All @@ -3593,6 +3594,23 @@ impl StateManager for StateManagerImpl {
}
}

if let Some((state_metadata, compute_manifest_request)) =
state_metadata_and_compute_manifest_request
{
let metadata = states
.states_metadata
.entry(height)
.or_insert(state_metadata);
debug_assert!(self.tip_channel.len() <= 1);
if metadata.bundled_manifest.is_none() {
self.tip_channel
.send(compute_manifest_request)
.expect("failed to send ComputeManifestRequest message");
}
} else {
debug_assert!(scope != CertificationScope::Full);
}

self.metrics
.resident_state_count
.set(states.snapshots.len() as i64);
Expand Down
Loading

0 comments on commit 52e0fac

Please sign in to comment.