Skip to content

Commit

Permalink
refactor(core): collect runs batched checks (#180)
Browse files Browse the repository at this point in the history
The checks are specifically made to check the receipts that are to be
aggregated only in their own context. Helps for uniqueness checks for
example, where we avoid calling the storage backend and only check that
the collected receipts are unique.

Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman authored Oct 30, 2023
1 parent 4fbc6df commit d3359c1
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 7 deletions.
17 changes: 13 additions & 4 deletions tap_core/src/tap_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,19 @@ impl<
let mut accepted_signed_receipts = Vec::<SignedReceipt>::new();
let mut failed_signed_receipts = Vec::<SignedReceipt>::new();

for (receipt_id, mut received_receipt) in received_receipts {
received_receipt
.finalize_receipt_checks(receipt_id, &self.receipt_auditor)
.await?;
let mut received_receipts: Vec<ReceivedReceipt> =
received_receipts.into_iter().map(|e| e.1).collect();

for check in self.required_checks.iter() {
ReceivedReceipt::perform_check_batch(
&mut received_receipts,
check,
&self.receipt_auditor,
)
.await?;
}

for received_receipt in received_receipts {
if received_receipt.is_accepted() {
accepted_signed_receipts.push(received_receipt.signed_receipt);
} else {
Expand Down
147 changes: 147 additions & 0 deletions tap_core/src/tap_receipt/receipt_auditor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;

use alloy_sol_types::Eip712Domain;
use ethers::types::Signature;
use tokio::sync::RwLock;

use crate::{
Expand All @@ -12,6 +15,8 @@ use crate::{
Error, Result,
};

use super::ReceivedReceipt;

pub struct ReceiptAuditor<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> {
domain_separator: Eip712Domain,
escrow_adapter: EA,
Expand Down Expand Up @@ -58,6 +63,25 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
}
}

pub async fn check_batch(
&self,
receipt_check: &ReceiptCheck,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
match receipt_check {
ReceiptCheck::CheckUnique => self.check_uniqueness_batch(received_receipts).await,
ReceiptCheck::CheckAllocationId => {
self.check_allocation_id_batch(received_receipts).await
}
ReceiptCheck::CheckSignature => self.check_signature_batch(received_receipts).await,
ReceiptCheck::CheckTimestamp => self.check_timestamp_batch(received_receipts).await,
ReceiptCheck::CheckValue => self.check_value_batch(received_receipts).await,
ReceiptCheck::CheckAndReserveEscrow => {
self.check_and_reserve_escrow_batch(received_receipts).await
}
}
}

async fn check_uniqueness(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -76,6 +100,33 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_uniqueness_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

// If at least one of the receipts in the batch hasn't been checked for uniqueness yet, check the whole batch.
if received_receipts
.iter()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckUnique))
.any(|r| r.checks[&ReceiptCheck::CheckUnique].is_none())
{
let mut signatures: HashSet<Signature> = HashSet::new();

for received_receipt in received_receipts {
let signature = received_receipt.signed_receipt.signature;
if signatures.insert(signature) {
results.push(Ok(()));
} else {
results.push(Err(ReceiptError::NonUniqueReceipt));
}
}
}

results
}

async fn check_allocation_id(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -95,6 +146,25 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_allocation_id_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckAllocationId))
{
if received_receipt.checks[&ReceiptCheck::CheckAllocationId].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
results.push(self.check_allocation_id(signed_receipt).await);
}
}

results
}

async fn check_timestamp(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -108,6 +178,26 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
}
Ok(())
}

async fn check_timestamp_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckTimestamp))
{
if received_receipt.checks[&ReceiptCheck::CheckTimestamp].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
results.push(self.check_timestamp(signed_receipt).await);
}
}

results
}

async fn check_value(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -128,6 +218,28 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_value_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckValue))
{
if received_receipt.checks[&ReceiptCheck::CheckValue].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
results.push(
self.check_value(signed_receipt, received_receipt.query_id)
.await,
);
}
}

results
}

async fn check_signature(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand Down Expand Up @@ -155,6 +267,25 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_signature_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckSignature))
{
if received_receipt.checks[&ReceiptCheck::CheckSignature].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
results.push(self.check_signature(signed_receipt).await);
}
}

results
}

async fn check_and_reserve_escrow(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -176,6 +307,22 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_and_reserve_escrow_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts.iter_mut().filter(|r| {
r.escrow_reserve_attempt_required() && !r.escrow_reserve_attempt_completed()
}) {
let signed_receipt = &received_receipt.signed_receipt;
results.push(self.check_and_reserve_escrow(signed_receipt).await);
}

results
}

pub async fn check_rav_signature(
&self,
signed_rav: &EIP712SignedMessage<ReceiptAggregateVoucher>,
Expand Down
21 changes: 18 additions & 3 deletions tap_core/src/tap_receipt/received_receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ impl ReceivedReceipt {
result
}

pub async fn perform_check_batch<CA: EscrowAdapter, RCA: ReceiptChecksAdapter>(
batch: &mut [Self],
check: &ReceiptCheck,
receipt_auditor: &ReceiptAuditor<CA, RCA>,
) -> Result<()> {
let results = receipt_auditor.check_batch(check, batch).await;

for (receipt, result) in batch.iter_mut().zip(results) {
receipt.update_check(check, Some(result))?;
receipt.update_state();
}

Ok(())
}

/// Completes a list of *incomplete* check and stores the result, if the check already has a result it is skipped
///
/// Returns `Err` only if unable to complete a check, returns `Ok` if the checks were completed (*Important:* this is not the result of the check, just the result of _completing_ the check)
Expand Down Expand Up @@ -293,7 +308,7 @@ impl ReceivedReceipt {
}

/// Updates receieved receipt state based on internal values, should be called anytime internal state changes
fn update_state(&mut self) {
pub(crate) fn update_state(&mut self) {
let mut next_state = self.state.clone();
match self.state {
ReceiptState::Received => {
Expand Down Expand Up @@ -357,14 +372,14 @@ impl ReceivedReceipt {
ReceiptState::AwaitingReserveEscrow
}

fn escrow_reserve_attempt_completed(&self) -> bool {
pub(crate) fn escrow_reserve_attempt_completed(&self) -> bool {
if let Some(escrow_reserve_attempt) = &self.escrow_reserved {
return escrow_reserve_attempt.is_some();
}
false
}

fn escrow_reserve_attempt_required(&self) -> bool {
pub(crate) fn escrow_reserve_attempt_required(&self) -> bool {
self.escrow_reserved.is_some()
}

Expand Down

0 comments on commit d3359c1

Please sign in to comment.