refactor(indexer): Split object version ingestion #5113

Merged · 3 commits · Feb 5, 2025
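This change splits objects_version ingestion out of the object-history write path: the checkpoint handler now derives StoredObjectVersion rows while indexing, the committer batches them per checkpoint like the other collections, and the store writes them through a new persist_object_versions method instead of inside persist_object_history. The sketch below is a rough, simplified illustration of that flow; the struct fields and helper names are stand-ins, not the real indexer types.

// Stand-in types; the real ones live in crates/iota-indexer.
struct StoredObjectVersion {
    object_id: Vec<u8>,
    object_version: i64,
    cp_sequence_number: i64,
}

struct ObjectChange {
    object_id: Vec<u8>,
    object_version: u64,
    checkpoint_sequence_number: u64,
}

struct ObjectHistoryChanges {
    changed_objects: Vec<ObjectChange>,
    deleted_objects: Vec<ObjectChange>,
}

// Handler side: one objects_version row per mutated or deleted object.
fn derive_object_versions(changes: &ObjectHistoryChanges) -> Vec<StoredObjectVersion> {
    changes
        .changed_objects
        .iter()
        .chain(changes.deleted_objects.iter())
        .map(|o| StoredObjectVersion {
            object_id: o.object_id.clone(),
            object_version: o.object_version as i64,
            cp_sequence_number: o.checkpoint_sequence_number as i64,
        })
        .collect()
}

// Committer side: per-checkpoint vectors are flattened into one batch and
// handed to the store independently of the object-history tables.
fn flatten_batches(per_checkpoint: Vec<Vec<StoredObjectVersion>>) -> Vec<StoredObjectVersion> {
    per_checkpoint.into_iter().flatten().collect()
}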
25 changes: 24 additions & 1 deletion crates/iota-indexer/src/handlers/checkpoint_handler.rs
@@ -47,7 +47,7 @@ use crate::{
tx_processor::{EpochEndIndexingObjectStore, IndexingPackageBuffer, TxChangesProcessor},
},
metrics::IndexerMetrics,
models::display::StoredDisplay,
models::{display::StoredDisplay, obj_indices::StoredObjectVersion},
store::{
IndexerStore, PgIndexerStore,
package_resolver::{IndexerStorePackageResolver, InterimPackageResolver},
@@ -266,6 +266,27 @@ impl CheckpointHandler {
}))
}

fn derive_object_versions(
object_history_changes: &TransactionObjectChangesToCommit,
) -> Vec<StoredObjectVersion> {
let mut object_versions = vec![];
for changed_obj in object_history_changes.changed_objects.iter() {
object_versions.push(StoredObjectVersion {
object_id: changed_obj.object.id().to_vec(),
object_version: changed_obj.object.version().value() as i64,
cp_sequence_number: changed_obj.checkpoint_sequence_number as i64,
});
}
for deleted_obj in object_history_changes.deleted_objects.iter() {
object_versions.push(StoredObjectVersion {
object_id: deleted_obj.object_id.to_vec(),
object_version: deleted_obj.object_version as i64,
cp_sequence_number: deleted_obj.checkpoint_sequence_number as i64,
});
}
object_versions
}

async fn index_checkpoint(
state: Arc<PgIndexerStore>,
data: CheckpointData,
@@ -284,6 +305,7 @@
Self::index_objects(data.clone(), &metrics, package_resolver.clone()).await?;
let object_history_changes: TransactionObjectChangesToCommit =
Self::index_objects_history(data.clone(), package_resolver.clone()).await?;
let object_versions = Self::derive_object_versions(&object_history_changes);

let (checkpoint, db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = {
let CheckpointData {
Expand Down Expand Up @@ -339,6 +361,7 @@ impl CheckpointHandler {
display_updates: db_displays,
object_changes,
object_history_changes,
object_versions,
packages,
epoch,
})
8 changes: 8 additions & 0 deletions crates/iota-indexer/src/handlers/committer.rs
@@ -89,6 +89,7 @@ async fn commit_checkpoints<S>(
let mut display_updates_batch = BTreeMap::new();
let mut object_changes_batch = vec![];
let mut object_history_changes_batch = vec![];
let mut object_versions_batch = vec![];
let mut packages_batch = vec![];

for indexed_checkpoint in indexed_checkpoint_batch {
@@ -101,6 +102,7 @@
display_updates,
object_changes,
object_history_changes,
object_versions,
packages,
epoch: _,
} = indexed_checkpoint;
@@ -112,6 +114,7 @@
display_updates_batch.extend(display_updates.into_iter());
object_changes_batch.push(object_changes);
object_history_changes_batch.push(object_history_changes);
object_versions_batch.push(object_versions);
packages_batch.push(packages);
}

@@ -126,6 +129,10 @@
.into_iter()
.flatten()
.collect::<Vec<_>>();
let object_versions_batch = object_versions_batch
.into_iter()
.flatten()
.collect::<Vec<_>>();
let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
let checkpoint_num = checkpoint_batch.len();
let tx_count = tx_batch.len();
@@ -141,6 +148,7 @@
state.persist_packages(packages_batch),
state.persist_objects(object_changes_batch.clone()),
state.persist_object_history(object_history_changes_batch.clone()),
state.persist_object_versions(object_versions_batch.clone()),
];
if let Some(epoch_data) = epoch.clone() {
persist_tasks.push(state.persist_epoch(epoch_data));
3 changes: 2 additions & 1 deletion crates/iota-indexer/src/handlers/mod.rs
@@ -5,7 +5,7 @@
use std::collections::BTreeMap;

use crate::{
models::display::StoredDisplay,
models::{display::StoredDisplay, obj_indices::StoredObjectVersion},
types::{
EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent,
IndexedObject, IndexedPackage, IndexedTransaction, TxIndex,
@@ -28,6 +28,7 @@ pub struct CheckpointDataToCommit {
pub display_updates: BTreeMap<String, StoredDisplay>,
pub object_changes: TransactionObjectChangesToCommit,
pub object_history_changes: TransactionObjectChangesToCommit,
pub object_versions: Vec<StoredObjectVersion>,
pub packages: Vec<IndexedPackage>,
pub epoch: Option<EpochToCommit>,
}
21 changes: 0 additions & 21 deletions crates/iota-indexer/src/models/obj_indices.rs
@@ -4,7 +4,6 @@

use diesel::prelude::*;

use super::objects::{StoredDeletedObject, StoredObject};
use crate::schema::objects_version;

/// Model types related to tables that support efficient execution of queries on
@@ -17,23 +16,3 @@ pub struct StoredObjectVersion {
pub object_version: i64,
pub cp_sequence_number: i64,
}

impl From<&StoredObject> for StoredObjectVersion {
fn from(o: &StoredObject) -> Self {
Self {
object_id: o.object_id.clone(),
object_version: o.object_version,
cp_sequence_number: o.checkpoint_sequence_number,
}
}
}

impl From<&StoredDeletedObject> for StoredObjectVersion {
fn from(o: &StoredDeletedObject) -> Self {
Self {
object_id: o.object_id.clone(),
object_version: o.object_version,
cp_sequence_number: o.checkpoint_sequence_number,
}
}
}
6 changes: 6 additions & 0 deletions crates/iota-indexer/src/store/indexer_store.rs
@@ -11,6 +11,7 @@ use crate::{
handlers::{EpochToCommit, TransactionObjectChangesToCommit},
models::{
display::StoredDisplay,
obj_indices::StoredObjectVersion,
objects::{StoredDeletedObject, StoredObject},
},
types::{
@@ -53,6 +54,11 @@ pub trait IndexerStore: Any + Clone + Sync + Send + 'static {
object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Result<(), IndexerError>;

async fn persist_object_versions(
&self,
object_versions: Vec<StoredObjectVersion>,
) -> Result<(), IndexerError>;

// persist objects snapshot with object changes during backfill
async fn backfill_objects_snapshot(
&self,
69 changes: 61 additions & 8 deletions crates/iota-indexer/src/store/pg_indexer_store.rs
@@ -530,16 +530,13 @@ impl PgIndexerStore {
.checkpoint_db_commit_latency_objects_history_chunks
.start_timer();
let mut mutated_objects: Vec<StoredHistoryObject> = vec![];
let mut object_versions: Vec<StoredObjectVersion> = vec![];
let mut deleted_object_ids: Vec<StoredDeletedHistoryObject> = vec![];
for object in objects {
match object {
ObjectChangeToCommit::MutatedObject(stored_object) => {
object_versions.push(StoredObjectVersion::from(&stored_object));
mutated_objects.push(stored_object.into());
}
ObjectChangeToCommit::DeletedObject(stored_deleted_object) => {
object_versions.push(StoredObjectVersion::from(&stored_deleted_object));
deleted_object_ids.push(stored_deleted_object.into());
}
}
@@ -558,11 +555,6 @@
);
}

for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
}

for deleted_objects_chunk in
deleted_object_ids.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
@@ -586,6 +578,32 @@
})
}

fn persist_object_version_chunk(
&self,
object_versions: Vec<StoredObjectVersion>,
) -> Result<(), IndexerError> {
transactional_blocking_with_retry!(
&self.blocking_cp,
|conn| {
for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
insert_or_ignore_into!(objects_version::table, object_version_chunk, conn);
}
Ok::<(), IndexerError>(())
},
PG_DB_COMMIT_SLEEP_DURATION
)
.tap_ok(|_| {
info!(
"Persisted {} chunked object versions",
object_versions.len(),
);
})
.tap_err(|e| {
tracing::error!("Failed to persist object version chunk with error: {}", e);
})
}

fn update_objects_snapshot(&self, start_cp: u64, end_cp: u64) -> Result<(), IndexerError> {
let work_mem_gb = std::env::var("INDEXER_PG_WORK_MEM")
.unwrap_or_else(|_e| "16".to_string())
@@ -1787,6 +1805,41 @@ impl IndexerStore for PgIndexerStore {
Ok(())
}

async fn persist_object_versions(
&self,
object_versions: Vec<StoredObjectVersion>,
) -> Result<(), IndexerError> {
if object_versions.is_empty() {
return Ok(());
}
let object_versions_count = object_versions.len();

let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
let futures = chunks
.into_iter()
.map(|c| self.spawn_blocking_task(move |this| this.persist_object_version_chunk(c)))
.collect::<Vec<_>>();

futures::future::join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
tracing::error!("Failed to join persist_object_version_chunk futures: {}", e);
IndexerError::from(e)
})?
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
IndexerError::PostgresWrite(format!(
"Failed to persist all object version chunks: {:?}",
e
))
})?;
info!("Persisted {} objects history", object_versions_count);
Ok(())
}

async fn update_objects_snapshot(
&self,
start_cp: u64,
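For reference, persist_object_versions above follows the indexer's usual chunk-and-spawn pattern: split the rows into chunks, write each chunk on the blocking pool, then join the tasks and fail if any write failed. A minimal, self-contained sketch of that pattern, assuming a plain String error type and a synchronous write_chunk closure standing in for persist_object_version_chunk:

use tokio::task::JoinHandle;

async fn persist_in_chunks<T, F>(
    mut rows: Vec<T>,
    chunk_size: usize,
    write_chunk: F,
) -> Result<(), String>
where
    T: Send + 'static,
    F: Fn(Vec<T>) -> Result<(), String> + Clone + Send + 'static,
{
    assert!(chunk_size > 0);
    if rows.is_empty() {
        return Ok(());
    }
    // Split the rows into chunks of at most `chunk_size` elements.
    let mut chunks = Vec::new();
    while !rows.is_empty() {
        let tail = rows.split_off(rows.len().min(chunk_size));
        chunks.push(std::mem::replace(&mut rows, tail));
    }
    // Write each chunk on the blocking thread pool, one task per chunk.
    let handles: Vec<JoinHandle<Result<(), String>>> = chunks
        .into_iter()
        .map(|chunk| {
            let write = write_chunk.clone();
            tokio::task::spawn_blocking(move || write(chunk))
        })
        .collect();
    // Join all writers; a panicked task or a failed write fails the call.
    for handle in handles {
        handle
            .await
            .map_err(|e| format!("blocking task panicked: {e}"))??;
    }
    Ok(())
}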