diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs
index 4d3d6384a86..762cbed1b48 100644
--- a/mithril-aggregator/src/dependency_injection/builder.rs
+++ b/mithril-aggregator/src/dependency_injection/builder.rs
@@ -24,7 +24,7 @@ use mithril_common::{
         CardanoImmutableDigester, DumbImmutableFileObserver, ImmutableDigester,
         ImmutableFileObserver, ImmutableFileSystemObserver,
     },
-    entities::{CertificatePending, CompressionAlgorithm, Epoch},
+    entities::{CardanoDbBeacon, CertificatePending, CompressionAlgorithm, Epoch},
     era::{
         adapters::{EraReaderAdapterBuilder, EraReaderDummyAdapter},
         EraChecker, EraMarker, EraReader, EraReaderAdapter, SupportedEra,
@@ -1369,6 +1369,14 @@ impl DependenciesBuilder {
         let block_range_root_retriever = self.get_transaction_repository().await?;
         let service =
             MithrilProverService::new(transaction_retriever, block_range_root_retriever);

+        service
+            .compute_cache(&CardanoDbBeacon::new(
+                self.configuration.get_network()?.to_string(),
+                0,
+                u64::MAX,
+            ))
+            .await?;
+
         Ok(Arc::new(service))
     }
diff --git a/mithril-aggregator/src/services/prover.rs b/mithril-aggregator/src/services/prover.rs
index cb2928d1903..3f9bb687223 100644
--- a/mithril-aggregator/src/services/prover.rs
+++ b/mithril-aggregator/src/services/prover.rs
@@ -3,8 +3,8 @@ use rayon::prelude::*;
 use std::{
     collections::{BTreeMap, BTreeSet, HashMap},
     sync::Arc,
+    time::Duration,
 };
-use tokio::sync::Mutex;

 use mithril_common::{
     crypto_helper::{MKMap, MKMapNode, MKTree},
@@ -12,6 +12,7 @@ use mithril_common::{
         BlockRange, CardanoDbBeacon, CardanoTransaction, CardanoTransactionsSetProof,
         TransactionHash,
     },
+    resource_pool::ResourcePool,
     signable_builder::BlockRangeRootRetriever,
     StdResult,
 };
@@ -28,14 +29,10 @@ pub trait ProverService: Sync + Send {
     ) -> StdResult<Vec<CardanoTransactionsSetProof>>;

     /// Compute the cache
-    async fn compute_cache(&self, _up_to: &CardanoDbBeacon) -> StdResult<()> {
-        Ok(())
-    }
+    async fn compute_cache(&self, _up_to: &CardanoDbBeacon) -> StdResult<()>;

     /// Clear the cache
-    async fn clear_cache(&self) -> StdResult<()> {
-        Ok(())
-    }
+    async fn clear_cache(&self) -> StdResult<()>;
 }

 /// Transactions retriever
@@ -62,7 +59,7 @@ pub trait TransactionsRetriever: Sync + Send {
 pub struct MithrilProverService {
     transaction_retriever: Arc<dyn TransactionsRetriever>,
     block_range_root_retriever: Arc<dyn BlockRangeRootRetriever>,
-    mk_map_cache: Mutex<Option<MKMap<BlockRange, MKMapNode<BlockRange>>>>,
+    mk_map_cache: ResourcePool<MKMap<BlockRange, MKMapNode<BlockRange>>>,
 }

 impl MithrilProverService {
@@ -74,7 +71,7 @@ impl MithrilProverService {
         Self {
             transaction_retriever,
             block_range_root_retriever,
-            mk_map_cache: Mutex::new(None),
+            mk_map_cache: ResourcePool::default(),
         }
     }
@@ -119,7 +116,7 @@ impl ProverService for MithrilProverService {
     async fn compute_transactions_proofs(
         &self,
-        up_to: &CardanoDbBeacon,
+        _up_to: &CardanoDbBeacon,
         transaction_hashes: &[TransactionHash],
     ) -> StdResult<Vec<CardanoTransactionsSetProof>> {
         // 1 - Compute the set of block ranges with transactions to prove
@@ -139,9 +136,9 @@ impl ProverService for MithrilProverService {
         let mk_trees = BTreeMap::from_iter(mk_trees?);

         // 3 - Compute block range roots Merkle map
-        self.compute_cache(up_to).await?;
-        let mut mk_map = self.mk_map_cache.lock().await;
-        let mk_map = mk_map.as_mut().unwrap();
+        let mut mk_map = self
+            .mk_map_cache
+            .acquire_resource(Duration::from_millis(1000))?;

         // 4 - Enrich the Merkle map with the block ranges Merkle trees
         for (block_range, mk_tree) in mk_trees {
@@ -150,6 +147,9 @@

         // 5 - Compute the proof for all transactions
         if let Ok(mk_proof) = mk_map.compute_proof(transaction_hashes) {
+            self.mk_map_cache
+                .return_resource(mk_map.into_inner(), mk_map.discriminant())?;
+
             let transaction_hashes_certified: Vec<TransactionHash> = transaction_hashes
                 .iter()
                 .filter(|hash| mk_proof.contains(&hash.as_str().into()).is_ok())
@@ -166,22 +166,30 @@ impl ProverService for MithrilProverService {
     }

     async fn compute_cache(&self, up_to: &CardanoDbBeacon) -> StdResult<()> {
-        let mut mk_map = self.mk_map_cache.lock().await;
-        if mk_map.is_none() {
-            println!("Computing Merkle map from block range roots");
+        if self.mk_map_cache.count()? == 0 {
+            println!("Computing Merkle map cache from block range roots");
+
             let mk_map_cache = self
                 .block_range_root_retriever
                 .compute_merkle_map_from_block_range_roots(up_to.immutable_file_number)
                 .await?;
-            mk_map.replace(mk_map_cache);
+            let discriminant_new = self.mk_map_cache.discriminant()? + 1;
+            self.mk_map_cache.set_discriminant(discriminant_new)?;
+            for i in 0..10 {
+                println!("Computing Merkle map cache from block range roots: {}", i);
+                self.mk_map_cache
+                    .return_resource(mk_map_cache.clone(), discriminant_new)?;
+            }
+            self.mk_map_cache
+                .return_resource(mk_map_cache, discriminant_new)?;
+            println!("Done computing Merkle map cache from block range roots");
         }

         Ok(())
     }

     async fn clear_cache(&self) -> StdResult<()> {
-        let mut mk_map = self.mk_map_cache.lock().await;
-        mk_map.take();
+        self.mk_map_cache.drain();

         Ok(())
     }
@@ -387,6 +395,7 @@
                     });
             },
         );
+        prover.compute_cache(&test_data.beacon).await.unwrap();

         let transactions_set_proof = prover
             .compute_transactions_proofs(&test_data.beacon, &test_data.transaction_hashes_to_prove)
@@ -440,6 +449,7 @@
                     });
             },
         );
+        prover.compute_cache(&test_data.beacon).await.unwrap();

         let transactions_set_proof = prover
             .compute_transactions_proofs(&test_data.beacon, &test_data.transaction_hashes_to_prove)
@@ -496,6 +506,7 @@
                     });
             },
         );
+        prover.compute_cache(&test_data.beacon).await.unwrap();

         let transactions_set_proof = prover
             .compute_transactions_proofs(&test_data.beacon, &test_data.transaction_hashes_to_prove)
@@ -533,6 +544,7 @@
                     .return_once(|_| MKMap::new(&[]));
             },
         );
+        prover.compute_cache(&test_data.beacon).await.unwrap();

         prover
             .compute_transactions_proofs(&test_data.beacon, &test_data.transaction_hashes_to_prove)
diff --git a/mithril-common/src/lib.rs b/mithril-common/src/lib.rs
index 6fb6924cfc4..ea8adf2c2f2 100644
--- a/mithril-common/src/lib.rs
+++ b/mithril-common/src/lib.rs
@@ -59,6 +59,7 @@ pub mod entities;
 pub mod era;
 pub mod messages;
 pub mod protocol;
+pub mod resource_pool;
 pub mod signable_builder;

 cfg_test_tools! {
diff --git a/mithril-common/src/resource_pool.rs b/mithril-common/src/resource_pool.rs
new file mode 100644
index 00000000000..cd82cb939fe
--- /dev/null
+++ b/mithril-common/src/resource_pool.rs
@@ -0,0 +1,316 @@
+//! Resource pool implementation
+
+use anyhow::Context;
+use std::{
+    collections::VecDeque,
+    ops::{Deref, DerefMut},
+    sync::{Condvar, Mutex},
+    time::Duration,
+};
+use thiserror::Error;
+
+use crate::StdResult;
+
+/// [ResourcePool] related errors.
+#[derive(Error, Debug)]
+pub enum ResourcePoolError {
+    /// Internal Mutex is poisoned
+    #[error("Poisoned mutex caused error during acquire lock on resource pool")]
+    PoisonedLock(),
+
+    /// Acquire resource has timed out
+    #[error("Acquire resource has timed out")]
+    AcquireTimeout(),
+}
+
+/// Resource pool implementation (FIFO)
+pub struct ResourcePool<T: Send + Sync> {
+    /// The size of the pool
+    size: usize,
+
+    /// Discriminant for the resource pool to check if a returned resource is stale
+    discriminant: Mutex<u64>,
+
+    /// Resources in the pool
+    resources: Mutex<VecDeque<T>>,
+
+    /// Condition variable to notify when a resource is available
+    not_empty: Condvar,
+}
+
+impl<T: Send + Sync> ResourcePool<T> {
+    /// Create a new resource pool
+    pub fn new(pool_size: usize, resources: Vec<T>) -> Self {
+        Self {
+            size: pool_size,
+            discriminant: Mutex::new(0),
+            resources: Mutex::new(resources.into()),
+            not_empty: Condvar::new(),
+        }
+    }
+
+    /// Acquire a resource from the pool with a timeout
+    pub fn acquire_resource(&self, timeout: Duration) -> StdResult<ResourcePoolItem<T>> {
+        let mut resources = self
+            .resources
+            .lock()
+            .map_err(|_| ResourcePoolError::PoisonedLock())
+            .with_context(|| "Resource pool 'acquire_resource' failed locking Mutex")?;
+        while resources.is_empty() {
+            let (resources_locked, timeout) =
+                self.not_empty.wait_timeout(resources, timeout).unwrap();
+            if timeout.timed_out() {
+                return Err(ResourcePoolError::AcquireTimeout())
+                    .with_context(|| "Resource pool 'acquire_resource' has timed out");
+            }
+            resources = resources_locked;
+        }
+
+        Ok(ResourcePoolItem::new(self, resources.pop_front().unwrap()))
+    }
+
+    /// Return a resource to the pool
+    /// A resource is returned to the pool only if the discriminant matches
+    /// and if the pool is not already full
+    pub fn return_resource(&self, resource: T, discriminant: u64) -> StdResult<()> {
+        if self.count()? == self.size {
+            // Pool is full
+            return Ok(());
+        }
+        let mut resources = self
+            .resources
+            .lock()
+            .map_err(|_| ResourcePoolError::PoisonedLock())
+            .with_context(|| "Resource pool 'return_resource' failed locking Mutex")?;
+        if self.discriminant()? != discriminant {
+            // Stale resource
+            return Ok(());
+        }
+        resources.push_back(resource);
+        self.not_empty.notify_one();
+
+        Ok(())
+    }
+
+    /// Drain the pool
+    pub fn drain(&self) {
+        let mut resources = self.resources.lock().unwrap();
+        let _ = resources.drain(..).collect::<Vec<T>>();
+    }
+
+    /// Get the discriminant of the resource pool item
+    pub fn discriminant(&self) -> StdResult<u64> {
+        Ok(*self
+            .discriminant
+            .lock()
+            .map_err(|_| ResourcePoolError::PoisonedLock())
+            .with_context(|| "Resource pool 'discriminant' failed locking Mutex")?)
+    }
+
+    /// Set the discriminant of the resource pool item
+    pub fn set_discriminant(&self, discriminant: u64) -> StdResult<()> {
+        let mut discriminant_guard = self
+            .discriminant
+            .lock()
+            .map_err(|_| ResourcePoolError::PoisonedLock())
+            .with_context(|| "Resource pool 'set_discriminant' failed locking Mutex")?;
+        *discriminant_guard = discriminant;
+
+        Ok(())
+    }
+
+    /// Count the resources in the pool
+    pub fn count(&self) -> StdResult<usize> {
+        Ok(self
+            .resources
+            .lock()
+            .map_err(|_| ResourcePoolError::PoisonedLock())
+            .with_context(|| "Resource pool 'count' failed locking Mutex")?
+            .len())
+    }
+
+    /// Size of the resource pool
+    pub fn size(&self) -> usize {
+        self.size
+    }
+}
+
+impl<T: Send + Sync> Default for ResourcePool<T> {
+    fn default() -> Self {
+        Self::new(10, vec![])
+    }
+}
+
+/// Resource pool item which will return the resource to the pool when dropped
+pub struct ResourcePoolItem<'a, T: Send + Sync> {
+    resource_pool: &'a ResourcePool<T>,
+    discriminant: u64,
+    resource: Option<T>,
+}
+
+impl<'a, T: Send + Sync> ResourcePoolItem<'a, T> {
+    /// Create a new resource pool item
+    pub fn new(resource_pool: &'a ResourcePool<T>, resource: T) -> Self {
+        let discriminant = *resource_pool.discriminant.lock().unwrap();
+        Self {
+            resource_pool,
+            discriminant,
+            resource: Some(resource),
+        }
+    }
+
+    /// Get the discriminant of the resource pool item
+    pub fn discriminant(&self) -> u64 {
+        self.discriminant
+    }
+
+    /// Get a reference to the inner resource
+    pub fn resource(&self) -> &T {
+        self.resource.as_ref().unwrap()
+    }
+
+    /// Take the inner resource
+    pub fn into_inner(&mut self) -> T {
+        self.resource.take().unwrap()
+    }
+}
+
+impl<T: Send + Sync> Deref for ResourcePoolItem<'_, T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        self.resource.as_ref().unwrap()
+    }
+}
+
+impl<T: Send + Sync> DerefMut for ResourcePoolItem<'_, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        self.resource.as_mut().unwrap()
+    }
+}
+
+impl<T: Send + Sync> Drop for ResourcePoolItem<'_, T> {
+    fn drop(&mut self) {
+        if self.resource.is_some() {
+            let resource = self.into_inner();
+            let _ = self
+                .resource_pool
+                .return_resource(resource, self.discriminant);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use super::*;
+
+    #[test]
+    fn test_resource_pool_acquire_returns_resource_when_available() {
+        let pool_size = 10;
+        let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
+        let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
+
+        let mut resources_items = vec![];
+        for _ in 0..pool_size {
+            let resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
+            resources_items.push(resource_item);
+        }
+        let resources_result = resources_items
+            .iter_mut()
+            .map(|resource_item| resource_item.resource().to_owned())
+            .collect::<Vec<String>>();
+
+        assert_eq!(resources_expected, resources_result);
+        assert_eq!(pool.count().unwrap(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_resource_pool_acquire_locks_until_timeout_when_no_resource_available() {
+        let pool_size = 10;
+        let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
+        let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
+
+        let mut resources_items = vec![];
+        for _ in 0..pool_size {
+            let resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
+            resources_items.push(resource_item);
+        }
+
+        assert!(pool.acquire_resource(Duration::from_millis(1000)).is_err());
+    }
+
+    #[tokio::test]
+    async fn test_resource_pool_drains_successfully() {
+        let pool_size = 10;
+        let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
+        let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
+        assert_eq!(pool.count().unwrap(), pool_size);
+
+        pool.drain();
+
+        assert_eq!(pool.count().unwrap(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_resource_pool_returns_fresh_resource() {
+        let pool_size = 10;
+        let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
+        let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
+        assert_eq!(pool.count().unwrap(), pool_size);
+
+        let mut resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
+        assert_eq!(pool.count().unwrap(), pool_size - 1);
+        pool.return_resource(resource_item.into_inner(), pool.discriminant().unwrap())
+            .unwrap();
+
+        assert_eq!(pool.count().unwrap(), pool_size);
+    }
+
+    #[tokio::test]
+    async fn test_resource_pool_returns_resource_automatically() {
+        let pool_size = 10;
+        let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
+        let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
+        assert_eq!(pool.count().unwrap(), pool_size);
+
+        {
+            let _resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
+            assert_eq!(pool.count().unwrap(), pool_size - 1);
+        }
+
+        assert_eq!(pool.count().unwrap(), pool_size);
+    }
+
+    #[tokio::test]
+    async fn test_resource_pool_does_not_return_resource_when_pool_is_full() {
+        let pool_size = 10;
+        let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
+        let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
+        assert_eq!(pool.count().unwrap(), pool_size);
+
+        pool.return_resource("resource".to_string(), pool.discriminant().unwrap())
+            .unwrap();
+
+        assert_eq!(pool.count().unwrap(), pool_size);
+    }
+
+    #[tokio::test]
+    async fn test_resource_pool_does_not_return_stale_resource() {
+        let pool_size = 10;
+        let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
+        let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
+        assert_eq!(pool.count().unwrap(), pool_size);
+
+        let mut resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
+        assert_eq!(pool.count().unwrap(), pool_size - 1);
+        let discriminant_stale = pool.discriminant().unwrap();
+        pool.set_discriminant(pool.discriminant().unwrap() + 1)
+            .unwrap();
+        pool.return_resource(resource_item.into_inner(), discriminant_stale)
+            .unwrap();
+
+        assert_eq!(pool.count().unwrap(), pool_size - 1);
+    }
+}
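For reference, a minimal usage sketch of the `ResourcePool` API introduced above (not part of the diff itself); it only uses the methods defined in `mithril-common/src/resource_pool.rs` and `mithril_common::StdResult`, and assumes `mithril-common` is available as a dependency. Values and names in the example are illustrative only.

```rust
use std::time::Duration;

use mithril_common::{resource_pool::ResourcePool, StdResult};

fn main() -> StdResult<()> {
    // Illustrative pool: capacity 3, pre-filled with three resources.
    let pool = ResourcePool::<String>::new(3, vec!["a".into(), "b".into(), "c".into()]);

    {
        // Borrow a resource; `ResourcePoolItem` derefs to the inner value and
        // hands it back to the pool when it goes out of scope (via `Drop`).
        let item = pool.acquire_resource(Duration::from_millis(1000))?;
        println!("borrowed: {}", *item);
    } // <- resource returned to the pool here

    // Invalidate the pooled resources: bump the discriminant, drain the pool,
    // then repopulate under the new discriminant (stale returns are discarded).
    let fresh = pool.discriminant()? + 1;
    pool.set_discriminant(fresh)?;
    pool.drain();
    pool.return_resource("d".to_string(), fresh)?;
    assert_eq!(pool.count()?, 1);

    Ok(())
}
```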