diff --git a/bin/inx-chronicle/src/api/stardust/core/routes.rs b/bin/inx-chronicle/src/api/stardust/core/routes.rs index 32d773376..256d69ca2 100644 --- a/bin/inx-chronicle/src/api/stardust/core/routes.rs +++ b/bin/inx-chronicle/src/api/stardust/core/routes.rs @@ -26,13 +26,12 @@ use chronicle::{ db::MongoDb, types::{ ledger::{OutputMetadata, OutputWithMetadata}, - stardust::block::{BlockId, MilestoneId, MilestoneOption, OutputId, TransactionId}, + stardust::block::{BlockId, MilestoneId, OutputId, TransactionId}, tangle::MilestoneIndex, }, }; use futures::TryStreamExt; use lazy_static::lazy_static; -use mongodb::bson; use packable::PackableExt; use super::responses::BlockChildrenResponse; @@ -226,38 +225,38 @@ async fn transaction_included_block( } async fn receipts(database: Extension) -> ApiResult { - let mut milestone_options = database.get_milestone_options().await?; + let mut receipts_at = database.stream_all_receipts().await?; let mut receipts = Vec::new(); - while let Some(doc) = milestone_options.try_next().await? { - // TODO: unwrap - let (index, opt): (MilestoneIndex, MilestoneOption) = bson::from_document(doc).unwrap(); - let opt: &bee_block_stardust::payload::milestone::MilestoneOption = &opt.try_into().unwrap(); - let opt: MilestoneOptionDto = opt.into(); + while let Some((receipt, at)) = receipts_at.try_next().await? { + let receipt: &bee_block_stardust::payload::milestone::MilestoneOption = &receipt.try_into()?; + let receipt: bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto = receipt.into(); - if let MilestoneOptionDto::Receipt(receipt) = opt { + if let MilestoneOptionDto::Receipt(receipt) = receipt { receipts.push(ReceiptDto { receipt, - milestone_index: *index, + milestone_index: *at, }); + } else { + unreachable!("the query only returns receipt milestone options"); } } Ok(ReceiptsResponse { receipts }) } async fn receipts_migrated_at(database: Extension, Path(index): Path) -> ApiResult { - let mut milestone_options = database.get_milestone_options_migrated_at(index.into()).await?; + let mut receipts_at = database.stream_receipts_migrated_at(index.into()).await?; let mut receipts = Vec::new(); - while let Some(doc) = milestone_options.try_next().await? { - // TODO: unwrap - let (index, opt): (MilestoneIndex, MilestoneOption) = bson::from_document(doc).unwrap(); - let opt: &bee_block_stardust::payload::milestone::MilestoneOption = &opt.try_into().unwrap(); - let opt: MilestoneOptionDto = opt.into(); + while let Some((receipt, at)) = receipts_at.try_next().await? { + let receipt: &bee_block_stardust::payload::milestone::MilestoneOption = &receipt.try_into()?; + let receipt: bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto = receipt.into(); - if let MilestoneOptionDto::Receipt(receipt) = opt { + if let MilestoneOptionDto::Receipt(receipt) = receipt { receipts.push(ReceiptDto { receipt, - milestone_index: *index, + milestone_index: *at, }); + } else { + unreachable!("the query only returns receipt milestone options"); } } Ok(ReceiptsResponse { receipts }) diff --git a/src/db/collections/milestone.rs b/src/db/collections/milestone.rs index 48b529c69..b63fa28e2 100644 --- a/src/db/collections/milestone.rs +++ b/src/db/collections/milestone.rs @@ -3,10 +3,9 @@ use std::ops::RangeInclusive; -use bee_rest_api_stardust::types::dtos as bee; use futures::{Stream, TryStreamExt}; use mongodb::{ - bson::{self, doc, Document}, + bson::{self, doc}, error::Error, options::{FindOneOptions, FindOptions, IndexOptions, UpdateOptions}, IndexModel, @@ -17,7 +16,7 @@ use crate::{ db::MongoDb, types::{ stardust::{ - block::{MilestoneId, MilestonePayload}, + block::{MilestoneId, MilestoneOption, MilestonePayload}, milestone::MilestoneTimestamp, }, tangle::MilestoneIndex, @@ -127,7 +126,7 @@ impl MongoDb { .aggregate( vec![ doc! { "$match": { "milestone_id": milestone_id } }, - doc! { "$replaceRoot": { "newRoot": "$payload" } }, + doc! { "$replaceWith": "$payload" }, ], None, ) @@ -148,7 +147,7 @@ impl MongoDb { .aggregate( vec![ doc! { "$match": { "milestone_index": index } }, - doc! { "$replaceRoot": { "newRoot": "$payload" } }, + doc! { "$replaceWith": "$payload" }, ], None, ) @@ -335,140 +334,77 @@ impl MongoDb { Ok(sync_data) } - // TODO: use dedicated type `MilestoneOptionsWithIndex` instead of Document - /// Returns a stream of all available receipt milestone options together with their corresponding `MilestoneIndex`. - pub async fn get_milestone_options(&self) -> Result>, Error> { - self.0 - .collection::(MilestoneDocument::COLLECTION) + /// Streams all available receipt milestone options together with their corresponding `MilestoneIndex`. + pub async fn stream_all_receipts( + &self, + ) -> Result>, Error> { + #[derive(Deserialize)] + struct ReceiptAtIndex { + receipt: MilestoneOption, + at: MilestoneIndex, + } + + Ok(self + .0 + .collection::(MilestoneDocument::COLLECTION) .aggregate( vec![ + doc! { "$unwind": "payload.essence.options"}, doc! { "$match": { - "payload.essence.options.receipt.migrated_at": { "$exists": true }, + "options.receipt.migrated_at": { "$exists": true }, + } }, + doc! { "$replaceWith": { + "receipt": "options.receipt" , + "at": "$milestone_index" , } }, - doc! { "$replaceRoot": { "newRoot": { - "milestone_index": "$milestone_index" , - "milestone_options": "$payload.essence.options" , - } } }, + doc! { "$sort": { "at": 1 } }, ], None, ) - .await + .await? + .map_ok(|doc| { + // Panic: we made sure that this is infallible. + let ReceiptAtIndex { receipt, at } = bson::from_document::(doc).unwrap(); + + (receipt, at) + })) } - // TODO: use dedicated type `MilestoneOptionsWithIndex` instead of Document - // TODO: might be better to return a Vec right away instead of a Stream. - /// Returns a stream of all available receipt milestone options belonging to a given migration index. - pub async fn get_milestone_options_migrated_at( + /// Streams all available receipt milestone options together with their corresponding `MilestoneIndex` that were + /// migrated at the given index. + pub async fn stream_receipts_migrated_at( &self, migrated_at: MilestoneIndex, - ) -> Result>, Error> { - self.0 - .collection::(MilestoneDocument::COLLECTION) + ) -> Result>, Error> { + #[derive(Deserialize)] + struct ReceiptAtIndex { + receipt: MilestoneOption, + at: MilestoneIndex, + } + + Ok(self + .0 + .collection::(MilestoneDocument::COLLECTION) .aggregate( vec![ + doc! { "$unwind": "payload.essence.options"}, doc! { "$match": { - "payload.essence.options.receipt.migrated_at": migrated_at , + "options.receipt.migrated_at": { "$and": [ { "$exists": true }, { "$eq": migrated_at } ] }, + } }, + doc! { "$replaceWith": { + "receipt": "options.receipt" , + "at": "$milestone_index" , } }, - doc! { "$replaceRoot": { "newRoot": { - "milestone_index": "$milestone_index" , - "milestone_options": "$payload.essence.options" , - } } }, + doc! { "$sort": { "at": 1 } }, ], None, ) - .await - } - - async fn milestone_records_sorted_with_receipt( - &self, - ) -> Result>, Error> { - self.0 - .collection::(MilestoneDocument::COLLECTION) - .find( - doc! { "payload.essence.options.receipt.migrated_at": { "$ne": 0} }, - FindOptions::builder().sort(doc! {"milestone_index": 1u32}).build(), - ) - .await - } - - /// Returns all stored receipts. - pub async fn get_receipts(&self) -> Result, Error> { - let mut milestone_records = self.milestone_records_sorted_with_receipt().await?; - let mut receipt_dtos = vec![]; - while let Some(milestone_record) = milestone_records.try_next().await? { - receipt_dtos.extend( - milestone_record - .payload - .essence - .options - .iter() - .cloned() - .filter_map(|o| { - // TODO: fix this uglyness - let o: &bee_block_stardust::payload::milestone::MilestoneOption = &o.try_into().unwrap(); - let o: bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto = o.into(); - if let bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto::Receipt( - receipt, - ) = o - { - Some(bee::ReceiptDto { - receipt, - milestone_index: *milestone_record.milestone_index, - }) - } else { - None - } - }), - ); - } - Ok(receipt_dtos) - } - - /// Returns all stored receipts with the given migration index. - pub async fn get_receipts_migrated_at(&self, migrated_at: MilestoneIndex) -> Result, Error> { - let mut milestone_records = self - .milestone_records_sorted_with_receipt_migrated_at(migrated_at) - .await?; - let mut receipt_dtos = vec![]; - while let Some(milestone_record) = milestone_records.try_next().await? { - receipt_dtos.extend( - milestone_record - .payload - .essence - .options - .iter() - .cloned() - .filter_map(|o| { - // TODO: fix this uglyness - let o: &bee_block_stardust::payload::milestone::MilestoneOption = &o.try_into().unwrap(); - let o: bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto = o.into(); - if let bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto::Receipt( - receipt, - ) = o - { - Some(bee::ReceiptDto { - receipt, - milestone_index: *milestone_record.milestone_index, - }) - } else { - None - } - }), - ); - } - Ok(receipt_dtos) - } + .await? + .map_ok(|doc| { + // Panic: we made sure that this is infallible. + let ReceiptAtIndex { receipt, at } = bson::from_document::(doc).unwrap(); - async fn milestone_records_sorted_with_receipt_migrated_at( - &self, - migrated_at: MilestoneIndex, - ) -> Result>, Error> { - self.0 - .collection::(MilestoneDocument::COLLECTION) - .find( - doc! { "payload.essence.options.receipt.migrated_at": migrated_at }, - FindOptions::builder().sort(doc! {"milestone_index": 1u32}).build(), - ) - .await + (receipt, at) + })) } }