Skip to content

Commit

Permalink
more wip
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Dec 24, 2024
1 parent 77041df commit 9d42bff
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 30 deletions.
8 changes: 7 additions & 1 deletion tap_aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ description = "A JSON-RPC service for the Timeline Aggregation Protocol that let
name = "tap_aggregator"
path = "src/main.rs"

[[bin]]
name = "client"

[dependencies]
tap_core = { path = "../tap_core", version = "2.0.0" }
serde.workspace = true
Expand All @@ -22,6 +25,9 @@ clap = { version = "4.5.15", features = ["derive", "env"] }
serde_json = { version = "1.0.124", features = ["raw_value"] }
strum = { version = "0.26.3", features = ["derive"] }
tracing-subscriber = "0.3.17"
pin-project = "1.1.7"
hyper = { version = "1", features = ["full"] }
hyper-util = "0.1.10"
log = "0.4.19"
prometheus = "0.13.3"
axum = { version = "0.7.5", features = [
Expand All @@ -40,7 +46,7 @@ tonic = { version = "0.12.3", features = ["transport"] }
prost = "0.13.3"

[build-dependencies]
tonic-build = "0.12.3"
tonic-build = "0.12.3"


[dev-dependencies]
Expand Down
4 changes: 4 additions & 0 deletions tap_aggregator/src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

fn main() {
println!("Hello world");
}
148 changes: 148 additions & 0 deletions tap_aggregator/src/hybrid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use std::{future::Future, marker::PhantomData};

use hyper::body::Frame;
use std::pin::Pin;
use std::task::Poll;
use tower::Service;

use hyper::{
body::{Body, Incoming},
Request, Response,
};
use pin_project::pin_project;

pub fn hybrid_once<Web, Grpc, WebBody, GrpcBody>(
web: Web,
grpc: Grpc,
) -> HybridService<Web, Grpc, WebBody, GrpcBody>
where
Web: Service<Request<Incoming>, Response = Response<WebBody>>,
Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
Web::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
Grpc::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
HybridService {
web,
grpc,
_phantom: PhantomData,
}
}

#[derive(Clone)]
pub struct HybridService<Web, Grpc, WebBody, GrpcBody>
where
Web: Service<Request<Incoming>, Response = Response<WebBody>>,
Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
Web::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
Grpc::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
web: Web,
grpc: Grpc,
_phantom: PhantomData<(WebBody, GrpcBody)>,
}

impl<Web, Grpc, WebBody, GrpcBody> Service<Request<Incoming>>
for HybridService<Web, Grpc, WebBody, GrpcBody>
where
Web: Service<Request<Incoming>, Response = Response<WebBody>>,
Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
Web::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
Grpc::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
type Response = Response<HybridBody<WebBody, GrpcBody>>;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Future = HybridFuture<Web::Future, Grpc::Future>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
match self.web.poll_ready(cx) {
Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
},
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
}
}

fn call(&mut self, req: Request<Incoming>) -> Self::Future {
if req.headers().get("content-type").map(|x| x.as_bytes()) == Some(b"application/grpc") {
HybridFuture::Grpc(self.grpc.call(req))
} else {
HybridFuture::Web(self.web.call(req))
}
}
}

#[derive(Clone)]
#[pin_project(project = HybridBodyProj)]
pub enum HybridBody<WebBody, GrpcBody> {
Web(#[pin] WebBody),
Grpc(#[pin] GrpcBody),
}

impl<WebBody, GrpcBody> Body for HybridBody<WebBody, GrpcBody>
where
WebBody: Body + Send + Unpin,
GrpcBody: Body<Data = WebBody::Data> + Send + Unpin,
WebBody::Error: std::error::Error + Send + Sync + 'static,
GrpcBody::Error: std::error::Error + Send + Sync + 'static,
{
type Data = WebBody::Data;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

fn is_end_stream(&self) -> bool {
match self {
HybridBody::Web(b) => b.is_end_stream(),
HybridBody::Grpc(b) => b.is_end_stream(),
}
}

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut std::task::Context,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.project() {
HybridBodyProj::Web(b) => b.poll_frame(cx).map_err(|e| e.into()),
HybridBodyProj::Grpc(b) => b.poll_frame(cx).map_err(|e| e.into()),
}
}
}

#[pin_project(project = HybridFutureProj)]
pub enum HybridFuture<WebFuture, GrpcFuture> {
Web(#[pin] WebFuture),
Grpc(#[pin] GrpcFuture),
}

impl<WebFuture, GrpcFuture, WebBody, GrpcBody, WebError, GrpcError> Future
for HybridFuture<WebFuture, GrpcFuture>
where
WebFuture: Future<Output = Result<Response<WebBody>, WebError>>,
GrpcFuture: Future<Output = Result<Response<GrpcBody>, GrpcError>>,
WebError: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
GrpcError: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
type Output = Result<
Response<HybridBody<WebBody, GrpcBody>>,
Box<dyn std::error::Error + Send + Sync + 'static>,
>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
match self.project() {
HybridFutureProj::Web(a) => match a.poll(cx) {
Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(HybridBody::Web))),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
},
HybridFutureProj::Grpc(b) => match b.poll(cx) {
Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(HybridBody::Grpc))),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
},
}
}
}
2 changes: 2 additions & 0 deletions tap_aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub mod aggregator;
pub mod api_versioning;
pub mod error_codes;
// pub mod hybrid;
pub mod jsonrpsee_helpers;
pub mod metrics;
pub mod server;
pub mod tap_aggregator;
94 changes: 80 additions & 14 deletions tap_aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@ use alloy::primitives::Address;
use alloy::primitives::FixedBytes;
use alloy::signers::local::PrivateKeySigner;
use anyhow::Result;
use axum::error_handling::HandleError;
use axum::routing::post_service;
use axum::BoxError;
use axum::Router;
use clap::Parser;
use hyper::StatusCode;
use ruint::aliases::U256;
use tokio::signal::unix::{signal, SignalKind};
use tokio::net::TcpListener;

use log::{debug, info};
use tap_aggregator::metrics;
use tap_aggregator::server;
use tokio::signal;
use tower::make::Shared;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -114,24 +121,60 @@ async fn main() -> Result<()> {

// Start the JSON-RPC server.
// This await is non-blocking
let (handle, _) = server::run_server(
args.port,
wallet,
accepted_addresses,
domain_separator,
let (service, handle) = server::create_rpc_tower_service(
wallet.clone(),
accepted_addresses.clone(),
domain_separator.clone(),
args.max_request_body_size,
args.max_response_body_size,
args.max_connections,
)
.await?;
)?;

info!("Server started. Listening on port {}.", args.port);

// Have tokio wait for SIGTERM or SIGINT.
let mut signal_sigint = signal(SignalKind::interrupt())?;
let mut signal_sigterm = signal(SignalKind::terminate())?;
tokio::select! {
_ = signal_sigint.recv() => debug!("Received SIGINT."),
_ = signal_sigterm.recv() => debug!("Received SIGTERM."),
async fn handle_anyhow_error(err: BoxError) -> (StatusCode, String) {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Something went wrong: {err}"),
)
}
let router = Router::new().route_service(
"/",
HandleError::new(post_service(service), handle_anyhow_error),
);

let grpc_service = server::create_grpc_service(wallet, accepted_addresses, domain_separator)?;

let service = tower::steer::Steer::new(
[router, grpc_service.into_axum_router()],
|req: &hyper::Request<_>, _services: &[_]| {
if req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|content_type| content_type.as_bytes())
.filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some()
{
// route to the gRPC service (second service element) when the
// header is set
1
} else {
// otherwise route to the REST service
0
}
},
);

// Create a `TcpListener` using tokio.
let listener = TcpListener::bind(&format!("0.0.0.0:{}", args.port))
.await
.expect("Failed to bind to indexer-service port");

if let Err(e) = axum::serve(listener, Shared::new(service))
.with_graceful_shutdown(shutdown_handler())
.await
{
anyhow::bail!("Indexer service error: {e}");
}

// If we're here, we've received a signal to exit.
Expand All @@ -145,6 +188,29 @@ async fn main() -> Result<()> {
Ok(())
}

/// Graceful shutdown handler
async fn shutdown_handler() {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
};

let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install signal handler")
.recv()
.await;
};

tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}

info!("Signal received, starting graceful shutdown");
}

fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
// Transfrom the args into the types expected by Eip712Domain::new().

Expand Down
Loading

0 comments on commit 9d42bff

Please sign in to comment.