From 313eecc321fb5562c85139cce2fed3764f91fb6f Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 26 Dec 2024 15:20:31 +0100 Subject: [PATCH] almost done Signed-off-by: Gustavo Inacio --- tap_aggregator/Cargo.toml | 5 +- tap_aggregator/proto/tap_aggregator.proto | 15 +- .../src/{tap_aggregator.rs => grpc.rs} | 23 ++ tap_aggregator/src/hybrid.rs | 148 ------------ tap_aggregator/src/lib.rs | 3 +- tap_aggregator/src/main.rs | 107 +-------- tap_aggregator/src/server.rs | 210 ++++++++++-------- .../bin/client.rs => tests/aggregate_test.rs} | 62 +++--- tap_integration_tests/tests/showcase.rs | 21 +- 9 files changed, 202 insertions(+), 392 deletions(-) rename tap_aggregator/src/{tap_aggregator.rs => grpc.rs} (84%) delete mode 100644 tap_aggregator/src/hybrid.rs rename tap_aggregator/{src/bin/client.rs => tests/aggregate_test.rs} (52%) diff --git a/tap_aggregator/Cargo.toml b/tap_aggregator/Cargo.toml index 6266b2b..fafd9d1 100644 --- a/tap_aggregator/Cargo.toml +++ b/tap_aggregator/Cargo.toml @@ -11,9 +11,6 @@ description = "A JSON-RPC service for the Timeline Aggregation Protocol that let name = "tap_aggregator" path = "src/main.rs" -[[bin]] -name = "client" - [dependencies] tap_core = { path = "../tap_core", version = "2.0.0" } serde.workspace = true @@ -42,7 +39,7 @@ futures-util = "0.3.28" lazy_static = "1.4.0" ruint = "1.10.1" tower = { version = "0.4", features = ["util", "steer"] } -tonic = { version = "0.12.3", features = ["transport"] } +tonic = { version = "0.12.3", features = ["transport", "zstd"] } prost = "0.13.3" [build-dependencies] diff --git a/tap_aggregator/proto/tap_aggregator.proto b/tap_aggregator/proto/tap_aggregator.proto index f16fc63..6f98294 100644 --- a/tap_aggregator/proto/tap_aggregator.proto +++ b/tap_aggregator/proto/tap_aggregator.proto @@ -33,20 +33,7 @@ message RavResponse { SignedRav rav = 1; } -message TapRpcApiVersion { - string version = 1; -} - -message TapRpcApiVersionsInfo { - repeated TapRpcApiVersion versions_supported = 1; - repeated TapRpcApiVersion versions_deprecated = 2; -} - -// Optional request message for ApiVersions (TODO: should we use use google.protobuf.Empty?) -message ApiVersionsRequest {} - service TapAggregator { - rpc ApiVersions(ApiVersionsRequest) returns (TapRpcApiVersionsInfo); rpc AggregateReceipts(RavRequest) returns (RavResponse); } @@ -55,4 +42,4 @@ message Uint128 { uint64 high = 1; // Lowest 64 bits of a 128 bit number. uint64 low = 2; -} \ No newline at end of file +} diff --git a/tap_aggregator/src/tap_aggregator.rs b/tap_aggregator/src/grpc.rs similarity index 84% rename from tap_aggregator/src/tap_aggregator.rs rename to tap_aggregator/src/grpc.rs index 61181ac..eef9f83 100644 --- a/tap_aggregator/src/tap_aggregator.rs +++ b/tap_aggregator/src/grpc.rs @@ -107,3 +107,26 @@ impl From for Uint128 { Self { high, low } } } + +impl RavRequest { + pub fn new( + receipts: Vec, + previous_rav: Option, + ) -> Self { + Self { + receipts: receipts.into_iter().map(Into::into).collect(), + previous_rav: previous_rav.map(Into::into), + } + } +} + +impl RavResponse { + pub fn signed_rav(mut self) -> anyhow::Result { + let signed_rav: tap_core::rav::SignedRAV = self + .rav + .take() + .ok_or(anyhow!("Couldn't find rav"))? + .try_into()?; + Ok(signed_rav) + } +} diff --git a/tap_aggregator/src/hybrid.rs b/tap_aggregator/src/hybrid.rs deleted file mode 100644 index 3486fbe..0000000 --- a/tap_aggregator/src/hybrid.rs +++ /dev/null @@ -1,148 +0,0 @@ -use std::{future::Future, marker::PhantomData}; - -use hyper::body::Frame; -use std::pin::Pin; -use std::task::Poll; -use tower::Service; - -use hyper::{ - body::{Body, Incoming}, - Request, Response, -}; -use pin_project::pin_project; - -pub fn hybrid_once( - web: Web, - grpc: Grpc, -) -> HybridService -where - Web: Service, Response = Response>, - Grpc: Service, Response = Response>, - Web::Error: Into>, - Grpc::Error: Into>, -{ - HybridService { - web, - grpc, - _phantom: PhantomData, - } -} - -#[derive(Clone)] -pub struct HybridService -where - Web: Service, Response = Response>, - Grpc: Service, Response = Response>, - Web::Error: Into>, - Grpc::Error: Into>, -{ - web: Web, - grpc: Grpc, - _phantom: PhantomData<(WebBody, GrpcBody)>, -} - -impl Service> - for HybridService -where - Web: Service, Response = Response>, - Grpc: Service, Response = Response>, - Web::Error: Into>, - Grpc::Error: Into>, -{ - type Response = Response>; - type Error = Box; - type Future = HybridFuture; - - fn poll_ready( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match self.web.poll_ready(cx) { - Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), - Poll::Pending => Poll::Pending, - }, - Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), - Poll::Pending => Poll::Pending, - } - } - - fn call(&mut self, req: Request) -> Self::Future { - if req.headers().get("content-type").map(|x| x.as_bytes()) == Some(b"application/grpc") { - HybridFuture::Grpc(self.grpc.call(req)) - } else { - HybridFuture::Web(self.web.call(req)) - } - } -} - -#[derive(Clone)] -#[pin_project(project = HybridBodyProj)] -pub enum HybridBody { - Web(#[pin] WebBody), - Grpc(#[pin] GrpcBody), -} - -impl Body for HybridBody -where - WebBody: Body + Send + Unpin, - GrpcBody: Body + Send + Unpin, - WebBody::Error: std::error::Error + Send + Sync + 'static, - GrpcBody::Error: std::error::Error + Send + Sync + 'static, -{ - type Data = WebBody::Data; - type Error = Box; - - fn is_end_stream(&self) -> bool { - match self { - HybridBody::Web(b) => b.is_end_stream(), - HybridBody::Grpc(b) => b.is_end_stream(), - } - } - - fn poll_frame( - self: Pin<&mut Self>, - cx: &mut std::task::Context, - ) -> Poll, Self::Error>>> { - match self.project() { - HybridBodyProj::Web(b) => b.poll_frame(cx).map_err(|e| e.into()), - HybridBodyProj::Grpc(b) => b.poll_frame(cx).map_err(|e| e.into()), - } - } -} - -#[pin_project(project = HybridFutureProj)] -pub enum HybridFuture { - Web(#[pin] WebFuture), - Grpc(#[pin] GrpcFuture), -} - -impl Future - for HybridFuture -where - WebFuture: Future, WebError>>, - GrpcFuture: Future, GrpcError>>, - WebError: Into>, - GrpcError: Into>, -{ - type Output = Result< - Response>, - Box, - >; - - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { - match self.project() { - HybridFutureProj::Web(a) => match a.poll(cx) { - Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(HybridBody::Web))), - Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), - Poll::Pending => Poll::Pending, - }, - HybridFutureProj::Grpc(b) => match b.poll(cx) { - Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(HybridBody::Grpc))), - Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), - Poll::Pending => Poll::Pending, - }, - } - } -} diff --git a/tap_aggregator/src/lib.rs b/tap_aggregator/src/lib.rs index a22739d..6746f3a 100644 --- a/tap_aggregator/src/lib.rs +++ b/tap_aggregator/src/lib.rs @@ -4,8 +4,7 @@ pub mod aggregator; pub mod api_versioning; pub mod error_codes; -// pub mod hybrid; +pub mod grpc; pub mod jsonrpsee_helpers; pub mod metrics; pub mod server; -pub mod tap_aggregator; diff --git a/tap_aggregator/src/main.rs b/tap_aggregator/src/main.rs index c88c62e..5ec2b71 100644 --- a/tap_aggregator/src/main.rs +++ b/tap_aggregator/src/main.rs @@ -3,27 +3,15 @@ #![doc = include_str!("../README.md")] -use std::collections::HashSet; -use std::str::FromStr; +use std::{collections::HashSet, str::FromStr}; -use alloy::dyn_abi::Eip712Domain; -use alloy::primitives::Address; -use alloy::signers::local::PrivateKeySigner; +use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner}; use anyhow::Result; -use axum::error_handling::HandleError; -use axum::routing::post_service; -use axum::BoxError; -use axum::Router; use clap::Parser; -use hyper::StatusCode; +use log::{debug, info}; use tap_core::tap_eip712_domain; -use tokio::net::TcpListener; -use log::{debug, info}; -use tap_aggregator::metrics; -use tap_aggregator::server; -use tokio::signal; -use tower::make::Shared; +use tap_aggregator::{metrics, server}; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -119,96 +107,25 @@ async fn main() -> Result<()> { // Start the JSON-RPC server. // This await is non-blocking - let (service, handle) = server::create_rpc_tower_service( - wallet.clone(), - accepted_addresses.clone(), - domain_separator.clone(), + let (handle, _) = server::run_server( + args.port, + wallet, + accepted_addresses, + domain_separator, args.max_request_body_size, args.max_response_body_size, args.max_connections, - )?; - + ) + .await?; info!("Server started. Listening on port {}.", args.port); - async fn handle_anyhow_error(err: BoxError) -> (StatusCode, String) { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Something went wrong: {err}"), - ) - } - let router = Router::new().route_service( - "/", - HandleError::new(post_service(service), handle_anyhow_error), - ); - - let grpc_service = server::create_grpc_service(wallet, accepted_addresses, domain_separator)?; - - let service = tower::steer::Steer::new( - [router, grpc_service.into_axum_router()], - |req: &hyper::Request<_>, _services: &[_]| { - if req - .headers() - .get(hyper::header::CONTENT_TYPE) - .map(|content_type| content_type.as_bytes()) - .filter(|content_type| content_type.starts_with(b"application/grpc")) - .is_some() - { - // route to the gRPC service (second service element) when the - // header is set - 1 - } else { - // otherwise route to the REST service - 0 - } - }, - ); - - // Create a `TcpListener` using tokio. - let listener = TcpListener::bind(&format!("0.0.0.0:{}", args.port)) - .await - .expect("Failed to bind to indexer-service port"); - - if let Err(e) = axum::serve(listener, Shared::new(service)) - .with_graceful_shutdown(shutdown_handler()) - .await - { - anyhow::bail!("Indexer service error: {e}"); - } + let _ = handle.await; // If we're here, we've received a signal to exit. info!("Shutting down..."); - - // Stop the server and wait for it to finish gracefully. - let _ = handle.stop(); - handle.stopped().await; - - debug!("Goodbye!"); Ok(()) } -/// Graceful shutdown handler -async fn shutdown_handler() { - let ctrl_c = async { - signal::ctrl_c() - .await - .expect("Failed to install Ctrl+C handler"); - }; - - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("Failed to install signal handler") - .recv() - .await; - }; - - tokio::select! { - _ = ctrl_c => {}, - _ = terminate => {}, - } - - info!("Signal received, starting graceful shutdown"); -} - fn create_eip712_domain(args: &Args) -> Result { // Transfrom the args into the types expected by Eip712Domain::new(). diff --git a/tap_aggregator/src/server.rs b/tap_aggregator/src/server.rs index fc6b78a..b518134 100644 --- a/tap_aggregator/src/server.rs +++ b/tap_aggregator/src/server.rs @@ -3,29 +3,38 @@ use std::{collections::HashSet, str::FromStr}; -use alloy::dyn_abi::Eip712Domain; -use alloy::primitives::Address; -use alloy::signers::local::PrivateKeySigner; +use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner}; use anyhow::Result; -use jsonrpsee::server::TowerService; -use jsonrpsee::{proc_macros::rpc, server::ServerBuilder, server::ServerHandle}; +use axum::{error_handling::HandleError, routing::post_service, BoxError, Router}; +use hyper::StatusCode; +use jsonrpsee::{ + proc_macros::rpc, + server::{ServerBuilder, ServerHandle, TowerService}, +}; use lazy_static::lazy_static; +use log::info; use prometheus::{register_counter, register_int_counter, Counter, IntCounter}; -use tap_core::receipt::SignedReceipt; -use tonic::service::Routes; -use tower::layer::util::Identity; -// use tower::steer::Steer; - -use crate::aggregator::check_and_aggregate_receipts; -use crate::api_versioning::{ - tap_rpc_api_versions_info, TapRpcApiVersion, TapRpcApiVersionsInfo, - TAP_RPC_API_VERSIONS_DEPRECATED, -}; -use crate::error_codes::{JsonRpcErrorCode, JsonRpcWarningCode}; -use crate::jsonrpsee_helpers::{JsonRpcError, JsonRpcResponse, JsonRpcResult, JsonRpcWarning}; -use crate::tap_aggregator; use tap_core::{ - rav::ReceiptAggregateVoucher, receipt::Receipt, signed_message::EIP712SignedMessage, + rav::ReceiptAggregateVoucher, + receipt::{Receipt, SignedReceipt}, + signed_message::EIP712SignedMessage, +}; +use tokio::{net::TcpListener, signal, task::JoinHandle}; +use tonic::{codec::CompressionEncoding, service::Routes, Request, Response, Status}; +use tower::{layer::util::Identity, make::Shared}; + +use crate::{ + aggregator::check_and_aggregate_receipts, + api_versioning::{ + tap_rpc_api_versions_info, TapRpcApiVersion, TapRpcApiVersionsInfo, + TAP_RPC_API_VERSIONS_DEPRECATED, + }, + error_codes::{JsonRpcErrorCode, JsonRpcWarningCode}, + grpc::{ + tap_aggregator_server::{TapAggregator, TapAggregatorServer}, + RavRequest, RavResponse, + }, + jsonrpsee_helpers::{JsonRpcError, JsonRpcResponse, JsonRpcResult, JsonRpcWarning}, }; // Register the metrics into the global metrics registry. @@ -35,37 +44,27 @@ lazy_static! { "Number of successful receipt aggregation requests." ) .unwrap(); -} -lazy_static! { static ref AGGREGATION_FAILURE_COUNTER: IntCounter = register_int_counter!( "aggregation_failure_count", "Number of failed receipt aggregation requests (for any reason)." ) .unwrap(); -} -lazy_static! { static ref DEPRECATION_WARNING_COUNT: IntCounter = register_int_counter!( "deprecation_warning_count", "Number of deprecation warnings sent to clients." ) .unwrap(); -} -lazy_static! { static ref VERSION_ERROR_COUNT: IntCounter = register_int_counter!( "version_error_count", "Number of API version errors sent to clients." ) .unwrap(); -} -lazy_static! { static ref TOTAL_AGGREGATED_RECEIPTS: IntCounter = register_int_counter!( "total_aggregated_receipts", "Total number of receipts successfully aggregated." ) .unwrap(); -} // Using float for the GRT value because it can somewhat easily exceed the maximum value of int64. -lazy_static! { static ref TOTAL_GRT_AGGREGATED: Counter = register_counter!( "total_aggregated_grt", "Total successfully aggregated GRT value (wei)." @@ -96,6 +95,7 @@ pub trait Rpc { ) -> JsonRpcResult>; } +#[derive(Clone)] struct RpcImpl { wallet: PrivateKeySigner, accepted_addresses: HashSet
, @@ -177,27 +177,8 @@ fn aggregate_receipts_( } } -use tap_aggregator::tap_aggregator_server::{TapAggregator, TapAggregatorServer}; -use tap_aggregator::{ - ApiVersionsRequest, RavRequest, RavResponse, TapRpcApiVersionsInfo as TapGRpcApiVersionsInfo, -}; -use tonic::{Request, Response, Status}; - #[tonic::async_trait] impl TapAggregator for RpcImpl { - async fn api_versions( - &self, - _request: Request, - ) -> Result, Status> { - // Example response - let response = TapGRpcApiVersionsInfo { - versions_deprecated: vec![], - versions_supported: vec![], - }; - - Ok(Response::new(response)) - } - async fn aggregate_receipts( &self, request: Request, @@ -281,7 +262,6 @@ impl RpcServer for RpcImpl { } } -// TODO: create a gRPC alike run_server pub async fn run_server( port: u16, wallet: PrivateKeySigner, @@ -290,62 +270,115 @@ pub async fn run_server( max_request_body_size: u32, max_response_body_size: u32, max_concurrent_connections: u32, -) -> Result<(ServerHandle, std::net::SocketAddr)> { +) -> Result<(JoinHandle<()>, std::net::SocketAddr)> { + // Create a `TcpListener` using tokio. + let listener = TcpListener::bind(&format!("0.0.0.0:{}", port)) + .await + .expect("Failed to bind to indexer-service port"); + // Setting up the JSON RPC server - println!("Starting server..."); - let server = ServerBuilder::new() - .max_request_body_size(max_request_body_size) - .max_response_body_size(max_response_body_size) - .max_connections(max_concurrent_connections) - .http_only() - .build(format!("0.0.0.0:{}", port)) - .await?; - let addr = server.local_addr()?; - println!("Listening on: {}", addr); let rpc_impl = RpcImpl { wallet, accepted_addresses, domain_separator, }; - let handle = server.start(rpc_impl.into_rpc()); + let (json_rpc_service, _) = create_json_rpc_service( + rpc_impl.clone(), + max_request_body_size, + max_response_body_size, + max_concurrent_connections, + )?; + + async fn handle_anyhow_error(err: BoxError) -> (StatusCode, String) { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Something went wrong: {err}"), + ) + } + let json_rpc_router = Router::new().route_service( + "/", + HandleError::new(post_service(json_rpc_service), handle_anyhow_error), + ); + + let grpc_service = create_grpc_service(rpc_impl)?; + + let service = tower::steer::Steer::new( + [json_rpc_router, grpc_service.into_axum_router()], + |req: &hyper::Request<_>, _services: &[_]| { + if req + .headers() + .get(hyper::header::CONTENT_TYPE) + .map(|content_type| content_type.as_bytes()) + .filter(|content_type| content_type.starts_with(b"application/grpc")) + .is_some() + { + // route to the gRPC service (second service element) when the + // header is set + 1 + } else { + // otherwise route to the REST service + 0 + } + }, + ); + + let addr = listener.local_addr()?; + let handle = tokio::spawn(async move { + if let Err(e) = axum::serve(listener, Shared::new(service)) + .with_graceful_shutdown(shutdown_handler()) + .await + { + log::error!("Tap Aggregator error: {e}"); + } + }); + Ok((handle, addr)) } -pub fn create_grpc_service( - wallet: PrivateKeySigner, - accepted_addresses: HashSet
, - domain_separator: Eip712Domain, -) -> Result { - let rpc_impl = RpcImpl { - wallet, - accepted_addresses, - domain_separator, +/// Graceful shutdown handler +async fn shutdown_handler() { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("Failed to install Ctrl+C handler"); + }; + + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("Failed to install signal handler") + .recv() + .await; }; - let grpc_service = Routes::new(TapAggregatorServer::new(rpc_impl)).prepare(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } + + info!("Signal received, starting graceful shutdown"); +} + +fn create_grpc_service(rpc_impl: RpcImpl) -> Result { + let grpc_service = Routes::new( + TapAggregatorServer::new(rpc_impl).accept_compressed(CompressionEncoding::Zstd), + ) + .prepare(); Ok(grpc_service) } -pub fn create_rpc_tower_service( - wallet: PrivateKeySigner, - accepted_addresses: HashSet
, - domain_separator: Eip712Domain, +fn create_json_rpc_service( + rpc_impl: RpcImpl, max_request_body_size: u32, max_response_body_size: u32, max_concurrent_connections: u32, ) -> Result<(TowerService, ServerHandle)> { - println!("Starting server..."); let service_builder = ServerBuilder::new() .max_request_body_size(max_request_body_size) .max_response_body_size(max_response_body_size) .max_connections(max_concurrent_connections) .http_only() .to_service_builder(); - let rpc_impl = RpcImpl { - wallet, - accepted_addresses, - domain_separator, - }; use jsonrpsee::server::stop_channel; let (stop_handle, server_handle) = stop_channel(); let handle = service_builder.build(rpc_impl.into_rpc(), stop_handle); @@ -445,8 +478,7 @@ mod tests { .await .unwrap(); - handle.stop().unwrap(); - handle.stopped().await; + handle.abort(); } #[rstest] @@ -526,8 +558,7 @@ mod tests { assert!(remote_rav.recover_signer(&domain_separator).unwrap() == keys_main.address); - handle.stop().unwrap(); - handle.stopped().await; + handle.abort(); } #[rstest] @@ -616,8 +647,7 @@ mod tests { assert!(rav.recover_signer(&domain_separator).unwrap() == keys_main.address); - handle.stop().unwrap(); - handle.stopped().await; + handle.abort(); } #[rstest] @@ -691,8 +721,7 @@ mod tests { _ => panic!("Expected data in error"), } - handle.stop().unwrap(); - handle.stopped().await; + handle.abort(); } /// Test that the server returns an error when the request size exceeds the limit. @@ -789,7 +818,6 @@ mod tests { // Make sure the error is a HTTP 413 Content Too Large assert!(res.unwrap_err().to_string().contains("413")); - handle.stop().unwrap(); - handle.stopped().await; + handle.abort(); } } diff --git a/tap_aggregator/src/bin/client.rs b/tap_aggregator/tests/aggregate_test.rs similarity index 52% rename from tap_aggregator/src/bin/client.rs rename to tap_aggregator/tests/aggregate_test.rs index 1c908c6..9790767 100644 --- a/tap_aggregator/src/bin/client.rs +++ b/tap_aggregator/tests/aggregate_test.rs @@ -1,36 +1,50 @@ -use std::str::FromStr; +use std::{collections::HashSet, str::FromStr}; -use alloy::{ - primitives::{Address, B256}, - signers::local::PrivateKeySigner, -}; +use alloy::{primitives::Address, signers::local::PrivateKeySigner}; use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; use tap_aggregator::{ + grpc::{tap_aggregator_client::TapAggregatorClient, RavRequest}, jsonrpsee_helpers::JsonRpcResponse, - tap_aggregator::{tap_aggregator_client::TapAggregatorClient, RavRequest}, + server, }; use tap_core::{ rav::ReceiptAggregateVoucher, receipt::Receipt, signed_message::EIP712SignedMessage, tap_eip712_domain, }; +use tonic::codec::CompressionEncoding; -#[tokio::main] -async fn main() { - println!("Hello world"); - +#[tokio::test] +async fn aggregation_test() { let domain_separator = tap_eip712_domain(1, Address::ZERO); - let mut client = TapAggregatorClient::connect("http://localhost:8080") - .await - .unwrap(); + let wallet = PrivateKeySigner::random(); - let wallet = PrivateKeySigner::from_bytes( - &B256::from_str("8990ccae49d8fff982bfcd2b3f83d6b14cb272fcd29e49f7a335accd906005dc") - .unwrap(), + let max_request_body_size = 1024 * 100; + let max_response_body_size = 1024 * 100; + let max_concurrent_connections = 1; + + let accepted_addresses = HashSet::from([wallet.address()]); + + let (join_handle, local_addr) = server::run_server( + 0, + wallet.clone(), + accepted_addresses, + domain_separator.clone(), + max_request_body_size, + max_response_body_size, + max_concurrent_connections, ) + .await .unwrap(); + let endpoint = format!("http://127.0.0.1:{}", local_addr.port()); + + let mut client = TapAggregatorClient::connect(endpoint.clone()) + .await + .unwrap() + .send_compressed(CompressionEncoding::Zstd); + let allocation_id = Address::from_str("0xabababababababababababababababababababab").unwrap(); // Create receipts @@ -46,18 +60,11 @@ async fn main() { ); } - let rav_request = RavRequest { - receipts: receipts.clone().into_iter().map(Into::into).collect(), - previous_rav: None, - }; + let rav_request = RavRequest::new(receipts.clone(), None); let res = client.aggregate_receipts(rav_request).await.unwrap(); - let signed_rav: tap_core::rav::SignedRAV = - res.into_inner().rav.take().unwrap().try_into().unwrap(); - println!("Response: {signed_rav:?}"); + let signed_rav: tap_core::rav::SignedRAV = res.into_inner().signed_rav().unwrap(); - let sender_aggregator = HttpClientBuilder::default() - .build("http://localhost:8080") - .unwrap(); + let sender_aggregator = HttpClientBuilder::default().build(&endpoint).unwrap(); let previous_rav: Option = None; @@ -74,6 +81,5 @@ async fn main() { .unwrap(); let response = response.data; assert_eq!(signed_rav, response); - - println!("Response: {response:?}"); + join_handle.abort(); } diff --git a/tap_integration_tests/tests/showcase.rs b/tap_integration_tests/tests/showcase.rs index 19210b3..6c89f05 100644 --- a/tap_integration_tests/tests/showcase.rs +++ b/tap_integration_tests/tests/showcase.rs @@ -35,6 +35,7 @@ use tap_core::{ signed_message::{EIP712SignedMessage, MessageId}, tap_eip712_domain, }; +use tokio::task::JoinHandle; use crate::indexer_mock; @@ -345,7 +346,7 @@ async fn single_indexer_test_server( indexer_1_context: ContextFixture, available_escrow: u128, receipt_threshold_1: u64, -) -> Result<(ServerHandle, SocketAddr, ServerHandle, SocketAddr)> { +) -> Result<(ServerHandle, SocketAddr, JoinHandle<()>, SocketAddr)> { let sender_id = keys_sender.address(); let (sender_aggregator_handle, sender_aggregator_addr) = start_sender_aggregator( keys_sender, @@ -390,7 +391,7 @@ async fn two_indexers_test_servers( SocketAddr, ServerHandle, SocketAddr, - ServerHandle, + JoinHandle<()>, SocketAddr, )> { let sender_id = keys_sender.address(); @@ -454,7 +455,7 @@ async fn single_indexer_wrong_sender_test_server( indexer_1_context: ContextFixture, available_escrow: u128, receipt_threshold_1: u64, -) -> Result<(ServerHandle, SocketAddr, ServerHandle, SocketAddr)> { +) -> Result<(ServerHandle, SocketAddr, JoinHandle<()>, SocketAddr)> { let sender_id = wrong_keys_sender.address(); let (sender_aggregator_handle, sender_aggregator_addr) = start_sender_aggregator( wrong_keys_sender, @@ -491,7 +492,7 @@ async fn single_indexer_wrong_sender_test_server( #[tokio::test] async fn test_manager_one_indexer( #[future] single_indexer_test_server: Result< - (ServerHandle, SocketAddr, ServerHandle, SocketAddr), + (ServerHandle, SocketAddr, JoinHandle<()>, SocketAddr), Error, >, requests_1: Vec>, @@ -522,7 +523,7 @@ async fn test_manager_two_indexers( SocketAddr, ServerHandle, SocketAddr, - ServerHandle, + JoinHandle<()>, SocketAddr, ), Error, @@ -559,7 +560,7 @@ async fn test_manager_two_indexers( #[tokio::test] async fn test_manager_wrong_aggregator_keys( #[future] single_indexer_wrong_sender_test_server: Result< - (ServerHandle, SocketAddr, ServerHandle, SocketAddr), + (ServerHandle, SocketAddr, JoinHandle<()>, SocketAddr), Error, >, requests_1: Vec>, @@ -601,7 +602,7 @@ async fn test_manager_wrong_aggregator_keys( #[tokio::test] async fn test_manager_wrong_requestor_keys( #[future] single_indexer_test_server: Result< - (ServerHandle, SocketAddr, ServerHandle, SocketAddr), + (ServerHandle, SocketAddr, JoinHandle<()>, SocketAddr), Error, >, wrong_requests: Vec>, @@ -631,7 +632,7 @@ async fn test_tap_manager_rav_timestamp_cuttoff( SocketAddr, ServerHandle, SocketAddr, - ServerHandle, + JoinHandle<()>, SocketAddr, ), Error, @@ -765,7 +766,7 @@ async fn test_tap_aggregator_rav_timestamp_cuttoff( } assert!(expected_value == second_rav_response.data.message.valueAggregate); - sender_handle.stop()?; + sender_handle.abort(); Ok(()) } @@ -833,7 +834,7 @@ async fn start_sender_aggregator( http_request_size_limit: u32, http_response_size_limit: u32, http_max_concurrent_connections: u32, -) -> Result<(ServerHandle, SocketAddr)> { +) -> Result<(JoinHandle<()>, SocketAddr)> { let http_port = { let listener = TcpListener::bind("127.0.0.1:0")?; listener.local_addr()?.port()