Skip to content

Commit

Permalink
refactor(core): collect runs batched checks
Browse files Browse the repository at this point in the history
The checks are specifically made to check the receipts that are to be
aggregated only in their own context. Helps for uniqueness checks for
example, where we avoid calling the storage backend and only check that
the collected receipts are unique.

Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman committed Oct 3, 2023
1 parent b5761c9 commit 26242a2
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 7 deletions.
15 changes: 11 additions & 4 deletions tap_core/src/tap_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,17 @@ impl<
let mut accepted_signed_receipts = Vec::<SignedReceipt>::new();
let mut failed_signed_receipts = Vec::<SignedReceipt>::new();

for (receipt_id, mut received_receipt) in received_receipts {
received_receipt
.finalize_receipt_checks(receipt_id, &self.receipt_auditor)
.await?;
let mut received_receipts: Vec<ReceivedReceipt> =
received_receipts.into_iter().map(|e| e.1).collect();

for check in self.required_checks.iter() {
self.receipt_auditor
.check_bunch(check, &mut received_receipts)
.await
.unwrap();
}

for received_receipt in received_receipts {
if received_receipt.is_accepted() {
accepted_signed_receipts.push(received_receipt.signed_receipt);
} else {
Expand Down
158 changes: 158 additions & 0 deletions tap_core/src/tap_receipt/receipt_auditor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;

use alloy_sol_types::Eip712Domain;
use ethers::types::Signature;
use tokio::sync::RwLock;

use crate::{
Expand All @@ -12,6 +15,8 @@ use crate::{
Error, Result,
};

use super::ReceivedReceipt;

pub struct ReceiptAuditor<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> {
domain_separator: Eip712Domain,
escrow_adapter: EA,
Expand Down Expand Up @@ -58,6 +63,31 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
}
}

pub async fn check_bunch(
&self,
receipt_check: &ReceiptCheck,
received_receipts: &mut [ReceivedReceipt],
) -> Result<()> {
let result = match receipt_check {
ReceiptCheck::CheckUnique => self.check_uniqueness_bunch(received_receipts).await,
ReceiptCheck::CheckAllocationId => {
self.check_allocation_id_bunch(received_receipts).await
}
ReceiptCheck::CheckSignature => self.check_signature_bunch(received_receipts).await,
ReceiptCheck::CheckTimestamp => self.check_timestamp_bunch(received_receipts).await,
ReceiptCheck::CheckValue => self.check_value_bunch(received_receipts).await,
ReceiptCheck::CheckAndReserveEscrow => {
self.check_and_reserve_escrow_bunch(received_receipts).await
}
};

for received_receipt in received_receipts.iter_mut() {
received_receipt.update_state();
}

result
}

async fn check_uniqueness(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -76,6 +106,38 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_uniqueness_bunch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Result<()> {
// If at least one of the receipts in the bunch hasn't been checked for uniqueness yet, check the whole bunch.
if received_receipts
.iter()
.filter(|r| r.checks.get(&ReceiptCheck::CheckUnique).is_some())
.any(|r| r.checks[&ReceiptCheck::CheckUnique].is_none())
{
let mut signatures: HashSet<Signature> = HashSet::new();

for received_receipt in received_receipts {
let signature = received_receipt.signed_receipt.signature;
if signatures.insert(signature) {
received_receipt
.update_check(&ReceiptCheck::CheckUnique, Some(Ok(())))
.unwrap();
} else {
received_receipt
.update_check(
&ReceiptCheck::CheckUnique,
Some(Err(ReceiptError::NonUniqueReceipt)),
)
.unwrap();
}
}
}

Ok(())
}

async fn check_allocation_id(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -95,6 +157,27 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_allocation_id_bunch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Result<()> {
for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.get(&ReceiptCheck::CheckAllocationId).is_some())
{
if received_receipt.checks[&ReceiptCheck::CheckAllocationId].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
let result = self.check_allocation_id(signed_receipt).await;

received_receipt
.update_check(&ReceiptCheck::CheckAllocationId, Some(result))
.unwrap();
}
}

Ok(())
}

async fn check_timestamp(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -108,6 +191,25 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
}
Ok(())
}

async fn check_timestamp_bunch(&self, received_receipts: &mut [ReceivedReceipt]) -> Result<()> {
for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.get(&ReceiptCheck::CheckTimestamp).is_some())
{
if received_receipt.checks[&ReceiptCheck::CheckTimestamp].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
let result = self.check_timestamp(signed_receipt).await;

received_receipt
.update_check(&ReceiptCheck::CheckTimestamp, Some(result))
.unwrap();
}
}

Ok(())
}

async fn check_value(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -128,6 +230,26 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_value_bunch(&self, received_receipts: &mut [ReceivedReceipt]) -> Result<()> {
for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.get(&ReceiptCheck::CheckValue).is_some())
{
if received_receipt.checks[&ReceiptCheck::CheckValue].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
let result = self
.check_value(signed_receipt, received_receipt.query_id)
.await;

received_receipt
.update_check(&ReceiptCheck::CheckValue, Some(result))
.unwrap();
}
}

Ok(())
}

async fn check_signature(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand Down Expand Up @@ -155,6 +277,24 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_signature_bunch(&self, received_receipts: &mut [ReceivedReceipt]) -> Result<()> {
for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.get(&ReceiptCheck::CheckSignature).is_some())
{
if received_receipt.checks[&ReceiptCheck::CheckSignature].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
let result = self.check_signature(signed_receipt).await;

received_receipt
.update_check(&ReceiptCheck::CheckSignature, Some(result))
.unwrap();
}
}

Ok(())
}

async fn check_and_reserve_escrow(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -176,6 +316,24 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_and_reserve_escrow_bunch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Result<()> {
for received_receipt in received_receipts.iter_mut().filter(|r| {
r.escrow_reserve_attempt_required() && !r.escrow_reserve_attempt_completed()
}) {
let signed_receipt = &received_receipt.signed_receipt;
let result = self.check_and_reserve_escrow(signed_receipt).await;

received_receipt
.update_check(&ReceiptCheck::CheckAndReserveEscrow, Some(result))
.unwrap();
}

Ok(())
}

pub async fn check_rav_signature(
&self,
signed_rav: &EIP712SignedMessage<ReceiptAggregateVoucher>,
Expand Down
6 changes: 3 additions & 3 deletions tap_core/src/tap_receipt/received_receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl ReceivedReceipt {
}

/// Updates receieved receipt state based on internal values, should be called anytime internal state changes
fn update_state(&mut self) {
pub(crate) fn update_state(&mut self) {
let mut next_state = self.state.clone();
match self.state {
ReceiptState::Received => {
Expand Down Expand Up @@ -357,14 +357,14 @@ impl ReceivedReceipt {
ReceiptState::AwaitingReserveEscrow
}

fn escrow_reserve_attempt_completed(&self) -> bool {
pub(crate) fn escrow_reserve_attempt_completed(&self) -> bool {
if let Some(escrow_reserve_attempt) = &self.escrow_reserved {
return escrow_reserve_attempt.is_some();
}
false
}

fn escrow_reserve_attempt_required(&self) -> bool {
pub(crate) fn escrow_reserve_attempt_required(&self) -> bool {
self.escrow_reserved.is_some()
}

Expand Down

0 comments on commit 26242a2

Please sign in to comment.