From bb778828e36d53a7d91a27e55109f2f45621badc Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Thu, 5 Sep 2024 10:43:54 -0400 Subject: [PATCH] repair coin index (#19142) temporary PR. This PR introduces a background task to repair the `coin_index` and remove any dangling entries. The PR will be active for one release and will be reverted afterward. The background task works by iterating over a snapshot of the `coin_index`, identifying coins that no longer belong to their respective owners, and populating a list of candidates for removal(some entries might be benign) Once the candidate list is populated, the task makes a second pass over the candidates list. This time it locks the corresponding entries to prevent potential races with concurrent writes. The task then reverifies the eligibility criteria and removes the dangling entries --- crates/sui-core/src/authority.rs | 4 +- crates/sui-core/src/verify_indexes.rs | 60 +++++++++++++++++++++++++++ crates/sui-storage/src/indexes.rs | 6 +-- 3 files changed, 66 insertions(+), 4 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 2001ac58a9fd6..daf157c740dad 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -6,7 +6,7 @@ use crate::execution_cache::ExecutionCacheTraitPointers; use crate::execution_cache::TransactionCacheRead; use crate::rest_index::RestIndexStore; use crate::transaction_outputs::TransactionOutputs; -use crate::verify_indexes::verify_indexes; +use crate::verify_indexes::{fix_indexes, verify_indexes}; use anyhow::anyhow; use arc_swap::{ArcSwap, Guard}; use async_trait::async_trait; @@ -2709,6 +2709,8 @@ impl AuthorityState { validator_tx_finalizer, }); + let state_clone = Arc::downgrade(&state); + spawn_monitored_task!(fix_indexes(state_clone)); // Start a task to execute ready certificates. let authority_state = Arc::downgrade(&state); spawn_monitored_task!(execution_process( diff --git a/crates/sui-core/src/verify_indexes.rs b/crates/sui-core/src/verify_indexes.rs index 38b4fd2dcd2b0..befec1a5b3809 100644 --- a/crates/sui-core/src/verify_indexes.rs +++ b/crates/sui-core/src/verify_indexes.rs @@ -1,14 +1,17 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::sync::Weak; use std::{collections::BTreeMap, sync::Arc}; use anyhow::{anyhow, bail, Result}; +use sui_storage::indexes::CoinIndexKey; use sui_storage::{indexes::CoinInfo, IndexStore}; use sui_types::{base_types::ObjectInfo, object::Owner}; use tracing::info; use typed_store::traits::Map; +use crate::authority::AuthorityState; use crate::{authority::authority_store_tables::LiveObject, state_accumulator::AccumulatorStore}; /// This is a very expensive function that verifies some of the secondary indexes. This is done by @@ -88,3 +91,60 @@ pub fn verify_indexes(store: &dyn AccumulatorStore, indexes: Arc) -> Ok(()) } + +// temporary code to repair the coin index. This should be removed in the next release +pub async fn fix_indexes(authority_state: Weak) -> Result<()> { + let is_violation = |coin_index_key: &CoinIndexKey, + state: &Arc| + -> anyhow::Result { + if let Some(object) = state.get_object_store().get_object(&coin_index_key.2)? { + if matches!(object.owner, Owner::AddressOwner(real_owner_id) | Owner::ObjectOwner(real_owner_id) if coin_index_key.0 == real_owner_id) + { + return Ok(false); + } + } + Ok(true) + }; + + tracing::info!("Starting fixing coin index"); + // populate candidate list without locking. Some entries are benign + let authority_state_clone = authority_state.clone(); + let candidates = tokio::task::spawn_blocking(move || { + if let Some(authority) = authority_state_clone.upgrade() { + let mut batch = vec![]; + if let Some(indexes) = &authority.indexes { + for (coin_index_key, _) in indexes.tables().coin_index().unbounded_iter() { + if is_violation(&coin_index_key, &authority)? { + batch.push(coin_index_key); + } + } + } + return Ok::, anyhow::Error>(batch); + } + Ok(vec![]) + }) + .await??; + + if let Some(authority) = authority_state.upgrade() { + if let Some(indexes) = &authority.indexes { + for chunk in candidates.chunks(100) { + let _locks = indexes + .caches + .locks + .acquire_locks(chunk.iter().map(|key| key.0)) + .await; + let mut batch = vec![]; + for key in chunk { + if is_violation(key, &authority)? { + batch.push(key); + } + } + let mut wb = indexes.tables().coin_index().batch(); + wb.delete_batch(indexes.tables().coin_index(), batch)?; + wb.write()?; + } + } + } + tracing::info!("Finished fix for the coin index"); + Ok(()) +} diff --git a/crates/sui-storage/src/indexes.rs b/crates/sui-storage/src/indexes.rs index 4519cedbaaa3a..898bba45a45d5 100644 --- a/crates/sui-storage/src/indexes.rs +++ b/crates/sui-storage/src/indexes.rs @@ -42,7 +42,7 @@ use typed_store::traits::{TableSummary, TypedStoreDebug}; use typed_store::DBMapUtils; type OwnerIndexKey = (SuiAddress, ObjectID); -type CoinIndexKey = (SuiAddress, String, ObjectID); +pub type CoinIndexKey = (SuiAddress, String, ObjectID); type DynamicFieldKey = (ObjectID, ObjectID); type EventId = (TxSequenceNumber, usize); type EventIndex = (TransactionEventsDigest, TransactionDigest, u64); @@ -129,7 +129,7 @@ impl IndexStoreMetrics { pub struct IndexStoreCaches { per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult>, all_balances: ShardedLruCache>>>, - locks: MutexTable, + pub locks: MutexTable, } #[derive(Default)] @@ -229,7 +229,7 @@ impl IndexStoreTables { pub struct IndexStore { next_sequence_number: AtomicU64, tables: IndexStoreTables, - caches: IndexStoreCaches, + pub caches: IndexStoreCaches, metrics: Arc, max_type_length: u64, remove_deprecated_tables: bool,