diff --git a/tap_aggregator/src/aggregator.rs b/tap_aggregator/src/aggregator.rs index 5348415c..74c5d23f 100644 --- a/tap_aggregator/src/aggregator.rs +++ b/tap_aggregator/src/aggregator.rs @@ -41,7 +41,7 @@ pub async fn check_and_aggregate_receipts( check_receipt_timestamps(receipts, previous_rav.as_ref())?; // Get the allocation id from the first receipt, return error if there are no receipts - let allocation_id = match receipts.get(0) { + let allocation_id = match receipts.first() { Some(receipt) => receipt.message.allocation_id, None => return Err(tap_core::Error::NoValidReceiptsForRAVRequest.into()), }; diff --git a/tap_core/src/adapters/mock.rs b/tap_core/src/adapters/mock.rs index 4b0fe771..85c63db2 100644 --- a/tap_core/src/adapters/mock.rs +++ b/tap_core/src/adapters/mock.rs @@ -1,7 +1,9 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 +pub mod auditor_executor_mock; pub mod escrow_adapter_mock; +pub mod executor_mock; pub mod rav_storage_adapter_mock; pub mod receipt_checks_adapter_mock; pub mod receipt_storage_adapter_mock; diff --git a/tap_core/src/adapters/mock/auditor_executor_mock.rs b/tap_core/src/adapters/mock/auditor_executor_mock.rs new file mode 100644 index 00000000..49fa8eff --- /dev/null +++ b/tap_core/src/adapters/mock/auditor_executor_mock.rs @@ -0,0 +1,143 @@ +// Copyright 2023-, Semiotic AI, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::{escrow_adapter_mock::AdpaterErrorMock, receipt_checks_adapter_mock::AdapterErrorMock}; +use crate::adapters::escrow_adapter::EscrowAdapter; +use crate::adapters::receipt_checks_adapter::ReceiptChecksAdapter; +use crate::eip_712_signed_message::EIP712SignedMessage; +use crate::tap_receipt::{Receipt, ReceivedReceipt}; +use alloy_primitives::Address; +use async_trait::async_trait; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +use tokio::sync::RwLock; + +#[derive(Clone)] +pub struct AuditorExecutorMock { + receipt_storage: Arc>>, + + sender_escrow_storage: Arc>>, + + query_appraisals: Arc>>, + allocation_ids: Arc>>, + sender_ids: Arc>>, +} + +impl AuditorExecutorMock { + pub fn new( + receipt_storage: Arc>>, + sender_escrow_storage: Arc>>, + query_appraisals: Arc>>, + allocation_ids: Arc>>, + sender_ids: Arc>>, + ) -> Self { + AuditorExecutorMock { + receipt_storage, + sender_escrow_storage, + allocation_ids, + sender_ids, + query_appraisals, + } + } +} + +impl AuditorExecutorMock { + pub async fn escrow(&self, sender_id: Address) -> Result { + let sender_escrow_storage = self.sender_escrow_storage.read().await; + if let Some(escrow) = sender_escrow_storage.get(&sender_id) { + return Ok(*escrow); + } + Err(AdpaterErrorMock::AdapterError { + error: "No escrow exists for provided sender ID.".to_owned(), + }) + } + + pub async fn increase_escrow(&mut self, sender_id: Address, value: u128) { + let mut sender_escrow_storage = self.sender_escrow_storage.write().await; + + if let Some(current_value) = sender_escrow_storage.get(&sender_id) { + let mut sender_escrow_storage = self.sender_escrow_storage.write().await; + sender_escrow_storage.insert(sender_id, current_value + value); + } else { + sender_escrow_storage.insert(sender_id, value); + } + } + + pub async fn reduce_escrow( + &self, + sender_id: Address, + value: u128, + ) -> Result<(), AdpaterErrorMock> { + let mut sender_escrow_storage = self.sender_escrow_storage.write().await; + + if let Some(current_value) = sender_escrow_storage.get(&sender_id) { + let checked_new_value = current_value.checked_sub(value); + if let Some(new_value) = checked_new_value { + sender_escrow_storage.insert(sender_id, new_value); + return Ok(()); + } + } + Err(AdpaterErrorMock::AdapterError { + error: "Provided value is greater than existing escrow.".to_owned(), + }) + } +} + +#[async_trait] +impl EscrowAdapter for AuditorExecutorMock { + type AdapterError = AdpaterErrorMock; + async fn get_available_escrow(&self, sender_id: Address) -> Result { + self.escrow(sender_id).await + } + async fn subtract_escrow( + &self, + sender_id: Address, + value: u128, + ) -> Result<(), Self::AdapterError> { + self.reduce_escrow(sender_id, value).await + } +} + +#[async_trait] +impl ReceiptChecksAdapter for AuditorExecutorMock { + type AdapterError = AdapterErrorMock; + + async fn is_unique( + &self, + receipt: &EIP712SignedMessage, + receipt_id: u64, + ) -> Result { + let receipt_storage = self.receipt_storage.read().await; + Ok(receipt_storage + .iter() + .all(|(stored_receipt_id, stored_receipt)| { + (stored_receipt.signed_receipt().message != receipt.message) + || *stored_receipt_id == receipt_id + })) + } + + async fn is_valid_allocation_id( + &self, + allocation_id: Address, + ) -> Result { + let allocation_ids = self.allocation_ids.read().await; + Ok(allocation_ids.contains(&allocation_id)) + } + + async fn is_valid_value(&self, value: u128, query_id: u64) -> Result { + let query_appraisals = self.query_appraisals.read().await; + let appraised_value = query_appraisals.get(&query_id).unwrap(); + + if value != *appraised_value { + return Ok(false); + } + Ok(true) + } + + async fn is_valid_sender_id(&self, sender_id: Address) -> Result { + let sender_ids = self.sender_ids.read().await; + Ok(sender_ids.contains(&sender_id)) + } +} diff --git a/tap_core/src/adapters/mock/executor_mock.rs b/tap_core/src/adapters/mock/executor_mock.rs new file mode 100644 index 00000000..76898659 --- /dev/null +++ b/tap_core/src/adapters/mock/executor_mock.rs @@ -0,0 +1,244 @@ +// Copyright 2023-, Semiotic AI, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::{escrow_adapter_mock::AdpaterErrorMock, receipt_checks_adapter_mock::AdapterErrorMock}; +use crate::adapters::escrow_adapter::EscrowAdapter; +use crate::adapters::receipt_checks_adapter::ReceiptChecksAdapter; +use crate::adapters::receipt_storage_adapter::{ + safe_truncate_receipts, ReceiptRead, ReceiptStore, StoredReceipt, +}; +use crate::eip_712_signed_message::EIP712SignedMessage; +use crate::tap_receipt::{Receipt, ReceivedReceipt}; +use crate::{ + adapters::rav_storage_adapter::{RAVRead, RAVStore}, + tap_manager::SignedRAV, +}; +use alloy_primitives::Address; +use async_trait::async_trait; +use std::ops::RangeBounds; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +use tokio::sync::RwLock; + +pub type EscrowStorage = Arc>>; +pub type QueryAppraisals = Arc>>; + +#[derive(Clone)] +pub struct ExecutorMock { + /// local RAV store with rwlocks to allow sharing with other compenents as needed + rav_storage: Arc>>, + receipt_storage: Arc>>, + unique_id: Arc>, + + sender_escrow_storage: EscrowStorage, + + query_appraisals: QueryAppraisals, + allocation_ids: Arc>>, + sender_ids: Arc>>, +} + +impl ExecutorMock { + pub fn new( + rav_storage: Arc>>, + receipt_storage: Arc>>, + sender_escrow_storage: Arc>>, + query_appraisals: Arc>>, + allocation_ids: Arc>>, + sender_ids: Arc>>, + ) -> Self { + ExecutorMock { + rav_storage, + receipt_storage, + unique_id: Arc::new(RwLock::new(0)), + sender_escrow_storage, + allocation_ids, + sender_ids, + query_appraisals, + } + } +} + +#[async_trait] +impl RAVStore for ExecutorMock { + type AdapterError = AdpaterErrorMock; + + async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError> { + let mut rav_storage = self.rav_storage.write().await; + *rav_storage = Some(rav); + Ok(()) + } +} + +#[async_trait] +impl RAVRead for ExecutorMock { + type AdapterError = AdpaterErrorMock; + + async fn last_rav(&self) -> Result, Self::AdapterError> { + Ok(self.rav_storage.read().await.clone()) + } +} + +#[async_trait] +impl ReceiptStore for ExecutorMock { + type AdapterError = AdapterErrorMock; + async fn store_receipt(&self, receipt: ReceivedReceipt) -> Result { + let mut id_pointer = self.unique_id.write().await; + let id_previous = *id_pointer; + let mut receipt_storage = self.receipt_storage.write().await; + receipt_storage.insert(*id_pointer, receipt); + *id_pointer += 1; + Ok(id_previous) + } + async fn update_receipt_by_id( + &self, + receipt_id: u64, + receipt: ReceivedReceipt, + ) -> Result<(), Self::AdapterError> { + let mut receipt_storage = self.receipt_storage.write().await; + + if !receipt_storage.contains_key(&receipt_id) { + return Err(AdapterErrorMock::AdapterError { + error: "Invalid receipt_id".to_owned(), + }); + }; + + receipt_storage.insert(receipt_id, receipt); + *self.unique_id.write().await += 1; + Ok(()) + } + async fn remove_receipts_in_timestamp_range + std::marker::Send>( + &self, + timestamp_ns: R, + ) -> Result<(), Self::AdapterError> { + let mut receipt_storage = self.receipt_storage.write().await; + receipt_storage.retain(|_, rx_receipt| { + !timestamp_ns.contains(&rx_receipt.signed_receipt().message.timestamp_ns) + }); + Ok(()) + } +} + +#[async_trait] +impl ReceiptRead for ExecutorMock { + type AdapterError = AdapterErrorMock; + async fn retrieve_receipts_in_timestamp_range + std::marker::Send>( + &self, + timestamp_range_ns: R, + limit: Option, + ) -> Result, Self::AdapterError> { + let receipt_storage = self.receipt_storage.read().await; + let mut receipts_in_range: Vec<(u64, ReceivedReceipt)> = receipt_storage + .iter() + .filter(|(_, rx_receipt)| { + timestamp_range_ns.contains(&rx_receipt.signed_receipt().message.timestamp_ns) + }) + .map(|(&id, rx_receipt)| (id, rx_receipt.clone())) + .collect(); + + if limit.is_some_and(|limit| receipts_in_range.len() > limit as usize) { + safe_truncate_receipts(&mut receipts_in_range, limit.unwrap()); + } + Ok(receipts_in_range.into_iter().map(|r| r.into()).collect()) + } +} + +impl ExecutorMock { + pub async fn escrow(&self, sender_id: Address) -> Result { + let sender_escrow_storage = self.sender_escrow_storage.read().await; + if let Some(escrow) = sender_escrow_storage.get(&sender_id) { + return Ok(*escrow); + } + Err(AdpaterErrorMock::AdapterError { + error: "No escrow exists for provided sender ID.".to_owned(), + }) + } + + pub async fn increase_escrow(&mut self, sender_id: Address, value: u128) { + let mut sender_escrow_storage = self.sender_escrow_storage.write().await; + + if let Some(current_value) = sender_escrow_storage.get(&sender_id) { + let mut sender_escrow_storage = self.sender_escrow_storage.write().await; + sender_escrow_storage.insert(sender_id, current_value + value); + } else { + sender_escrow_storage.insert(sender_id, value); + } + } + + pub async fn reduce_escrow( + &self, + sender_id: Address, + value: u128, + ) -> Result<(), AdpaterErrorMock> { + let mut sender_escrow_storage = self.sender_escrow_storage.write().await; + + if let Some(current_value) = sender_escrow_storage.get(&sender_id) { + let checked_new_value = current_value.checked_sub(value); + if let Some(new_value) = checked_new_value { + sender_escrow_storage.insert(sender_id, new_value); + return Ok(()); + } + } + Err(AdpaterErrorMock::AdapterError { + error: "Provided value is greater than existing escrow.".to_owned(), + }) + } +} + +#[async_trait] +impl EscrowAdapter for ExecutorMock { + type AdapterError = AdpaterErrorMock; + async fn get_available_escrow(&self, sender_id: Address) -> Result { + self.escrow(sender_id).await + } + async fn subtract_escrow( + &self, + sender_id: Address, + value: u128, + ) -> Result<(), Self::AdapterError> { + self.reduce_escrow(sender_id, value).await + } +} + +#[async_trait] +impl ReceiptChecksAdapter for ExecutorMock { + type AdapterError = AdapterErrorMock; + + async fn is_unique( + &self, + receipt: &EIP712SignedMessage, + receipt_id: u64, + ) -> Result { + let receipt_storage = self.receipt_storage.read().await; + Ok(receipt_storage + .iter() + .all(|(stored_receipt_id, stored_receipt)| { + (stored_receipt.signed_receipt().message != receipt.message) + || *stored_receipt_id == receipt_id + })) + } + + async fn is_valid_allocation_id( + &self, + allocation_id: Address, + ) -> Result { + let allocation_ids = self.allocation_ids.read().await; + Ok(allocation_ids.contains(&allocation_id)) + } + + async fn is_valid_value(&self, value: u128, query_id: u64) -> Result { + let query_appraisals = self.query_appraisals.read().await; + let appraised_value = query_appraisals.get(&query_id).unwrap(); + + if value != *appraised_value { + return Ok(false); + } + Ok(true) + } + + async fn is_valid_sender_id(&self, sender_id: Address) -> Result { + let sender_ids = self.sender_ids.read().await; + Ok(sender_ids.contains(&sender_id)) + } +} diff --git a/tap_core/src/tap_manager/manager.rs b/tap_core/src/tap_manager/manager.rs index 792b6e3d..c0f6a345 100644 --- a/tap_core/src/tap_manager/manager.rs +++ b/tap_core/src/tap_manager/manager.rs @@ -19,51 +19,46 @@ use crate::{ Error, }; -pub struct Manager { - /// Adapter for RAV CRUD - rav_storage_adapter: RAVSA, - /// Adapter for receipt CRUD - receipt_storage_adapter: RSA, +pub struct Manager { + /// Executor that implements adapters + executor: E, /// Checks that must be completed for each receipt before being confirmed or denied for rav request required_checks: Vec, /// Struct responsible for doing checks for receipt. Ownership stays with manager allowing manager /// to update configuration ( like minimum timestamp ). - receipt_auditor: ReceiptAuditor, + receipt_auditor: ReceiptAuditor, } -impl Manager { +impl Manager +where + E: Clone, +{ /// Creates new manager with provided `adapters`, any receipts received by this manager /// will complete all `required_checks` before being accepted or declined from RAV. /// `starting_min_timestamp` will be used as min timestamp until the first RAV request is created. /// pub fn new( domain_separator: Eip712Domain, - escrow_adapter: EA, - receipt_checks_adapter: RCA, - rav_storage_adapter: RAVSA, - receipt_storage_adapter: RSA, + executor: E, required_checks: Vec, starting_min_timestamp_ns: u64, ) -> Self { let receipt_auditor = ReceiptAuditor::new( domain_separator, - escrow_adapter, - receipt_checks_adapter, + executor.clone(), starting_min_timestamp_ns, ); Self { - rav_storage_adapter, - receipt_storage_adapter, + executor, required_checks, receipt_auditor, } } } -impl Manager +impl Manager where - RCA: ReceiptChecksAdapter, - RAVSA: RAVStore, + E: RAVStore + ReceiptChecksAdapter, { /// Verify `signed_rav` matches all values on `expected_rav`, and that `signed_rav` has a valid signer. /// @@ -87,7 +82,7 @@ where }); } - self.rav_storage_adapter + self.executor .update_last_rav(signed_rav) .await .map_err(|err| Error::AdapterError { @@ -98,27 +93,25 @@ where } } -impl Manager +impl Manager where - RAVSA: RAVRead, + E: RAVRead, { async fn get_previous_rav(&self) -> Result, Error> { - let previous_rav = - self.rav_storage_adapter - .last_rav() - .await - .map_err(|err| Error::AdapterError { - source_error: anyhow::Error::new(err), - })?; + let previous_rav = self + .executor + .last_rav() + .await + .map_err(|err| Error::AdapterError { + source_error: anyhow::Error::new(err), + })?; Ok(previous_rav) } } -impl Manager +impl Manager where - EA: EscrowAdapter, - RCA: ReceiptChecksAdapter, - RSA: ReceiptRead, + E: ReceiptRead + EscrowAdapter + ReceiptChecksAdapter, { async fn collect_receipts( &self, @@ -141,7 +134,7 @@ where }); } let received_receipts = self - .receipt_storage_adapter + .executor .retrieve_receipts_in_timestamp_range(min_timestamp_ns..max_timestamp_ns, limit) .await .map_err(|err| Error::AdapterError { @@ -183,12 +176,9 @@ where } } -impl Manager +impl Manager where - EA: EscrowAdapter, - RCA: ReceiptChecksAdapter, - RSA: ReceiptRead, - RAVSA: RAVRead, + E: ReceiptRead + RAVRead + EscrowAdapter + ReceiptChecksAdapter, { /// Completes remaining checks on all receipts up to (current time - `timestamp_buffer_ns`). Returns them in /// two lists (valid receipts and invalid receipts) along with the expected RAV that should be received @@ -253,10 +243,9 @@ where } } -impl Manager +impl Manager where - RSA: ReceiptStore, - RAVSA: RAVRead, + E: ReceiptStore + RAVRead, { /// Removes obsolete receipts from storage. Obsolete receipts are receipts that are older than the last RAV, and /// therefore already aggregated into the RAV. @@ -270,7 +259,7 @@ where pub async fn remove_obsolete_receipts(&self) -> Result<(), Error> { match self.get_previous_rav().await? { Some(last_rav) => { - self.receipt_storage_adapter + self.executor .remove_receipts_in_timestamp_range(..=last_rav.message.timestamp_ns) .await .map_err(|err| Error::AdapterError { @@ -283,11 +272,9 @@ where } } -impl Manager +impl Manager where - EA: EscrowAdapter, - RCA: ReceiptChecksAdapter, - RSA: ReceiptStore, + E: ReceiptStore + EscrowAdapter + ReceiptChecksAdapter, { /// Runs `initial_checks` on `signed_receipt` for initial verification, then stores received receipt. /// The provided `query_id` will be used as a key when chaecking query appraisal. @@ -313,7 +300,7 @@ where // This function first stores it, then checks it, then updates what was stored. let receipt_id = self - .receipt_storage_adapter + .executor .store_receipt(received_receipt.clone()) .await .map_err(|err| Error::AdapterError { @@ -326,7 +313,7 @@ where .await; } - self.receipt_storage_adapter + self.executor .update_receipt_by_id(receipt_id, received_receipt) .await .map_err(|err| Error::AdapterError { diff --git a/tap_core/src/tap_manager/test/manager_test.rs b/tap_core/src/tap_manager/test/manager_test.rs index 741ea19f..c81e38a0 100644 --- a/tap_core/src/tap_manager/test/manager_test.rs +++ b/tap_core/src/tap_manager/test/manager_test.rs @@ -19,10 +19,9 @@ mod manager_unit_test { use crate::{ adapters::{ escrow_adapter_mock::EscrowAdapterMock, - rav_storage_adapter_mock::RAVStorageAdapterMock, + executor_mock::{EscrowStorage, ExecutorMock, QueryAppraisals}, receipt_checks_adapter_mock::ReceiptChecksAdapterMock, receipt_storage_adapter::ReceiptRead, - receipt_storage_adapter_mock::ReceiptStorageAdapterMock, }, eip_712_signed_message::EIP712SignedMessage, get_current_timestamp_u64_ns, @@ -73,27 +72,40 @@ mod manager_unit_test { } #[fixture] - fn rav_storage_adapter() -> RAVStorageAdapterMock { + fn executor_mock() -> (ExecutorMock, EscrowStorage, QueryAppraisals) { let rav_storage = Arc::new(RwLock::new(None)); + let receipt_storage = Arc::new(RwLock::new(HashMap::new())); + + let sender_escrow_storage = Arc::new(RwLock::new(HashMap::new())); + + let allocation_ids_set = Arc::new(RwLock::new(HashSet::from_iter(allocation_ids()))); + let sender_ids_set = Arc::new(RwLock::new(HashSet::from_iter(sender_ids()))); + let query_appraisal_storage = Arc::new(RwLock::new(HashMap::new())); - RAVStorageAdapterMock::new(rav_storage) + ( + ExecutorMock::new( + rav_storage, + receipt_storage, + sender_escrow_storage.clone(), + query_appraisal_storage.clone(), + allocation_ids_set, + sender_ids_set, + ), + sender_escrow_storage, + query_appraisal_storage, + ) } #[fixture] - fn escrow_adapters() -> (EscrowAdapterMock, Arc>>) { + fn escrow_adapters() -> (EscrowAdapterMock, EscrowStorage) { let sender_escrow_storage = Arc::new(RwLock::new(HashMap::new())); let escrow_adapter = EscrowAdapterMock::new(Arc::clone(&sender_escrow_storage)); (escrow_adapter, sender_escrow_storage) } #[fixture] - fn receipt_adapters() -> ( - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - Arc>>, - ) { + fn receipt_adapters() -> (ReceiptChecksAdapterMock, Arc>>) { let receipt_storage = Arc::new(RwLock::new(HashMap::new())); - let receipt_storage_adapter = ReceiptStorageAdapterMock::new(Arc::clone(&receipt_storage)); let allocation_ids_set = Arc::new(RwLock::new(HashSet::from_iter(allocation_ids()))); let sender_ids_set = Arc::new(RwLock::new(HashSet::from_iter(sender_ids()))); @@ -106,11 +118,7 @@ mod manager_unit_test { Arc::clone(&sender_ids_set), ); - ( - receipt_storage_adapter, - receipt_checks_adapter, - query_appraisal_storage, - ) + (receipt_checks_adapter, query_appraisal_storage) } #[rstest] @@ -119,30 +127,19 @@ mod manager_unit_test { #[case::no_checks(Vec::::new())] #[tokio::test] async fn manager_verify_and_store_varying_initial_checks( - rav_storage_adapter: RAVStorageAdapterMock, - escrow_adapters: (EscrowAdapterMock, Arc>>), - receipt_adapters: ( - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - Arc>>, - ), + executor_mock: (ExecutorMock, EscrowStorage, QueryAppraisals), keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, #[case] initial_checks: Vec, ) { - let (escrow_adapter, escrow_storage) = escrow_adapters; - let (receipt_storage_adapter, receipt_checks_adapter, query_appraisal_storage) = - receipt_adapters; + let (executor, escrow_storage, query_appraisal_storage) = executor_mock; // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; let manager = Manager::new( domain_separator.clone(), - escrow_adapter, - receipt_checks_adapter, - rav_storage_adapter, - receipt_storage_adapter, + executor, get_full_list_of_checks(), starting_min_timestamp, ); @@ -174,30 +171,19 @@ mod manager_unit_test { #[case::no_checks(Vec::::new())] #[tokio::test] async fn manager_create_rav_request_all_valid_receipts( - rav_storage_adapter: RAVStorageAdapterMock, - escrow_adapters: (EscrowAdapterMock, Arc>>), - receipt_adapters: ( - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - Arc>>, - ), + executor_mock: (ExecutorMock, EscrowStorage, QueryAppraisals), keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, #[case] initial_checks: Vec, ) { - let (escrow_adapter, escrow_storage) = escrow_adapters; - let (receipt_storage_adapter, receipt_checks_adapter, query_appraisal_storage) = - receipt_adapters; + let (executor, escrow_storage, query_appraisal_storage) = executor_mock; // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; let manager = Manager::new( domain_separator.clone(), - escrow_adapter, - receipt_checks_adapter, - rav_storage_adapter, - receipt_storage_adapter, + executor, get_full_list_of_checks(), starting_min_timestamp, ); @@ -251,30 +237,19 @@ mod manager_unit_test { #[case::no_checks(Vec::::new())] #[tokio::test] async fn manager_create_multiple_rav_requests_all_valid_receipts( - rav_storage_adapter: RAVStorageAdapterMock, - escrow_adapters: (EscrowAdapterMock, Arc>>), - receipt_adapters: ( - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - Arc>>, - ), + executor_mock: (ExecutorMock, EscrowStorage, QueryAppraisals), keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, #[case] initial_checks: Vec, ) { - let (escrow_adapter, escrow_storage) = escrow_adapters; - let (receipt_storage_adapter, receipt_checks_adapter, query_appraisal_storage) = - receipt_adapters; + let (executor, escrow_storage, query_appraisal_storage) = executor_mock; // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; let manager = Manager::new( domain_separator.clone(), - escrow_adapter, - receipt_checks_adapter, - rav_storage_adapter, - receipt_storage_adapter, + executor, get_full_list_of_checks(), starting_min_timestamp, ); @@ -384,13 +359,7 @@ mod manager_unit_test { #[rstest] #[tokio::test] async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_timestamps( - rav_storage_adapter: RAVStorageAdapterMock, - escrow_adapters: (EscrowAdapterMock, Arc>>), - receipt_adapters: ( - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - Arc>>, - ), + executor_mock: (ExecutorMock, EscrowStorage, QueryAppraisals), keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, @@ -398,18 +367,13 @@ mod manager_unit_test { initial_checks: Vec, #[values(true, false)] remove_old_receipts: bool, ) { - let (escrow_adapter, escrow_storage) = escrow_adapters; - let (receipt_storage_adapter, receipt_checks_adapter, query_appraisal_storage) = - receipt_adapters; + let (executor, escrow_storage, query_appraisal_storage) = executor_mock; // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; let manager = Manager::new( domain_separator.clone(), - escrow_adapter, - receipt_checks_adapter, - rav_storage_adapter, - receipt_storage_adapter, + executor, get_full_list_of_checks(), starting_min_timestamp, ); @@ -500,7 +464,7 @@ mod manager_unit_test { // We expect to have 10 receipts left in receipt storage assert_eq!( manager - .receipt_storage_adapter + .executor .retrieve_receipts_in_timestamp_range(.., None) .await .unwrap() diff --git a/tap_core/src/tap_receipt/receipt_auditor.rs b/tap_core/src/tap_receipt/receipt_auditor.rs index cc977c5a..02cbea08 100644 --- a/tap_core/src/tap_receipt/receipt_auditor.rs +++ b/tap_core/src/tap_receipt/receipt_auditor.rs @@ -17,24 +17,21 @@ use crate::{ use super::{received_receipt::Checking, AwaitingReserve, ReceiptWithState}; -pub struct ReceiptAuditor { +pub struct ReceiptAuditor { domain_separator: Eip712Domain, - escrow_adapter: EA, - receipt_checks_adapter: RCA, + executor: E, min_timestamp_ns: RwLock, } -impl ReceiptAuditor { +impl ReceiptAuditor { pub fn new( domain_separator: Eip712Domain, - escrow_adapter: EA, - receipt_checks_adapter: RCA, + executor: E, starting_min_timestamp_ns: u64, ) -> Self { Self { domain_separator, - escrow_adapter, - receipt_checks_adapter, + executor, min_timestamp_ns: RwLock::new(starting_min_timestamp_ns), } } @@ -105,10 +102,9 @@ impl ReceiptAuditor { } } -impl ReceiptAuditor +impl ReceiptAuditor where - EA: EscrowAdapter, - RCA: ReceiptChecksAdapter, + E: EscrowAdapter + ReceiptChecksAdapter, { pub async fn check( &self, @@ -143,9 +139,9 @@ where } } -impl ReceiptAuditor +impl ReceiptAuditor where - RCA: ReceiptChecksAdapter, + E: ReceiptChecksAdapter, { async fn check_uniqueness( &self, @@ -153,7 +149,7 @@ where receipt_id: u64, ) -> ReceiptResult<()> { if !self - .receipt_checks_adapter + .executor .is_unique(signed_receipt, receipt_id) .await .map_err(|e| ReceiptError::CheckFailedToComplete { @@ -170,7 +166,7 @@ where signed_receipt: &EIP712SignedMessage, ) -> ReceiptResult<()> { if !self - .receipt_checks_adapter + .executor .is_valid_allocation_id(signed_receipt.message.allocation_id) .await .map_err(|e| ReceiptError::CheckFailedToComplete { @@ -210,7 +206,7 @@ where query_id: u64, ) -> ReceiptResult<()> { if !self - .receipt_checks_adapter + .executor .is_valid_value(signed_receipt.message.value, query_id) .await .map_err(|e| ReceiptError::CheckFailedToComplete { @@ -256,7 +252,7 @@ where source_error_message: err.to_string(), })?; if !self - .receipt_checks_adapter + .executor .is_valid_sender_id(receipt_signer_address) .await .map_err(|e| ReceiptError::CheckFailedToComplete { @@ -298,7 +294,7 @@ where ) -> Result<()> { let rav_signer_address = signed_rav.recover_signer(&self.domain_separator)?; if !self - .receipt_checks_adapter + .executor .is_valid_sender_id(rav_signer_address) .await .map_err(|err| Error::AdapterError { @@ -313,9 +309,9 @@ where } } -impl ReceiptAuditor +impl ReceiptAuditor where - EA: EscrowAdapter, + E: EscrowAdapter, { pub async fn check_and_reserve_escrow( &self, @@ -328,7 +324,7 @@ where source_error_message: err.to_string(), })?; if self - .escrow_adapter + .executor .subtract_escrow(receipt_signer_address, signed_receipt.message.value) .await .is_err() diff --git a/tap_core/src/tap_receipt/received_receipt.rs b/tap_core/src/tap_receipt/received_receipt.rs index 758d2a1a..d008ef55 100644 --- a/tap_core/src/tap_receipt/received_receipt.rs +++ b/tap_core/src/tap_receipt/received_receipt.rs @@ -218,13 +218,12 @@ where } impl ReceiptWithState { - pub async fn check_and_reserve_escrow( + pub async fn check_and_reserve_escrow( self, - auditor: &ReceiptAuditor, + auditor: &ReceiptAuditor, ) -> ResultReceipt where - EA: EscrowAdapter, - RCA: ReceiptChecksAdapter, + A: EscrowAdapter + ReceiptChecksAdapter, { match auditor.check_and_reserve_escrow(&self).await { Ok(_) => Ok(self.perform_state_changes(Reserved)), @@ -244,12 +243,14 @@ impl ReceiptWithState { /// /// Returns [`Error::InvalidCheckError] if requested error in not a required check (list of required checks provided by user on construction) /// - pub async fn perform_check( + pub async fn perform_check( &mut self, check: &ReceiptCheck, receipt_id: u64, - receipt_auditor: &ReceiptAuditor, - ) { + receipt_auditor: &ReceiptAuditor, + ) where + A: EscrowAdapter + ReceiptChecksAdapter, + { // Only perform check if it is incomplete // Don't check if already failed if !self.check_is_complete(check) && !self.any_check_resulted_in_error() { @@ -264,11 +265,14 @@ impl ReceiptWithState { } } - pub async fn perform_check_batch( + pub async fn perform_check_batch( batch: &mut [Self], check: &ReceiptCheck, - receipt_auditor: &ReceiptAuditor, - ) -> Result<()> { + receipt_auditor: &ReceiptAuditor, + ) -> Result<()> + where + A: EscrowAdapter + ReceiptChecksAdapter, + { let results = receipt_auditor.check_batch(check, batch).await; for (receipt, result) in batch.iter_mut().zip(results) { @@ -288,12 +292,14 @@ impl ReceiptWithState { /// /// Returns [`Error::InvalidCheckError] if requested error in not a required check (list of required checks provided by user on construction) /// - pub async fn perform_checks( + pub async fn perform_checks( &mut self, checks: &[ReceiptCheck], receipt_id: u64, - receipt_auditor: &ReceiptAuditor, - ) { + receipt_auditor: &ReceiptAuditor, + ) where + A: EscrowAdapter + ReceiptChecksAdapter, + { for check in checks { self.perform_check(check, receipt_id, receipt_auditor).await; } @@ -303,11 +309,14 @@ impl ReceiptWithState { /// /// Returns `Err` only if unable to complete a check, returns `Ok` if no check failed to complete (*Important:* this is not the result of the check, just the result of _completing_ the check) /// - pub async fn finalize_receipt_checks( + pub async fn finalize_receipt_checks( mut self, receipt_id: u64, - receipt_auditor: &ReceiptAuditor, - ) -> ResultReceipt { + receipt_auditor: &ReceiptAuditor, + ) -> ResultReceipt + where + A: EscrowAdapter + ReceiptChecksAdapter, + { let incomplete_checks = self.incomplete_checks(); self.perform_checks(incomplete_checks.as_slice(), receipt_id, receipt_auditor) diff --git a/tap_core/src/tap_receipt/tests/received_receipt_tests.rs b/tap_core/src/tap_receipt/tests/received_receipt_tests.rs index 3ff62e46..c1df5712 100644 --- a/tap_core/src/tap_receipt/tests/received_receipt_tests.rs +++ b/tap_core/src/tap_receipt/tests/received_receipt_tests.rs @@ -17,7 +17,9 @@ mod received_receipt_unit_test { use crate::{ adapters::{ + auditor_executor_mock::AuditorExecutorMock, escrow_adapter_mock::EscrowAdapterMock, + executor_mock::{EscrowStorage, QueryAppraisals}, receipt_checks_adapter_mock::ReceiptChecksAdapterMock, receipt_storage_adapter_mock::ReceiptStorageAdapterMock, }, @@ -95,6 +97,28 @@ mod received_receipt_unit_test { (escrow_adapter, sender_escrow_storage) } + #[fixture] + fn auditor_executor() -> (AuditorExecutorMock, EscrowStorage, QueryAppraisals) { + let sender_escrow_storage = Arc::new(RwLock::new(HashMap::new())); + + let receipt_storage = Arc::new(RwLock::new(HashMap::new())); + + let allocation_ids_set = Arc::new(RwLock::new(HashSet::from_iter(allocation_ids()))); + let sender_ids_set = Arc::new(RwLock::new(HashSet::from_iter(sender_ids()))); + let query_appraisal_storage = Arc::new(RwLock::new(HashMap::new())); + ( + AuditorExecutorMock::new( + receipt_storage, + sender_escrow_storage.clone(), + query_appraisal_storage.clone(), + allocation_ids_set, + sender_ids_set, + ), + sender_escrow_storage, + query_appraisal_storage, + ) + } + #[fixture] fn domain_separator() -> Eip712Domain { eip712_domain! { @@ -139,23 +163,13 @@ mod received_receipt_unit_test { keys: (LocalWallet, Address), domain_separator: Eip712Domain, allocation_ids: Vec
, - escrow_adapters: (EscrowAdapterMock, Arc>>), - receipt_adapters: ( - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - Arc>>, - ), + auditor_executor: (AuditorExecutorMock, EscrowStorage, QueryAppraisals), ) { - let (_, receipt_checks_adapter, query_appraisal_storage) = receipt_adapters; - let (escrow_adapter, escrow_storage) = escrow_adapters; + let (executor, escrow_storage, query_appraisal_storage) = auditor_executor; // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; - let receipt_auditor = ReceiptAuditor::new( - domain_separator.clone(), - escrow_adapter, - receipt_checks_adapter, - starting_min_timestamp, - ); + let receipt_auditor = + ReceiptAuditor::new(domain_separator.clone(), executor, starting_min_timestamp); let query_value = 20u128; let signed_receipt = EIP712SignedMessage::new( @@ -209,23 +223,13 @@ mod received_receipt_unit_test { keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, - escrow_adapters: (EscrowAdapterMock, Arc>>), - receipt_adapters: ( - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - Arc>>, - ), + auditor_executor: (AuditorExecutorMock, EscrowStorage, QueryAppraisals), ) { - let (_, receipt_checks_adapter, query_appraisal_storage) = receipt_adapters; - let (escrow_adapter, escrow_storage) = escrow_adapters; + let (executor, escrow_storage, query_appraisal_storage) = auditor_executor; // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; - let receipt_auditor = ReceiptAuditor::new( - domain_separator.clone(), - escrow_adapter, - receipt_checks_adapter, - starting_min_timestamp, - ); + let receipt_auditor = + ReceiptAuditor::new(domain_separator.clone(), executor, starting_min_timestamp); let query_value = 20u128; let signed_receipt = EIP712SignedMessage::new( @@ -280,23 +284,13 @@ mod received_receipt_unit_test { keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, - escrow_adapters: (EscrowAdapterMock, Arc>>), - receipt_adapters: ( - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - Arc>>, - ), + auditor_executor: (AuditorExecutorMock, EscrowStorage, QueryAppraisals), ) { - let (_, receipt_checks_adapter, query_appraisal_storage) = receipt_adapters; - let (escrow_adapter, escrow_storage) = escrow_adapters; + let (executor, escrow_storage, query_appraisal_storage) = auditor_executor; // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; - let receipt_auditor = ReceiptAuditor::new( - domain_separator.clone(), - escrow_adapter, - receipt_checks_adapter, - starting_min_timestamp, - ); + let receipt_auditor = + ReceiptAuditor::new(domain_separator.clone(), executor, starting_min_timestamp); let query_value = 20u128; let signed_receipt = EIP712SignedMessage::new( diff --git a/tap_integration_tests/tests/indexer_mock/mod.rs b/tap_integration_tests/tests/indexer_mock/mod.rs index 94a9e186..45a9c309 100644 --- a/tap_integration_tests/tests/indexer_mock/mod.rs +++ b/tap_integration_tests/tests/indexer_mock/mod.rs @@ -50,35 +50,24 @@ pub trait Rpc { /// receipt_count is a thread-safe counter that increments with each receipt verified and stored. /// threshold is a limit to which receipt_count can increment, after reaching which RAV request is triggered. /// aggregator_client is an HTTP client used for making JSON-RPC requests to another server. -pub struct RpcManager< - EA: EscrowAdapter + Send + Sync + 'static, // An instance of EscrowAdapter, marked as thread-safe with Send and given 'static lifetime - RCA: ReceiptChecksAdapter + Send + Sync + 'static, // An instance of ReceiptChecksAdapter - RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, // An instance of ReceiptStorageAdapter - RAVSA: RAVStore + RAVRead + Send + Sync + 'static, -> { - manager: Arc>, // Manager object reference counted with an Arc +pub struct RpcManager { + manager: Arc>, // Manager object reference counted with an Arc initial_checks: Vec, // Vector of initial checks to be performed on each request - receipt_count: Arc, // Thread-safe atomic counter for receipts - threshold: u64, // The count at which a RAV request will be triggered + receipt_count: Arc, // Thread-safe atomic counter for receipts + threshold: u64, // The count at which a RAV request will be triggered aggregator_client: (HttpClient, String), // HTTP client for sending requests to the aggregator server } /// Implementation for `RpcManager`, includes the constructor and the `request` method. /// Constructor initializes a new instance of `RpcManager`. /// `request` method handles incoming JSON-RPC requests and it verifies and stores the receipt from the request. -impl< - EA: EscrowAdapter + Send + Sync + 'static, - RCA: ReceiptChecksAdapter + Send + Sync + 'static, - RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, - RAVSA: RAVStore + RAVRead + Send + Sync + 'static, - > RpcManager +impl RpcManager +where + E: Clone, { pub fn new( domain_separator: Eip712Domain, - escrow_adapter: EA, - receipt_checks_adapter: RCA, - receipt_storage_adapter: RSA, - rav_storage_adapter: RAVSA, + executor: E, initial_checks: Vec, required_checks: Vec, threshold: u64, @@ -86,12 +75,9 @@ impl< aggregate_server_api_version: String, ) -> Result { Ok(Self { - manager: Arc::new(Manager::::new( + manager: Arc::new(Manager::::new( domain_separator, - escrow_adapter, - receipt_checks_adapter, - rav_storage_adapter, - receipt_storage_adapter, + executor, required_checks, get_current_timestamp_u64_ns()?, )), @@ -107,12 +93,17 @@ impl< } #[async_trait] -impl< - CA: EscrowAdapter + Send + Sync + 'static, - RCA: ReceiptChecksAdapter + Send + Sync + 'static, - RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, - RAVSA: RAVStore + RAVRead + Send + Sync + 'static, - > RpcServer for RpcManager +impl RpcServer for RpcManager +where + E: ReceiptStore + + ReceiptRead + + RAVStore + + RAVRead + + ReceiptChecksAdapter + + EscrowAdapter + + Send + + Sync + + 'static, { async fn request( &self, @@ -163,24 +154,28 @@ impl< } /// run_server function initializes and starts a JSON-RPC server that handles incoming requests. -pub async fn run_server< - CA: EscrowAdapter + Send + Sync + 'static, - RCA: ReceiptChecksAdapter + Send + Sync + 'static, - RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, - RAVSA: RAVStore + RAVRead + Send + Sync + 'static, ->( +pub async fn run_server( port: u16, // Port on which the server will listen domain_separator: Eip712Domain, // EIP712 domain separator - escrow_adapter: CA, // EscrowAdapter instance - receipt_checks_adapter: RCA, // ReceiptChecksAdapter instance - receipt_storage_adapter: RSA, // ReceiptStorageAdapter instance - rav_storage_adapter: RAVSA, // RAVStorageAdapter instance + executor: E, // Executor instance initial_checks: Vec, // Vector of initial checks to be performed on each request required_checks: Vec, // Vector of required checks to be performed on each request threshold: u64, // The count at which a RAV request will be triggered aggregate_server_address: String, // Address of the aggregator server aggregate_server_api_version: String, // API version of the aggregator server -) -> Result<(ServerHandle, std::net::SocketAddr)> { +) -> Result<(ServerHandle, std::net::SocketAddr)> +where + E: ReceiptStore + + ReceiptRead + + RAVStore + + RAVRead + + ReceiptChecksAdapter + + EscrowAdapter + + Clone + + Send + + Sync + + 'static, +{ // Setting up the JSON RPC server println!("Starting server..."); let server = ServerBuilder::new() @@ -191,10 +186,7 @@ pub async fn run_server< println!("Listening on: {}", addr); let rpc_manager = RpcManager::new( domain_separator, - escrow_adapter, - receipt_checks_adapter, - receipt_storage_adapter, - rav_storage_adapter, + executor, initial_checks, required_checks, threshold, @@ -207,17 +199,15 @@ pub async fn run_server< } // request_rav function creates a request for aggregate receipts (RAV), sends it to another server and verifies the result. -async fn request_rav< - CA: EscrowAdapter + Send + Sync + 'static, - RCA: ReceiptChecksAdapter + Send + Sync + 'static, - RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, - RAVSA: RAVStore + RAVRead + Send + Sync + 'static, ->( - manager: &Arc>, +async fn request_rav( + manager: &Arc>, time_stamp_buffer: u64, // Buffer for timestamping, see tap_core for details aggregator_client: &(HttpClient, String), // HttpClient for making requests to the tap_aggregator server threshold: usize, -) -> Result<()> { +) -> Result<()> +where + E: ReceiptRead + RAVRead + RAVStore + EscrowAdapter + ReceiptChecksAdapter, +{ // Create the aggregate_receipts request params let rav_request = manager.create_rav_request(time_stamp_buffer, None).await?; diff --git a/tap_integration_tests/tests/showcase.rs b/tap_integration_tests/tests/showcase.rs index 4b958dfb..2d9a092d 100644 --- a/tap_integration_tests/tests/showcase.rs +++ b/tap_integration_tests/tests/showcase.rs @@ -27,7 +27,8 @@ use tokio::sync::RwLock; use tap_aggregator::{jsonrpsee_helpers, server as agg_server}; use tap_core::{ adapters::{ - escrow_adapter_mock::EscrowAdapterMock, rav_storage_adapter_mock::RAVStorageAdapterMock, + escrow_adapter_mock::EscrowAdapterMock, executor_mock::ExecutorMock, + rav_storage_adapter_mock::RAVStorageAdapterMock, receipt_checks_adapter_mock::ReceiptChecksAdapterMock, receipt_storage_adapter_mock::ReceiptStorageAdapterMock, }, @@ -157,6 +158,34 @@ fn escrow_adapter() -> EscrowAdapterMock { EscrowAdapterMock::new(Arc::new(RwLock::new(HashMap::new()))) } +#[fixture] +fn executor( + keys_sender: (LocalWallet, Address), + query_price: Vec, + allocation_ids: Vec
, + receipt_storage: Arc>>, +) -> ExecutorMock { + let (_, sender_address) = keys_sender; + let query_appraisals: HashMap<_, _> = (0u64..).zip(query_price).collect(); + let query_appraisal_storage = Arc::new(RwLock::new(query_appraisals)); + let allocation_ids: Arc>> = + Arc::new(RwLock::new(HashSet::from_iter(allocation_ids))); + let sender_ids: Arc>> = + Arc::new(RwLock::new(HashSet::from([sender_address]))); + let rav_storage = Arc::new(RwLock::new(None)); + + let sender_escrow_storage = Arc::new(RwLock::new(HashMap::new())); + + ExecutorMock::new( + rav_storage, + receipt_storage, + sender_escrow_storage, + query_appraisal_storage, + allocation_ids, + sender_ids, + ) +} + #[fixture] fn receipt_storage() -> Arc>> { Arc::new(RwLock::new(HashMap::new())) @@ -226,43 +255,13 @@ fn initial_checks() -> Vec { } #[fixture] -fn indexer_1_adapters( - escrow_adapter: EscrowAdapterMock, - receipt_storage_adapter: ReceiptStorageAdapterMock, - receipt_checks_adapter: ReceiptChecksAdapterMock, - rav_storage_adapter: RAVStorageAdapterMock, -) -> ( - EscrowAdapterMock, - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - RAVStorageAdapterMock, -) { - ( - escrow_adapter, - receipt_storage_adapter, - receipt_checks_adapter, - rav_storage_adapter, - ) +fn indexer_1_adapters(executor: ExecutorMock) -> ExecutorMock { + executor } #[fixture] -fn indexer_2_adapters( - escrow_adapter: EscrowAdapterMock, - receipt_storage_adapter: ReceiptStorageAdapterMock, - receipt_checks_adapter: ReceiptChecksAdapterMock, - rav_storage_adapter: RAVStorageAdapterMock, -) -> ( - EscrowAdapterMock, - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - RAVStorageAdapterMock, -) { - ( - escrow_adapter, - receipt_storage_adapter, - receipt_checks_adapter, - rav_storage_adapter, - ) +fn indexer_2_adapters(executor: ExecutorMock) -> ExecutorMock { + executor } // Helper fixture to generate a batch of receipts to be sent to the Indexer. @@ -419,12 +418,7 @@ async fn single_indexer_test_server( http_request_size_limit: u32, http_response_size_limit: u32, http_max_concurrent_connections: u32, - indexer_1_adapters: ( - EscrowAdapterMock, - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - RAVStorageAdapterMock, - ), + indexer_1_adapters: ExecutorMock, available_escrow: u128, initial_checks: Vec, required_checks: Vec, @@ -439,14 +433,10 @@ async fn single_indexer_test_server( http_max_concurrent_connections, ) .await?; - let (escrow_adapter, receipt_storage_adapter, receipt_checks_adapter, rav_storage_adapter) = - indexer_1_adapters; + let executor = indexer_1_adapters; let (indexer_handle, indexer_addr) = start_indexer_server( domain_separator.clone(), - escrow_adapter, - receipt_storage_adapter, - receipt_checks_adapter, - rav_storage_adapter, + executor, sender_id, available_escrow, initial_checks, @@ -470,18 +460,8 @@ async fn two_indexers_test_servers( http_request_size_limit: u32, http_response_size_limit: u32, http_max_concurrent_connections: u32, - indexer_1_adapters: ( - EscrowAdapterMock, - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - RAVStorageAdapterMock, - ), - indexer_2_adapters: ( - EscrowAdapterMock, - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - RAVStorageAdapterMock, - ), + indexer_1_adapters: ExecutorMock, + indexer_2_adapters: ExecutorMock, available_escrow: u128, initial_checks: Vec, required_checks: Vec, @@ -503,25 +483,12 @@ async fn two_indexers_test_servers( http_max_concurrent_connections, ) .await?; - let ( - escrow_adapter_1, - receipt_storage_adapter_1, - receipt_checks_adapter_1, - rav_storage_adapter_1, - ) = indexer_1_adapters; - let ( - escrow_adapter_2, - receipt_storage_adapter_2, - receipt_checks_adapter_2, - rav_storage_adapter_2, - ) = indexer_2_adapters; + let executor_1 = indexer_1_adapters; + let executor_2 = indexer_2_adapters; let (indexer_handle, indexer_addr) = start_indexer_server( domain_separator.clone(), - escrow_adapter_1, - receipt_storage_adapter_1, - receipt_checks_adapter_1, - rav_storage_adapter_1, + executor_1, sender_id, available_escrow, initial_checks.clone(), @@ -533,10 +500,7 @@ async fn two_indexers_test_servers( let (indexer_handle_2, indexer_addr_2) = start_indexer_server( domain_separator.clone(), - escrow_adapter_2, - receipt_storage_adapter_2, - receipt_checks_adapter_2, - rav_storage_adapter_2, + executor_2, sender_id, available_escrow, initial_checks, @@ -563,12 +527,7 @@ async fn single_indexer_wrong_sender_test_server( http_request_size_limit: u32, http_response_size_limit: u32, http_max_concurrent_connections: u32, - indexer_1_adapters: ( - EscrowAdapterMock, - ReceiptStorageAdapterMock, - ReceiptChecksAdapterMock, - RAVStorageAdapterMock, - ), + indexer_1_adapters: ExecutorMock, available_escrow: u128, initial_checks: Vec, required_checks: Vec, @@ -583,15 +542,11 @@ async fn single_indexer_wrong_sender_test_server( http_max_concurrent_connections, ) .await?; - let (escrow_adapter, receipt_storage_adapter, receipt_checks_adapter, rav_storage_adapter) = - indexer_1_adapters; + let executor = indexer_1_adapters; let (indexer_handle, indexer_addr) = start_indexer_server( domain_separator.clone(), - escrow_adapter, - receipt_storage_adapter, - receipt_checks_adapter, - rav_storage_adapter, + executor, sender_id, available_escrow, initial_checks, @@ -963,10 +918,7 @@ async fn generate_requests( // Start-up a mock Indexer. Requires a Sender Aggregator to be running. async fn start_indexer_server( domain_separator: Eip712Domain, - mut escrow_adapter: EscrowAdapterMock, - receipt_storage_adapter: ReceiptStorageAdapterMock, - receipt_checks_adapter: ReceiptChecksAdapterMock, - rav_storage_adapter: RAVStorageAdapterMock, + mut executor: ExecutorMock, sender_id: Address, available_escrow: u128, initial_checks: Vec, @@ -979,18 +931,13 @@ async fn start_indexer_server( listener.local_addr()?.port() }; - escrow_adapter - .increase_escrow(sender_id, available_escrow) - .await; + executor.increase_escrow(sender_id, available_escrow).await; let aggregate_server_address = "http://".to_string() + &agg_server_addr.to_string(); let (server_handle, socket_addr) = indexer_mock::run_server( http_port, domain_separator, - escrow_adapter, - receipt_checks_adapter, - receipt_storage_adapter, - rav_storage_adapter, + executor, initial_checks, required_checks, receipt_threshold,