diff --git a/crates/iota-indexer/src/handlers/checkpoint_handler.rs b/crates/iota-indexer/src/handlers/checkpoint_handler.rs index 63c93f7640b..9c9c8d1f0ab 100644 --- a/crates/iota-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/iota-indexer/src/handlers/checkpoint_handler.rs @@ -47,7 +47,7 @@ use crate::{ tx_processor::{EpochEndIndexingObjectStore, IndexingPackageBuffer, TxChangesProcessor}, }, metrics::IndexerMetrics, - models::display::StoredDisplay, + models::{display::StoredDisplay, obj_indices::StoredObjectVersion}, store::{ IndexerStore, PgIndexerStore, package_resolver::{IndexerStorePackageResolver, InterimPackageResolver}, @@ -266,6 +266,19 @@ impl CheckpointHandler { })) } + fn derive_object_versions( + object_history_changes: &TransactionObjectChangesToCommit, + ) -> Vec { + let mut object_versions = vec![]; + for changed_obj in object_history_changes.changed_objects.iter() { + object_versions.push(changed_obj.into()); + } + for deleted_obj in object_history_changes.deleted_objects.iter() { + object_versions.push(deleted_obj.into()); + } + object_versions + } + async fn index_checkpoint( state: Arc, data: CheckpointData, @@ -284,6 +297,7 @@ impl CheckpointHandler { Self::index_objects(data.clone(), &metrics, package_resolver.clone()).await?; let object_history_changes: TransactionObjectChangesToCommit = Self::index_objects_history(data.clone(), package_resolver.clone()).await?; + let object_versions = Self::derive_object_versions(&object_history_changes); let (checkpoint, db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = { let CheckpointData { @@ -339,6 +353,7 @@ impl CheckpointHandler { display_updates: db_displays, object_changes, object_history_changes, + object_versions, packages, epoch, }) diff --git a/crates/iota-indexer/src/handlers/committer.rs b/crates/iota-indexer/src/handlers/committer.rs index d5ee2af611a..2daa045d13b 100644 --- a/crates/iota-indexer/src/handlers/committer.rs +++ b/crates/iota-indexer/src/handlers/committer.rs @@ -89,6 +89,7 @@ async fn commit_checkpoints( let mut display_updates_batch = BTreeMap::new(); let mut object_changes_batch = vec![]; let mut object_history_changes_batch = vec![]; + let mut object_versions_batch = vec![]; let mut packages_batch = vec![]; for indexed_checkpoint in indexed_checkpoint_batch { @@ -101,6 +102,7 @@ async fn commit_checkpoints( display_updates, object_changes, object_history_changes, + object_versions, packages, epoch: _, } = indexed_checkpoint; @@ -112,6 +114,7 @@ async fn commit_checkpoints( display_updates_batch.extend(display_updates.into_iter()); object_changes_batch.push(object_changes); object_history_changes_batch.push(object_history_changes); + object_versions_batch.push(object_versions); packages_batch.push(packages); } @@ -126,6 +129,10 @@ async fn commit_checkpoints( .into_iter() .flatten() .collect::>(); + let object_versions_batch = object_versions_batch + .into_iter() + .flatten() + .collect::>(); let packages_batch = packages_batch.into_iter().flatten().collect::>(); let checkpoint_num = checkpoint_batch.len(); let tx_count = tx_batch.len(); @@ -141,6 +148,7 @@ async fn commit_checkpoints( state.persist_packages(packages_batch), state.persist_objects(object_changes_batch.clone()), state.persist_object_history(object_history_changes_batch.clone()), + state.persist_object_versions(object_versions_batch.clone()), ]; if let Some(epoch_data) = epoch.clone() { persist_tasks.push(state.persist_epoch(epoch_data)); diff --git a/crates/iota-indexer/src/handlers/mod.rs b/crates/iota-indexer/src/handlers/mod.rs index 4d89c524e4e..6e3e71e7213 100644 --- a/crates/iota-indexer/src/handlers/mod.rs +++ b/crates/iota-indexer/src/handlers/mod.rs @@ -5,7 +5,7 @@ use std::collections::BTreeMap; use crate::{ - models::display::StoredDisplay, + models::{display::StoredDisplay, obj_indices::StoredObjectVersion}, types::{ EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject, IndexedPackage, IndexedTransaction, TxIndex, @@ -28,6 +28,7 @@ pub struct CheckpointDataToCommit { pub display_updates: BTreeMap, pub object_changes: TransactionObjectChangesToCommit, pub object_history_changes: TransactionObjectChangesToCommit, + pub object_versions: Vec, pub packages: Vec, pub epoch: Option, } diff --git a/crates/iota-indexer/src/models/obj_indices.rs b/crates/iota-indexer/src/models/obj_indices.rs index 136c2d68064..8ecfe390213 100644 --- a/crates/iota-indexer/src/models/obj_indices.rs +++ b/crates/iota-indexer/src/models/obj_indices.rs @@ -4,8 +4,10 @@ use diesel::prelude::*; -use super::objects::{StoredDeletedObject, StoredObject}; -use crate::schema::objects_version; +use crate::{ + schema::objects_version, + types::{IndexedDeletedObject, IndexedObject}, +}; /// Model types related to tables that support efficient execution of queries on /// the `objects`, `objects_history` and `objects_snapshot` tables. @@ -18,22 +20,22 @@ pub struct StoredObjectVersion { pub cp_sequence_number: i64, } -impl From<&StoredObject> for StoredObjectVersion { - fn from(o: &StoredObject) -> Self { +impl From<&IndexedObject> for StoredObjectVersion { + fn from(o: &IndexedObject) -> Self { Self { - object_id: o.object_id.clone(), - object_version: o.object_version, - cp_sequence_number: o.checkpoint_sequence_number, + object_id: o.object.id().to_vec(), + object_version: o.object.version().value() as i64, + cp_sequence_number: o.checkpoint_sequence_number as i64, } } } -impl From<&StoredDeletedObject> for StoredObjectVersion { - fn from(o: &StoredDeletedObject) -> Self { +impl From<&IndexedDeletedObject> for StoredObjectVersion { + fn from(o: &IndexedDeletedObject) -> Self { Self { - object_id: o.object_id.clone(), - object_version: o.object_version, - cp_sequence_number: o.checkpoint_sequence_number, + object_id: o.object_id.to_vec(), + object_version: o.object_version as i64, + cp_sequence_number: o.checkpoint_sequence_number as i64, } } } diff --git a/crates/iota-indexer/src/store/indexer_store.rs b/crates/iota-indexer/src/store/indexer_store.rs index b41b250f28e..f94a8628db9 100644 --- a/crates/iota-indexer/src/store/indexer_store.rs +++ b/crates/iota-indexer/src/store/indexer_store.rs @@ -11,6 +11,7 @@ use crate::{ handlers::{EpochToCommit, TransactionObjectChangesToCommit}, models::{ display::StoredDisplay, + obj_indices::StoredObjectVersion, objects::{StoredDeletedObject, StoredObject}, }, types::{ @@ -53,6 +54,11 @@ pub trait IndexerStore: Any + Clone + Sync + Send + 'static { object_changes: Vec, ) -> Result<(), IndexerError>; + async fn persist_object_versions( + &self, + object_versions: Vec, + ) -> Result<(), IndexerError>; + // persist objects snapshot with object changes during backfill async fn backfill_objects_snapshot( &self, diff --git a/crates/iota-indexer/src/store/pg_indexer_store.rs b/crates/iota-indexer/src/store/pg_indexer_store.rs index 55c5e3d9e50..3ad032c4233 100644 --- a/crates/iota-indexer/src/store/pg_indexer_store.rs +++ b/crates/iota-indexer/src/store/pg_indexer_store.rs @@ -530,16 +530,13 @@ impl PgIndexerStore { .checkpoint_db_commit_latency_objects_history_chunks .start_timer(); let mut mutated_objects: Vec = vec![]; - let mut object_versions: Vec = vec![]; let mut deleted_object_ids: Vec = vec![]; for object in objects { match object { ObjectChangeToCommit::MutatedObject(stored_object) => { - object_versions.push(StoredObjectVersion::from(&stored_object)); mutated_objects.push(stored_object.into()); } ObjectChangeToCommit::DeletedObject(stored_deleted_object) => { - object_versions.push(StoredObjectVersion::from(&stored_deleted_object)); deleted_object_ids.push(stored_deleted_object.into()); } } @@ -558,11 +555,6 @@ impl PgIndexerStore { ); } - for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) - { - insert_or_ignore_into!(objects_version::table, object_version_chunk, conn); - } - for deleted_objects_chunk in deleted_object_ids.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { @@ -586,6 +578,32 @@ impl PgIndexerStore { }) } + fn persist_object_version_chunk( + &self, + object_versions: Vec, + ) -> Result<(), IndexerError> { + transactional_blocking_with_retry!( + &self.blocking_cp, + |conn| { + for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + insert_or_ignore_into!(objects_version::table, object_version_chunk, conn); + } + Ok::<(), IndexerError>(()) + }, + PG_DB_COMMIT_SLEEP_DURATION + ) + .tap_ok(|_| { + info!( + "Persisted {} chunked object versions", + object_versions.len(), + ); + }) + .tap_err(|e| { + tracing::error!("Failed to persist object version chunk with error: {}", e); + }) + } + fn update_objects_snapshot(&self, start_cp: u64, end_cp: u64) -> Result<(), IndexerError> { let work_mem_gb = std::env::var("INDEXER_PG_WORK_MEM") .unwrap_or_else(|_e| "16".to_string()) @@ -1787,6 +1805,39 @@ impl IndexerStore for PgIndexerStore { Ok(()) } + async fn persist_object_versions( + &self, + object_versions: Vec, + ) -> Result<(), IndexerError> { + if object_versions.is_empty() { + return Ok(()); + } + let object_versions_count = object_versions.len(); + + let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size); + let futures = chunks + .into_iter() + .map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c))) + .collect::>(); + + futures::future::try_join_all(futures) + .await + .map_err(|e| { + tracing::error!("Failed to join persist_object_version_chunk futures: {}", e); + IndexerError::from(e) + })? + .into_iter() + .collect::, _>>() + .map_err(|e| { + IndexerError::PostgresWrite(format!( + "Failed to persist all object version chunks: {:?}", + e + )) + })?; + info!("Persisted {} objects history", object_versions_count); + Ok(()) + } + async fn update_objects_snapshot( &self, start_cp: u64,