Skip to content

Commit

Permalink
[pruner] introduce indexes pruner
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Feb 10, 2025
1 parent dfbe228 commit cb0a697
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 47 deletions.
3 changes: 3 additions & 0 deletions crates/sui-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,8 @@ pub struct AuthorityStorePruningConfig {
/// may result in some old versions that will never be pruned.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub enable_compaction_filter: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub num_epochs_to_retain_for_indexes: Option<u64>,
}

fn default_num_latest_epoch_dbs_to_retain() -> usize {
Expand Down Expand Up @@ -928,6 +930,7 @@ impl Default for AuthorityStorePruningConfig {
killswitch_tombstone_pruning: false,
smooth: true,
enable_compaction_filter: cfg!(test) || cfg!(msim),
num_epochs_to_retain_for_indexes: None,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2904,6 +2904,7 @@ impl AuthorityState {
store.perpetual_tables.clone(),
checkpoint_store.clone(),
rpc_index.clone(),
indexes.clone(),
store.objects_lock_table.clone(),
config.authority_store_pruning_config.clone(),
epoch_store.committee().authority_exists(&name),
Expand Down Expand Up @@ -3018,6 +3019,7 @@ impl AuthorityState {
&self.database_for_testing().perpetual_tables,
&self.checkpoint_store,
self.rpc_index.as_deref(),
self.indexes.as_deref(),
&self.database_for_testing().objects_lock_table,
None,
config.authority_store_pruning_config,
Expand Down
34 changes: 33 additions & 1 deletion crates/sui-core/src/authority/authority_store_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::authority::authority_store_types::{
ObjectContentDigest, StoreData, StoreObject, StoreObjectWrapper,
};
use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
use crate::jsonrpc_index::IndexStore;
use crate::rpc_index::RpcIndexStore;
use anyhow::anyhow;
use bincode::Options;
Expand Down Expand Up @@ -250,11 +251,14 @@ impl AuthorityStorePruner {
perpetual_db: &Arc<AuthorityPerpetualTables>,
checkpoint_db: &Arc<CheckpointStore>,
rpc_index: Option<&RpcIndexStore>,
jsonrpc_index: Option<&IndexStore>,
checkpoint_number: CheckpointSequenceNumber,
checkpoints_to_prune: Vec<CheckpointDigest>,
checkpoint_content_to_prune: Vec<CheckpointContents>,
effects_to_prune: &Vec<TransactionEffects>,
metrics: Arc<AuthorityStorePruningMetrics>,
epoch_id: EpochId,
num_epoch_to_retain_for_indexes: Option<u64>,
) -> anyhow::Result<()> {
let _scope = monitored_scope("EffectsLivePruner");

Expand Down Expand Up @@ -316,6 +320,18 @@ impl AuthorityStorePruner {
if let Some(rpc_index) = rpc_index {
rpc_index.prune(&checkpoint_content_to_prune)?;
}
if let (Some(num_epochs_to_retain_for_indexes), Some(jsonrpc_index)) =
(num_epoch_to_retain_for_indexes, jsonrpc_index)
{
let tx_digest = effects_to_prune
.iter()
.filter(|e| e.executed_epoch() + num_epochs_to_retain_for_indexes < epoch_id)
.last()
.map(|e| e.transaction_digest());
if let Some(digest) = tx_digest {
jsonrpc_index.prune(digest)?;
}
}
perpetual_batch.write()?;
checkpoints_batch.write()?;
metrics
Expand Down Expand Up @@ -356,6 +372,7 @@ impl AuthorityStorePruner {
perpetual_db,
checkpoint_store,
rpc_index,
None,
pruner_db,
PruningMode::Objects,
config.num_epochs_to_retain,
Expand All @@ -365,6 +382,7 @@ impl AuthorityStorePruner {
config,
metrics.clone(),
indirect_objects_threshold,
epoch_id,
)
.await
}
Expand All @@ -373,6 +391,7 @@ impl AuthorityStorePruner {
perpetual_db: &Arc<AuthorityPerpetualTables>,
checkpoint_store: &Arc<CheckpointStore>,
rpc_index: Option<&RpcIndexStore>,
jsonrpc_index: Option<&IndexStore>,
objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
pruner_db: Option<&Arc<AuthorityPrunerTables>>,
config: AuthorityStorePruningConfig,
Expand Down Expand Up @@ -416,6 +435,7 @@ impl AuthorityStorePruner {
perpetual_db,
checkpoint_store,
rpc_index,
jsonrpc_index,
pruner_db,
PruningMode::Checkpoints,
config
Expand All @@ -427,6 +447,7 @@ impl AuthorityStorePruner {
config,
metrics.clone(),
indirect_objects_threshold,
epoch_id,
)
.await
}
Expand All @@ -436,6 +457,7 @@ impl AuthorityStorePruner {
perpetual_db: &Arc<AuthorityPerpetualTables>,
checkpoint_store: &Arc<CheckpointStore>,
rpc_index: Option<&RpcIndexStore>,
jsonrpc_index: Option<&IndexStore>,
pruner_db: Option<&Arc<AuthorityPrunerTables>>,
mode: PruningMode,
num_epochs_to_retain: u64,
Expand All @@ -445,6 +467,7 @@ impl AuthorityStorePruner {
config: AuthorityStorePruningConfig,
metrics: Arc<AuthorityStorePruningMetrics>,
indirect_objects_threshold: usize,
epoch_id: EpochId,
) -> anyhow::Result<()> {
let _scope = monitored_scope("PruneForEligibleEpochs");

Expand Down Expand Up @@ -515,11 +538,14 @@ impl AuthorityStorePruner {
perpetual_db,
checkpoint_store,
rpc_index,
jsonrpc_index,
checkpoint_number,
checkpoints_to_prune,
checkpoint_content_to_prune,
&effects_to_prune,
metrics.clone(),
epoch_id,
config.num_epochs_to_retain_for_indexes,
)?,
};
checkpoints_to_prune = vec![];
Expand Down Expand Up @@ -549,11 +575,14 @@ impl AuthorityStorePruner {
perpetual_db,
checkpoint_store,
rpc_index,
jsonrpc_index,
checkpoint_number,
checkpoints_to_prune,
checkpoint_content_to_prune,
&effects_to_prune,
metrics.clone(),
epoch_id,
config.num_epochs_to_retain_for_indexes,
)?,
};
}
Expand Down Expand Up @@ -647,6 +676,7 @@ impl AuthorityStorePruner {
perpetual_db: Arc<AuthorityPerpetualTables>,
checkpoint_store: Arc<CheckpointStore>,
rpc_index: Option<Arc<RpcIndexStore>>,
jsonrpc_index: Option<Arc<IndexStore>>,
objects_lock_table: Arc<RwLockTable<ObjectContentDigest>>,
pruner_db: Option<Arc<AuthorityPrunerTables>>,
metrics: Arc<AuthorityStorePruningMetrics>,
Expand Down Expand Up @@ -714,7 +744,7 @@ impl AuthorityStorePruner {
}
},
_ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), &objects_lock_table, pruner_db.as_ref(), config.clone(), metrics.clone(), indirect_objects_threshold, archive_readers.clone(), epoch_duration_ms).await {
if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), jsonrpc_index.as_deref(), &objects_lock_table, pruner_db.as_ref(), config.clone(), metrics.clone(), indirect_objects_threshold, archive_readers.clone(), epoch_duration_ms).await {
error!("Failed to prune checkpoints: {:?}", err);
}
},
Expand All @@ -729,6 +759,7 @@ impl AuthorityStorePruner {
perpetual_db: Arc<AuthorityPerpetualTables>,
checkpoint_store: Arc<CheckpointStore>,
rpc_index: Option<Arc<RpcIndexStore>>,
jsonrpc_index: Option<Arc<IndexStore>>,
objects_lock_table: Arc<RwLockTable<ObjectContentDigest>>,
mut pruning_config: AuthorityStorePruningConfig,
is_validator: bool,
Expand All @@ -755,6 +786,7 @@ impl AuthorityStorePruner {
perpetual_db,
checkpoint_store,
rpc_index,
jsonrpc_index,
objects_lock_table,
pruner_db,
AuthorityStorePruningMetrics::new(registry),
Expand Down
Loading

0 comments on commit cb0a697

Please sign in to comment.