Skip to content

Commit a08cd32

Browse files
committed
refactor: parse v2 receipts
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 0817632 commit a08cd32

File tree

6 files changed

+82
-14
lines changed

6 files changed

+82
-14
lines changed

Cargo.lock

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,13 @@ graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] }
7676
bip39 = "2.0.0"
7777
rstest = "0.23.0"
7878
wiremock = "0.6.1"
79-
bon = "3.3"
8079
tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] }
8180
prost = "0.13.4"
8281
prost-types = "0.13.3"
8382
tonic-build = "0.12.3"
8483
serde_yaml = "0.9.21"
84+
bon = "3.3"
85+
test-log = { version = "0.2.12", features = ["trace"] }
8586

8687
[patch.crates-io.tap_core]
8788
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"

crates/service/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ tracing-subscriber = { workspace = true, features = ["fmt"] }
3232
clap = { workspace = true, features = ["derive"] }
3333
build-info.workspace = true
3434
lazy_static.workspace = true
35+
prost.workspace = true
3536
async-trait.workspace = true
3637
async-graphql = { version = "7.0.11", default-features = false }
3738
async-graphql-axum = "7.0.11"
3839
base64.workspace = true
3940
graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3.0" }
4041
tap_core.workspace = true
4142
tap_graph.workspace = true
43+
tap_aggregator.workspace = true
4244
uuid.workspace = true
4345
bon.workspace = true
4446
tower_governor = { version = "0.5.0", features = ["axum"] }
@@ -71,6 +73,7 @@ tower-service = "0.3.3"
7173
tokio-test = "0.4.4"
7274
wiremock.workspace = true
7375
insta = "1.41.1"
76+
test-log.workspace = true
7477

7578
[build-dependencies]
7679
build-info-build = { version = "0.0.39", default-features = false }

crates/service/src/service/tap_receipt_header.rs

+43-12
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use axum_extra::headers::{self, Header, HeaderName, HeaderValue};
5+
use base64::prelude::*;
56
use lazy_static::lazy_static;
67
use prometheus::{register_counter, Counter};
8+
use prost::Message;
9+
use tap_aggregator::grpc;
710
use tap_graph::SignedReceipt;
811

912
use crate::tap::TapReceipt;
@@ -26,17 +29,28 @@ impl Header for TapHeader {
2629
where
2730
I: Iterator<Item = &'i HeaderValue>,
2831
{
29-
let mut execute = || {
30-
let value = values.next();
31-
let raw_receipt = value.ok_or(headers::Error::invalid())?;
32-
let raw_receipt = raw_receipt
33-
.to_str()
34-
.map_err(|_| headers::Error::invalid())?;
35-
let parsed_receipt: SignedReceipt =
36-
serde_json::from_str(raw_receipt).map_err(|_| headers::Error::invalid())?;
37-
Ok(TapHeader(crate::tap::TapReceipt::V1(parsed_receipt)))
32+
let mut execute = || -> anyhow::Result<TapHeader> {
33+
let raw_receipt = values.next().ok_or(headers::Error::invalid())?;
34+
35+
// we first try to decode a v2 receipt since it's cheaper and fail earlier than using
36+
// serde
37+
match BASE64_STANDARD.decode(raw_receipt) {
38+
Ok(raw_receipt) => {
39+
tracing::debug!("Decoded v2");
40+
let receipt = grpc::v2::SignedReceipt::decode(raw_receipt.as_ref())?;
41+
Ok(TapHeader(TapReceipt::V2(receipt.try_into()?)))
42+
}
43+
Err(_) => {
44+
tracing::debug!("Could not decode v2, trying v1");
45+
let parsed_receipt: SignedReceipt =
46+
serde_json::from_slice(raw_receipt.as_ref())?;
47+
Ok(TapHeader(TapReceipt::V1(parsed_receipt)))
48+
}
49+
}
3850
};
39-
execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc())
51+
execute()
52+
.map_err(|_| headers::Error::invalid())
53+
.inspect_err(|_| TAP_RECEIPT_INVALID.inc())
4054
}
4155

4256
fn encode<E>(&self, _values: &mut E)
@@ -51,13 +65,16 @@ impl Header for TapHeader {
5165
mod test {
5266
use axum::http::HeaderValue;
5367
use axum_extra::headers::Header;
54-
use test_assets::{create_signed_receipt, SignedReceiptRequest};
68+
use base64::prelude::*;
69+
use prost::Message;
70+
use tap_aggregator::grpc::v2::SignedReceipt;
71+
use test_assets::{create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest};
5572

5673
use super::TapHeader;
5774
use crate::tap::TapReceipt;
5875

5976
#[tokio::test]
60-
async fn test_decode_valid_tap_receipt_header() {
77+
async fn test_decode_valid_tap_v1_receipt_header() {
6178
let original_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await;
6279
let serialized_receipt = serde_json::to_string(&original_receipt).unwrap();
6380
let header_value = HeaderValue::from_str(&serialized_receipt).unwrap();
@@ -68,6 +85,20 @@ mod test {
6885
assert_eq!(decoded_receipt, TapHeader(TapReceipt::V1(original_receipt)));
6986
}
7087

88+
#[test_log::test(tokio::test)]
89+
async fn test_decode_valid_tap_v2_receipt_header() {
90+
let original_receipt = create_signed_receipt_v2().call().await;
91+
let protobuf_receipt = SignedReceipt::from(original_receipt.clone());
92+
let encoded = protobuf_receipt.encode_to_vec();
93+
let base64_encoded = BASE64_STANDARD.encode(encoded);
94+
let header_value = HeaderValue::from_str(&base64_encoded).unwrap();
95+
let header_values = vec![&header_value];
96+
let decoded_receipt = TapHeader::decode(&mut header_values.into_iter())
97+
.expect("tap receipt header value should be valid");
98+
99+
assert_eq!(decoded_receipt, TapHeader(TapReceipt::V2(original_receipt)));
100+
}
101+
71102
#[test]
72103
fn test_decode_non_string_tap_receipt_header() {
73104
let header_value = HeaderValue::from_static("123");

crates/tap-agent/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,5 @@ tempfile = "3.8.0"
6363
wiremock.workspace = true
6464
wiremock-grpc = "0.0.3-alpha3"
6565
test-assets = { path = "../test-assets" }
66-
test-log = { version = "0.2.12", features = ["trace"] }
66+
test-log.workspace = true
6767
rstest = "0.24.0"

crates/test-assets/src/lib.rs

+30
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,36 @@ pub async fn create_signed_receipt(
348348
.unwrap()
349349
}
350350

351+
/// Function to generate a signed receipt using the TAP_SIGNER wallet.
352+
#[bon::builder]
353+
pub async fn create_signed_receipt_v2(
354+
#[builder(default = ALLOCATION_ID_0)] allocation_id: Address,
355+
#[builder(default)] nonce: u64,
356+
#[builder(default = SystemTime::now()
357+
.duration_since(UNIX_EPOCH)
358+
.unwrap()
359+
.as_nanos() as u64)]
360+
timestamp_ns: u64,
361+
#[builder(default = 1)] value: u128,
362+
) -> tap_graph::v2::SignedReceipt {
363+
let (wallet, _) = &*self::TAP_SIGNER;
364+
365+
Eip712SignedMessage::new(
366+
&self::TAP_EIP712_DOMAIN,
367+
tap_graph::v2::Receipt {
368+
payer: TAP_SENDER.1,
369+
service_provider: INDEXER_ADDRESS,
370+
data_service: Address::ZERO,
371+
allocation_id,
372+
nonce,
373+
timestamp_ns,
374+
value,
375+
},
376+
wallet,
377+
)
378+
.unwrap()
379+
}
380+
351381
pub async fn flush_messages(notify: &Notify) {
352382
loop {
353383
if tokio::time::timeout(Duration::from_millis(10), notify.notified())

0 commit comments

Comments
 (0)