feat(db): use a materialized view for ledger updates (#698)
* Use a materialized view for ledger updates.

* Add instrumentation

* Use details.address

* Add test for ledger updates and fix a bug with the milestone index.

* Use spent milestone timestamp if it exists

* Add test case for ledger updates by address. Fix indexer _id bug.
Alexandcoats authored Sep 20, 2022
1 parent 8b10d2a commit 493ab8e
Showing 8 changed files with 285 additions and 110 deletions.
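
A note on the approach: MongoDB has no automatically refreshed materialized views, so the "view" in this commit is an ordinary collection that aggregation pipelines write into through a terminal $merge stage. Below is a minimal, self-contained sketch of the pattern, using toy database and collection names and the mongodb 2.x Rust driver — not the crate's actual wrapper API; the real pipelines appear in src/db/collections/outputs/mod.rs further down.

    use mongodb::{bson::doc, error::Result, Client};

    /// Rebuilds the "dst" view collection from the "src" source collection.
    async fn refresh_view(client: &Client) -> Result<()> {
        let src = client
            .database("example") // assumed name
            .collection::<mongodb::bson::Document>("src"); // assumed name
        src.aggregate(
            [
                // Shape each source document like a view row...
                doc! { "$project": { "value": "$details.value" } },
                // ...then upsert by _id into the view; rows that already
                // exist are kept, so re-running the refresh is idempotent.
                doc! { "$merge": { "into": "dst", "whenMatched": "keepExisting" } },
            ],
            None,
        )
        .await?; // $merge writes server-side; the returned cursor is empty
        Ok(())
    }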
4 changes: 2 additions & 2 deletions src/bin/inx-chronicle/api/stardust/explorer/routes.rs
@@ -51,7 +51,7 @@ async fn ledger_updates_by_address(
 
     let mut record_stream = database
         .collection::<LedgerUpdateCollection>()
-        .stream_ledger_updates_by_address(
+        .get_ledger_updates_by_address(
             &address_dto,
             // Get one extra record so that we can create the cursor.
             page_size + 1,
@@ -99,7 +99,7 @@ async fn ledger_updates_by_milestone(
 
     let mut record_stream = database
         .collection::<LedgerUpdateCollection>()
-        .stream_ledger_updates_by_milestone(milestone_index, page_size + 1, cursor)
+        .get_ledger_updates_by_milestone(milestone_index, page_size + 1, cursor)
         .await?;
 
     // Take all of the requested records first
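
The "one extra record" comment above is the usual keyset-pagination trick: ask for page_size + 1 records so that the extra record, if present, proves another page exists and supplies the next cursor. A hedged, self-contained sketch of the idea (generic helper, not the handler's actual code):

    /// Splits a fetch of `page_size + 1` records into the page itself and
    /// the extra record used to build the next-page cursor, if any.
    fn split_page<T>(mut records: Vec<T>, page_size: usize) -> (Vec<T>, Option<T>) {
        let cursor_record = if records.len() > page_size {
            records.pop() // the one extra record
        } else {
            None // fewer than page_size + 1 results: this is the last page
        };
        (records, cursor_record)
    }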
49 changes: 16 additions & 33 deletions src/bin/inx-chronicle/stardust_inx/mod.rs
@@ -11,8 +11,7 @@ use bee_inx::client::Inx;
 use chronicle::{
     db::{
         collections::{
-            BlockCollection, LedgerUpdateCollection, MilestoneCollection, OutputCollection, ProtocolUpdateCollection,
-            TreasuryCollection,
+            BlockCollection, MilestoneCollection, OutputCollection, ProtocolUpdateCollection, TreasuryCollection,
         },
         MongoDb,
     },
@@ -24,7 +23,6 @@ use chronicle::{
     },
 };
 use futures::{StreamExt, TryStreamExt};
-use tokio::try_join;
 use tracing::{debug, info, instrument, trace_span, warn, Instrument};
 
 use self::{chunks::ChunksExt, stream::LedgerUpdateStream};
@@ -187,6 +185,8 @@ impl Actor for InxWorker {
             task.await.unwrap()?;
         }
 
+        self.db.collection::<OutputCollection>().create_ledger_updates().await?;
+
         info!("Inserted {} unspent outputs.", count);
 
         info!(
@@ -256,6 +256,10 @@ impl HandleEvent<Result<LedgerUpdateRecord, InxError>> for InxWorker {
 
         insert_unspent_outputs(&self.db, ledger_update.created).await?;
         update_spent_outputs(&self.db, ledger_update.consumed).await?;
+        self.db
+            .collection::<OutputCollection>()
+            .merge_into_ledger_updates(ledger_update.milestone_index)
+            .await?;
 
         handle_cone_stream(&self.db, inx, ledger_update.milestone_index).await?;
         handle_protocol_params(&self.db, inx, ledger_update.milestone_index).await?;
@@ -274,43 +278,22 @@ impl HandleEvent<Result<LedgerUpdateRecord, InxError>> for InxWorker {
 #[instrument(skip_all, fields(num = outputs.len()), level = "trace")]
 async fn insert_unspent_outputs(db: &MongoDb, outputs: Vec<LedgerOutput>) -> Result<(), InxError> {
     let output_collection = db.collection::<OutputCollection>();
-    let ledger_collection = db.collection::<LedgerUpdateCollection>();
-    try_join! {
-        async {
-            for batch in &outputs.iter().chunks(INSERT_BATCH_SIZE) {
-                output_collection.insert_unspent_outputs(batch).await?;
-            }
-            Result::<_, InxError>::Ok(())
-        },
-        async {
-            for batch in &outputs.iter().chunks(INSERT_BATCH_SIZE) {
-                ledger_collection.insert_unspent_ledger_updates(batch).await?;
-            }
-            Ok(())
-        }
-    }?;
+
+    for batch in &outputs.iter().chunks(INSERT_BATCH_SIZE) {
+        output_collection.insert_unspent_outputs(batch).await?;
+    }
+
     Ok(())
 }
 
 #[instrument(skip_all, fields(num = outputs.len()), level = "trace")]
 async fn update_spent_outputs(db: &MongoDb, outputs: Vec<LedgerSpent>) -> Result<(), InxError> {
     let output_collection = db.collection::<OutputCollection>();
-    let ledger_collection = db.collection::<LedgerUpdateCollection>();
-    try_join! {
-        async {
-            for batch in &outputs.iter().chunks(INSERT_BATCH_SIZE) {
-                output_collection.update_spent_outputs(batch).await?;
-            }
-            Ok(())
-        },
-        async {
-            for batch in &outputs.iter().chunks(INSERT_BATCH_SIZE) {
-                ledger_collection.insert_spent_ledger_updates(batch).await?;
-            }
-            Ok(())
-        }
+
+    for batch in &outputs.iter().chunks(INSERT_BATCH_SIZE) {
+        output_collection.update_spent_outputs(batch).await?;
     }
-    .and(Ok(()))
+    Ok(())
 }
 
 #[instrument(skip_all, level = "trace")]
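
With the parallel ledger-update writes gone, insert_unspent_outputs and update_spent_outputs now touch only the outputs collection (still in INSERT_BATCH_SIZE chunks), and the view is refreshed afterwards by a single server-side merge per milestone — which is also why the tokio::try_join import could be dropped. The batching follows the itertools chunks pattern (the crate's own ChunksExt adapter looks equivalent); a standalone illustration with an assumed batch size:

    use itertools::Itertools;

    fn insert_in_batches(items: &[u64]) {
        const INSERT_BATCH_SIZE: usize = 1_000; // assumed; the real constant lives elsewhere
        for batch in &items.iter().chunks(INSERT_BATCH_SIZE) {
            // Each `batch` is a lazy sub-iterator of at most INSERT_BATCH_SIZE items.
            println!("would insert {} documents", batch.count());
        }
    }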
77 changes: 7 additions & 70 deletions src/db/collections/ledger_update.rs
@@ -5,20 +5,19 @@ use futures::{Stream, TryStreamExt};
 use mongodb::{
     bson::{doc, Document},
     error::Error,
-    options::{FindOptions, IndexOptions, InsertManyOptions},
+    options::{FindOptions, IndexOptions},
     IndexModel,
 };
 use serde::{Deserialize, Serialize};
-use tracing::instrument;
 
 use super::SortOrder;
 use crate::{
     db::{
-        mongodb::{InsertIgnoreDuplicatesExt, MongoDbCollection, MongoDbCollectionExt},
+        mongodb::{MongoDbCollection, MongoDbCollectionExt},
         MongoDb,
     },
     types::{
-        ledger::{LedgerOutput, LedgerSpent, MilestoneIndexTimestamp},
+        ledger::MilestoneIndexTimestamp,
         stardust::{
             block::{output::OutputId, Address},
             milestone::MilestoneTimestamp,
@@ -61,15 +60,15 @@ impl MongoDbCollection for LedgerUpdateCollection {
     }
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 #[allow(missing_docs)]
 pub struct LedgerUpdateByAddressRecord {
     pub at: MilestoneIndexTimestamp,
     pub output_id: OutputId,
     pub is_spent: bool,
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 #[allow(missing_docs)]
 pub struct LedgerUpdateByMilestoneRecord {
     pub address: Address,
@@ -106,70 +105,8 @@ impl LedgerUpdateCollection {
         Ok(())
     }
 
-    /// Inserts [`LedgerSpent`] updates.
-    #[instrument(skip_all, err, level = "trace")]
-    pub async fn insert_spent_ledger_updates<'a, I>(&self, outputs: I) -> Result<(), Error>
-    where
-        I: IntoIterator<Item = &'a LedgerSpent>,
-        I::IntoIter: Send + Sync,
-    {
-        let ledger_updates = outputs.into_iter().filter_map(
-            |LedgerSpent {
-                 output: LedgerOutput { output_id, output, .. },
-                 spent_metadata,
-             }| {
-                // Ledger updates
-                output.owning_address().map(|&address| LedgerUpdateDocument {
-                    _id: Id {
-                        milestone_index: spent_metadata.spent.milestone_index,
-                        output_id: *output_id,
-                        is_spent: true,
-                    },
-                    address,
-                    milestone_timestamp: spent_metadata.spent.milestone_timestamp,
-                })
-            },
-        );
-        self.insert_many_ignore_duplicates(ledger_updates, InsertManyOptions::builder().ordered(false).build())
-            .await?;
-
-        Ok(())
-    }
-
-    /// Inserts unspent [`LedgerOutput`] updates.
-    #[instrument(skip_all, err, level = "trace")]
-    pub async fn insert_unspent_ledger_updates<'a, I>(&self, outputs: I) -> Result<(), Error>
-    where
-        I: IntoIterator<Item = &'a LedgerOutput>,
-        I::IntoIter: Send + Sync,
-    {
-        let ledger_updates = outputs.into_iter().filter_map(
-            |LedgerOutput {
-                 output_id,
-                 booked,
-                 output,
-                 ..
-             }| {
-                // Ledger updates
-                output.owning_address().map(|&address| LedgerUpdateDocument {
-                    _id: Id {
-                        milestone_index: booked.milestone_index,
-                        output_id: *output_id,
-                        is_spent: false,
-                    },
-                    address,
-                    milestone_timestamp: booked.milestone_timestamp,
-                })
-            },
-        );
-        self.insert_many_ignore_duplicates(ledger_updates, InsertManyOptions::builder().ordered(false).build())
-            .await?;
-
-        Ok(())
-    }
-
     /// Streams updates to the ledger for a given address.
-    pub async fn stream_ledger_updates_by_address(
+    pub async fn get_ledger_updates_by_address(
         &self,
         address: &Address,
         page_size: usize,
@@ -216,7 +153,7 @@
     }
 
     /// Streams updates to the ledger for a given milestone index (sorted by [`OutputId`]).
-    pub async fn stream_ledger_updates_by_milestone(
+    pub async fn get_ledger_updates_by_milestone(
         &self,
         milestone_index: MilestoneIndex,
         page_size: usize,
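
The read path is unchanged: the hand-written inserters deleted here produced exactly the document shape that the new $merge pipelines in outputs/mod.rs emit — a composite _id embedding the (milestone_index, output_id, is_spent) triple plus address and milestone_timestamp. An illustrative BSON sketch of one view row (placeholder values; the real fields are typed OutputId, Address, and MilestoneTimestamp values):

    use mongodb::bson::doc;

    fn example_row() -> mongodb::bson::Document {
        doc! {
            "_id": {
                "milestone_index": 42,
                "output_id": "0x…",  // placeholder for the binary output id
                "is_spent": false,
            },
            "address": "0x…",        // placeholder for the typed address
            "milestone_timestamp": 1_600_000_000,
        }
    }

The new PartialEq/Eq derives on the two record types are presumably what lets the tests mentioned in the commit message compare whole records with assert_eq!.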
2 changes: 1 addition & 1 deletion src/db/collections/outputs/indexer/mod.rs
@@ -164,7 +164,7 @@ impl OutputCollection {
             doc! { "$sort": sort },
             doc! { "$limit": page_size as i64 },
             doc! { "$replaceWith": {
-                "output_id": "$metadata.output_id",
+                "output_id": "$_id",
                 "booked_index": "$metadata.booked.milestone_index"
             } },
         ],
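
This one-line change is the "indexer _id bug" from the commit message: output documents are keyed by their output id (note that the new view pipelines likewise project "output_id": "$_id"), so the page items must be read from $_id; the old $metadata.output_id path evidently no longer matched the document layout.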
69 changes: 69 additions & 0 deletions src/db/collections/outputs/mod.rs
@@ -21,6 +21,7 @@ pub use self::indexer::{
 use super::OutputKind;
 use crate::{
     db::{
+        collections::LedgerUpdateCollection,
         mongodb::{InsertIgnoreDuplicatesExt, MongoDbCollection, MongoDbCollectionExt},
         MongoDb,
     },
@@ -213,6 +214,74 @@ impl OutputCollection {
         Ok(())
     }
 
+    /// Create the initial ledger updates materialized view by pulling all output data.
+    #[instrument(skip_all, err, level = "trace")]
+    pub async fn create_ledger_updates(&self) -> Result<(), Error> {
+        self.aggregate::<OutputDocument>(
+            vec![
+                doc! { "$match": { "details.address": { "$ne": null } } },
+                doc! { "$project": {
+                    "_id": {
+                        "milestone_index": { "$ifNull": [
+                            "$metadata.spent_metadata.spent.milestone_index",
+                            "$metadata.booked.milestone_index"
+                        ] },
+                        "output_id": "$_id",
+                        "is_spent": { "$ne": [ "$metadata.spent_metadata", null ] },
+                    },
+                    "address": "$details.address",
+                    "milestone_timestamp": { "$ifNull": [
+                        "$metadata.spent_metadata.spent.milestone_timestamp",
+                        "$metadata.booked.milestone_timestamp"
+                    ] },
+                } },
+                doc! { "$merge": { "into": LedgerUpdateCollection::NAME, "whenMatched": "keepExisting" } },
+            ],
+            None,
+        )
+        .await?
+        .try_next()
+        .await?;
+        Ok(())
+    }
+
+    /// Merges the outputs table into the ledger updates materialized view.
+    #[instrument(skip_all, err, level = "trace")]
+    pub async fn merge_into_ledger_updates(&self, milestone_index: MilestoneIndex) -> Result<(), Error> {
+        self.aggregate::<OutputDocument>(
+            vec![
+                doc! { "$match": {
+                    "details.address": { "$ne": null },
+                    "$or": [
+                        { "metadata.booked.milestone_index": milestone_index },
+                        { "metadata.spent_metadata.spent.milestone_index": milestone_index },
+                    ]
+                } },
+                doc! { "$project": {
+                    "_id": {
+                        "milestone_index": { "$ifNull": [
+                            "$metadata.spent_metadata.spent.milestone_index",
+                            "$metadata.booked.milestone_index"
+                        ] },
+                        "output_id": "$_id",
+                        "is_spent": { "$ne": [ "$metadata.spent_metadata", null ] },
+                    },
+                    "address": "$details.address",
+                    "milestone_timestamp": { "$ifNull": [
+                        "$metadata.spent_metadata.spent.milestone_timestamp",
+                        "$metadata.booked.milestone_timestamp"
+                    ] },
+                } },
+                doc! { "$merge": { "into": LedgerUpdateCollection::NAME, "whenMatched": "keepExisting" } },
+            ],
+            None,
+        )
+        .await?
+        .try_next()
+        .await?;
+        Ok(())
+    }
+
     /// Get an [`Output`] by [`OutputId`].
     pub async fn get_output(&self, output_id: &OutputId) -> Result<Option<Output>, Error> {
         self.aggregate(
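
Both pipelines share one projection, which encodes the "use spent milestone timestamp if it exists" rule from the commit message: $ifNull prefers the spent metadata, so a spent output yields a row keyed to its spending milestone (is_spent: true), while an unspent output is keyed to its booking milestone (is_spent: false). And because the _id carries the whole (milestone_index, output_id, is_spent) triple while $merge runs with "whenMatched": "keepExisting", re-running either refresh is a no-op for rows that already exist. A standalone demonstration of that idempotency with toy collections (assumed names, mongodb 2.x driver):

    use mongodb::{bson::doc, error::Result, Client};

    async fn demo(client: &Client) -> Result<()> {
        let db = client.database("example"); // assumed name
        let src = db.collection::<mongodb::bson::Document>("src");
        src.insert_one(doc! { "_id": 1, "value": "first" }, None).await?;
        // Running the same merge twice leaves "dst" with exactly one row:
        // the second pass matches on _id and keeps the existing document.
        for _ in 0..2 {
            src.aggregate(
                [doc! { "$merge": { "into": "dst", "whenMatched": "keepExisting" } }],
                None,
            )
            .await?;
        }
        Ok(())
    }

The trailing .try_next() on the wrapper's aggregate calls is presumably there just to drive the command and surface any error; a pipeline ending in $merge returns no documents.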
2 changes: 1 addition & 1 deletion src/types/stardust/block/output/mod.rs
@@ -35,7 +35,7 @@ pub struct OutputAmount(#[serde(with = "crate::types::util::stringify")] pub u64
 
 pub type OutputIndex = u16;
 
-#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
 pub struct OutputId {
     pub transaction_id: TransactionId,
     pub index: OutputIndex,
2 changes: 1 addition & 1 deletion src/types/stardust/block/payload/transaction.rs
@@ -12,7 +12,7 @@ use crate::types::{
     util::bytify,
 };
 
-#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
 #[serde(transparent)]
 pub struct TransactionId(#[serde(with = "bytify")] pub [u8; Self::LENGTH]);

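
The Hash derives on OutputId and TransactionId round out the test-support changes: with Eq + Hash, ids can be collected into hash-based containers — presumably useful in the new tests. A trivial, self-contained illustration (toy type, not from this diff):

    use std::collections::HashSet;

    #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
    struct Id(u16);

    fn main() {
        let mut seen = HashSet::new();
        assert!(seen.insert(Id(7)));
        assert!(!seen.insert(Id(7))); // duplicate detected via Eq + Hash
    }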
(1 of the 8 changed files did not load and is not shown above.)
