Skip to content

Commit

Permalink
Refactor BrokerService usage with WrappedBrokerService for enhanced m…
Browse files Browse the repository at this point in the history
…etric tracking
  • Loading branch information
nullchinchilla committed Jul 18, 2024
1 parent 8b98502 commit bb5a64d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
7 changes: 4 additions & 3 deletions binaries/geph5-broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use ed25519_dalek::SigningKey;
use geph5_broker_protocol::BrokerService;
use nanorpc::{JrpcRequest, JrpcResponse, RpcService};
use once_cell::sync::{Lazy, OnceCell};
use rpc_impl::BrokerImpl;

use rpc_impl::WrappedBrokerService;
use serde::Deserialize;
use smolscale::immortal::{Immortal, RespawnStrategy};
use std::{fmt::Debug, fs, net::SocketAddr, path::PathBuf};
Expand Down Expand Up @@ -114,7 +115,7 @@ async fn main() -> anyhow::Result<()> {
let _tcp_loop = Immortal::respawn(RespawnStrategy::Immediate, || async {
nanorpc_sillad::rpc_serve(
sillad::tcp::TcpListener::bind(CONFIG_FILE.wait().tcp_listen).await?,
BrokerService(BrokerImpl {}),
WrappedBrokerService::new(),
)
.await?;
anyhow::Ok(())
Expand All @@ -127,7 +128,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn rpc(Json(payload): Json<JrpcRequest>) -> Json<JrpcResponse> {
Json(BrokerService(BrokerImpl {}).respond_raw(payload).await)
Json(WrappedBrokerService::new().respond_raw(payload).await)
}

fn log_error(e: &impl Debug) {
Expand Down
29 changes: 26 additions & 3 deletions binaries/geph5-broker/src/rpc_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use cadence::{StatsdClient, UdpMetricSink};
use ed25519_dalek::VerifyingKey;
use futures_util::future::join_all;
use geph5_broker_protocol::{
AccountLevel, AuthError, BridgeDescriptor, BrokerProtocol, Credential, ExitDescriptor,
ExitList, GenericError, Mac, RouteDescriptor, Signed, DOMAIN_EXIT_DESCRIPTOR,
AccountLevel, AuthError, BridgeDescriptor, BrokerProtocol, BrokerService, Credential,
ExitDescriptor, ExitList, GenericError, Mac, RouteDescriptor, Signed, DOMAIN_EXIT_DESCRIPTOR,
};
use isocountry::CountryCode;
use mizaru2::{BlindedClientToken, BlindedSignature, ClientToken, UnblindedSignature};
use moka::future::Cache;
use nanorpc::{JrpcRequest, JrpcResponse, RpcService, ServerError};
use once_cell::sync::Lazy;
use std::{
net::SocketAddr,
Expand All @@ -27,7 +28,29 @@ use crate::{
CONFIG_FILE, FREE_MIZARU_SK, MASTER_SECRET, PLUS_MIZARU_SK,
};

pub struct BrokerImpl {}
pub struct WrappedBrokerService(BrokerService<BrokerImpl>);

impl WrappedBrokerService {
pub fn new() -> Self {
Self(BrokerService(BrokerImpl {}))
}
}

#[async_trait]
impl RpcService for WrappedBrokerService {
async fn respond(
&self,
method: &str,
params: Vec<serde_json::Value>,
) -> Option<Result<serde_json::Value, ServerError>> {
if let Some(client) = STATSD_CLIENT.as_ref() {
client.count(&format!("broker.{method}"), 1).unwrap();
}
self.0.respond(method, params).await
}
}

struct BrokerImpl {}

#[async_trait]
impl BrokerProtocol for BrokerImpl {
Expand Down

0 comments on commit bb5a64d

Please sign in to comment.