Skip to content

Commit

Permalink
almost done
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Dec 26, 2024
1 parent 6233313 commit 313eecc
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 392 deletions.
5 changes: 1 addition & 4 deletions tap_aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ description = "A JSON-RPC service for the Timeline Aggregation Protocol that let
name = "tap_aggregator"
path = "src/main.rs"

[[bin]]
name = "client"

[dependencies]
tap_core = { path = "../tap_core", version = "2.0.0" }
serde.workspace = true
Expand Down Expand Up @@ -42,7 +39,7 @@ futures-util = "0.3.28"
lazy_static = "1.4.0"
ruint = "1.10.1"
tower = { version = "0.4", features = ["util", "steer"] }
tonic = { version = "0.12.3", features = ["transport"] }
tonic = { version = "0.12.3", features = ["transport", "zstd"] }
prost = "0.13.3"

[build-dependencies]
Expand Down
15 changes: 1 addition & 14 deletions tap_aggregator/proto/tap_aggregator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,7 @@ message RavResponse {
SignedRav rav = 1;
}

message TapRpcApiVersion {
string version = 1;
}

message TapRpcApiVersionsInfo {
repeated TapRpcApiVersion versions_supported = 1;
repeated TapRpcApiVersion versions_deprecated = 2;
}

// Optional request message for ApiVersions (TODO: should we use use google.protobuf.Empty?)
message ApiVersionsRequest {}

service TapAggregator {
rpc ApiVersions(ApiVersionsRequest) returns (TapRpcApiVersionsInfo);
rpc AggregateReceipts(RavRequest) returns (RavResponse);
}

Expand All @@ -55,4 +42,4 @@ message Uint128 {
uint64 high = 1;
// Lowest 64 bits of a 128 bit number.
uint64 low = 2;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,26 @@ impl From<u128> for Uint128 {
Self { high, low }
}
}

impl RavRequest {
pub fn new(
receipts: Vec<tap_core::receipt::SignedReceipt>,
previous_rav: Option<tap_core::rav::SignedRAV>,
) -> Self {
Self {
receipts: receipts.into_iter().map(Into::into).collect(),
previous_rav: previous_rav.map(Into::into),
}
}
}

impl RavResponse {
pub fn signed_rav(mut self) -> anyhow::Result<tap_core::rav::SignedRAV> {
let signed_rav: tap_core::rav::SignedRAV = self
.rav
.take()
.ok_or(anyhow!("Couldn't find rav"))?
.try_into()?;
Ok(signed_rav)
}
}
148 changes: 0 additions & 148 deletions tap_aggregator/src/hybrid.rs

This file was deleted.

3 changes: 1 addition & 2 deletions tap_aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
pub mod aggregator;
pub mod api_versioning;
pub mod error_codes;
// pub mod hybrid;
pub mod grpc;
pub mod jsonrpsee_helpers;
pub mod metrics;
pub mod server;
pub mod tap_aggregator;
107 changes: 12 additions & 95 deletions tap_aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,15 @@

#![doc = include_str!("../README.md")]

use std::collections::HashSet;
use std::str::FromStr;
use std::{collections::HashSet, str::FromStr};

use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use alloy::signers::local::PrivateKeySigner;
use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner};
use anyhow::Result;
use axum::error_handling::HandleError;
use axum::routing::post_service;
use axum::BoxError;
use axum::Router;
use clap::Parser;
use hyper::StatusCode;
use log::{debug, info};
use tap_core::tap_eip712_domain;
use tokio::net::TcpListener;

use log::{debug, info};
use tap_aggregator::metrics;
use tap_aggregator::server;
use tokio::signal;
use tower::make::Shared;
use tap_aggregator::{metrics, server};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -119,96 +107,25 @@ async fn main() -> Result<()> {

// Start the JSON-RPC server.
// This await is non-blocking
let (service, handle) = server::create_rpc_tower_service(
wallet.clone(),
accepted_addresses.clone(),
domain_separator.clone(),
let (handle, _) = server::run_server(
args.port,
wallet,
accepted_addresses,
domain_separator,
args.max_request_body_size,
args.max_response_body_size,
args.max_connections,
)?;

)
.await?;
info!("Server started. Listening on port {}.", args.port);

async fn handle_anyhow_error(err: BoxError) -> (StatusCode, String) {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Something went wrong: {err}"),
)
}
let router = Router::new().route_service(
"/",
HandleError::new(post_service(service), handle_anyhow_error),
);

let grpc_service = server::create_grpc_service(wallet, accepted_addresses, domain_separator)?;

let service = tower::steer::Steer::new(
[router, grpc_service.into_axum_router()],
|req: &hyper::Request<_>, _services: &[_]| {
if req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|content_type| content_type.as_bytes())
.filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some()
{
// route to the gRPC service (second service element) when the
// header is set
1
} else {
// otherwise route to the REST service
0
}
},
);

// Create a `TcpListener` using tokio.
let listener = TcpListener::bind(&format!("0.0.0.0:{}", args.port))
.await
.expect("Failed to bind to indexer-service port");

if let Err(e) = axum::serve(listener, Shared::new(service))
.with_graceful_shutdown(shutdown_handler())
.await
{
anyhow::bail!("Indexer service error: {e}");
}
let _ = handle.await;

// If we're here, we've received a signal to exit.
info!("Shutting down...");

// Stop the server and wait for it to finish gracefully.
let _ = handle.stop();
handle.stopped().await;

debug!("Goodbye!");
Ok(())
}

/// Graceful shutdown handler
async fn shutdown_handler() {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
};

let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install signal handler")
.recv()
.await;
};

tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}

info!("Signal received, starting graceful shutdown");
}

fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
// Transfrom the args into the types expected by Eip712Domain::new().

Expand Down
Loading

0 comments on commit 313eecc

Please sign in to comment.