Skip to content

Commit

Permalink
Apply method properties to requests handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tiram88 committed Dec 13, 2023
1 parent 11ce422 commit 0e2c28f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 12 deletions.
55 changes: 44 additions & 11 deletions rpc/grpc/server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::{
connection_handler::ServerContext,
error::{GrpcServerError, GrpcServerResult},
manager::ManagerEvent,
request_handler::{factory::Factory, interface::Interface},
request_handler::{factory::Factory, interface::Interface, method::RoutingPolicy},
};
use async_channel::{bounded, Receiver as MpmcReceiver, Sender as MpmcSender, TrySendError as MpmcTrySendError};
use itertools::Itertools;
use kaspa_core::{debug, info, trace, warn};
use kaspa_grpc_core::{
ops::KaspadPayloadOps,
Expand All @@ -23,19 +25,19 @@ use std::{
Arc,
},
};
use tokio::sync::mpsc::{channel as mpsc_channel, Receiver as MpscReceiver, Sender as MpscSender};
use tokio::sync::mpsc::Sender as MpscSender;
use tokio::sync::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
use tokio::{select, sync::mpsc::error::TrySendError};
use tonic::Streaming;
use uuid::Uuid;

pub type IncomingRoute = MpscReceiver<KaspadRequest>;
pub type IncomingRoute = MpmcReceiver<KaspadRequest>;
pub type GrpcNotifier = Notifier<Notification, Connection>;
pub type GrpcSender = MpscSender<KaspadResponse>;
pub type StatusResult<T> = Result<T, tonic::Status>;
pub type ConnectionId = Uuid;

type RequestSender = MpscSender<KaspadRequest>;
type RequestSender = MpmcSender<KaspadRequest>;
type RoutingMap = HashMap<KaspadPayloadOps, RequestSender>;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -104,11 +106,34 @@ impl Router {
fn subscribe(&mut self, connection: &Connection, rpc_op: KaspadPayloadOps) -> RequestSender {
match self.routing_map.entry(rpc_op) {
Entry::Vacant(entry) => {
let (sender, receiver) = mpsc_channel(Connection::request_channel_size());
let handler = Factory::new_handler(rpc_op, receiver, self.server_context.clone(), &self.interface, connection.clone());
handler.launch();
let method = self.interface.get_method(&rpc_op);
let (sender, receiver) = bounded(method.queue_size());
let handlers = (0..method.tasks())
.map(|_| {
Factory::new_handler(
rpc_op,
receiver.clone(),
self.server_context.clone(),
&self.interface,
connection.clone(),
)
})
.collect_vec();
handlers.into_iter().for_each(|x| x.launch());
entry.insert(sender.clone());
trace!("GRPC, Connection::subscribe - {:?} route is registered, client:{:?}", rpc_op, connection.identity());
match method.tasks() {
1 => {
trace!("GRPC, Connection::subscribe - {:?} route is registered, client:{:?}", rpc_op, connection.identity());
}
n => {
trace!(
"GRPC, Connection::subscribe - {:?} route is registered with {} workers, client:{:?}",
rpc_op,
n,
connection.identity()
);
}
}
sender
}
Entry::Occupied(entry) => entry.get().clone(),
Expand All @@ -121,11 +146,19 @@ impl Router {
return Err(GrpcServerError::InvalidRequestPayload);
}
let rpc_op = request.payload.as_ref().unwrap().into();
let method = self.interface.get_method(&rpc_op);
let sender = self.routing_map.get(&rpc_op).cloned();
let sender = sender.unwrap_or_else(|| self.subscribe(connection, rpc_op));
match sender.send(request).await {
Ok(_) => Ok(()),
Err(_) => Err(GrpcServerError::ClosedHandler(rpc_op)),
match method.routing_policy() {
RoutingPolicy::Enqueue => match sender.send(request).await {
Ok(_) => Ok(()),
Err(_) => Err(GrpcServerError::ClosedHandler(rpc_op)),
},
RoutingPolicy::DropIfFull => match sender.try_send(request) {
Ok(_) => Ok(()),
Err(MpmcTrySendError::Full(_)) => Err(GrpcServerError::RouteIsFull(rpc_op)),
Err(MpmcTrySendError::Closed(_)) => Err(GrpcServerError::ClosedHandler(rpc_op)),
},
}
}

Expand Down
3 changes: 3 additions & 0 deletions rpc/grpc/server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub enum GrpcServerError {
#[error("{0:?} handler is closed")]
ClosedHandler(KaspadPayloadOps),

#[error("{0:?} route to handler is full")]
RouteIsFull(KaspadPayloadOps),

#[error("client connection is closed")]
ConnectionClosed,

Expand Down
2 changes: 1 addition & 1 deletion rpc/grpc/server/src/request_handler/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl RequestHandler {
impl Handler for RequestHandler {
async fn start(&mut self) {
debug!("GRPC, Starting request handler {:?} for client {}", self.rpc_op, self.connection);
while let Some(request) = self.incoming_route.recv().await {
while let Ok(request) = self.incoming_route.recv().await {
let response = self.handle_request(request).await;
match response {
Ok(response) => {
Expand Down

0 comments on commit 0e2c28f

Please sign in to comment.