Skip to content

Commit

Permalink
Merge remote-tracking branch 'tiram-origin/parallel_block_submits' in…
Browse files Browse the repository at this point in the history
…to tn11-workarounds
  • Loading branch information
coderofstuff committed Dec 13, 2023
2 parents 7100011 + 0e2c28f commit 726043f
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 25 deletions.
11 changes: 8 additions & 3 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
mining_manager,
flow_context,
index_service.as_ref().map(|x| x.utxoindex().unwrap()),
config,
config.clone(),
core.clone(),
processing_counters,
wrpc_borsh_counters.clone(),
Expand All @@ -451,8 +451,13 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
p2p_tower_counters.clone(),
grpc_tower_counters.clone(),
));
let grpc_service =
Arc::new(GrpcService::new(grpc_server_addr, rpc_core_service.clone(), args.rpc_max_clients, grpc_tower_counters));
let grpc_service = Arc::new(GrpcService::new(
grpc_server_addr,
config.bps(),
rpc_core_service.clone(),
args.rpc_max_clients,
grpc_tower_counters,
));

// Create an async runtime and register the top-level async services
let async_runtime = Arc::new(AsyncRuntime::new(args.async_threads));
Expand Down
3 changes: 2 additions & 1 deletion rpc/grpc/server/src/adaptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ impl Adaptor {

pub fn server(
serve_address: NetAddress,
bps: u64,
manager: Manager,
core_service: DynRpcService,
core_notifier: Arc<Notifier<Notification, ChannelConnection>>,
counters: Arc<TowerConnectionCounters>,
) -> Arc<Self> {
let (manager_sender, manager_receiver) = mpsc_channel(Self::manager_channel_size());
let connection_handler = ConnectionHandler::new(manager_sender, core_service.clone(), core_notifier, counters);
let connection_handler = ConnectionHandler::new(bps, manager_sender, core_service.clone(), core_notifier, counters);
let server_termination = connection_handler.serve(serve_address);
let adaptor = Arc::new(Adaptor::new(Some(server_termination), connection_handler, manager, serve_address));
adaptor.manager.clone().start_event_loop(manager_receiver);
Expand Down
55 changes: 44 additions & 11 deletions rpc/grpc/server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::{
connection_handler::ServerContext,
error::{GrpcServerError, GrpcServerResult},
manager::ManagerEvent,
request_handler::{factory::Factory, interface::Interface},
request_handler::{factory::Factory, interface::Interface, method::RoutingPolicy},
};
use async_channel::{bounded, Receiver as MpmcReceiver, Sender as MpmcSender, TrySendError as MpmcTrySendError};
use itertools::Itertools;
use kaspa_core::{debug, info, trace, warn};
use kaspa_grpc_core::{
ops::KaspadPayloadOps,
Expand All @@ -23,19 +25,19 @@ use std::{
Arc,
},
};
use tokio::sync::mpsc::{channel as mpsc_channel, Receiver as MpscReceiver, Sender as MpscSender};
use tokio::sync::mpsc::Sender as MpscSender;
use tokio::sync::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
use tokio::{select, sync::mpsc::error::TrySendError};
use tonic::Streaming;
use uuid::Uuid;

pub type IncomingRoute = MpscReceiver<KaspadRequest>;
pub type IncomingRoute = MpmcReceiver<KaspadRequest>;
pub type GrpcNotifier = Notifier<Notification, Connection>;
pub type GrpcSender = MpscSender<KaspadResponse>;
pub type StatusResult<T> = Result<T, tonic::Status>;
pub type ConnectionId = Uuid;

type RequestSender = MpscSender<KaspadRequest>;
type RequestSender = MpmcSender<KaspadRequest>;
type RoutingMap = HashMap<KaspadPayloadOps, RequestSender>;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -104,11 +106,34 @@ impl Router {
fn subscribe(&mut self, connection: &Connection, rpc_op: KaspadPayloadOps) -> RequestSender {
match self.routing_map.entry(rpc_op) {
Entry::Vacant(entry) => {
let (sender, receiver) = mpsc_channel(Connection::request_channel_size());
let handler = Factory::new_handler(rpc_op, receiver, self.server_context.clone(), &self.interface, connection.clone());
handler.launch();
let method = self.interface.get_method(&rpc_op);
let (sender, receiver) = bounded(method.queue_size());
let handlers = (0..method.tasks())
.map(|_| {
Factory::new_handler(
rpc_op,
receiver.clone(),
self.server_context.clone(),
&self.interface,
connection.clone(),
)
})
.collect_vec();
handlers.into_iter().for_each(|x| x.launch());
entry.insert(sender.clone());
trace!("GRPC, Connection::subscribe - {:?} route is registered, client:{:?}", rpc_op, connection.identity());
match method.tasks() {
1 => {
trace!("GRPC, Connection::subscribe - {:?} route is registered, client:{:?}", rpc_op, connection.identity());
}
n => {
trace!(
"GRPC, Connection::subscribe - {:?} route is registered with {} workers, client:{:?}",
rpc_op,
n,
connection.identity()
);
}
}
sender
}
Entry::Occupied(entry) => entry.get().clone(),
Expand All @@ -121,11 +146,19 @@ impl Router {
return Err(GrpcServerError::InvalidRequestPayload);
}
let rpc_op = request.payload.as_ref().unwrap().into();
let method = self.interface.get_method(&rpc_op);
let sender = self.routing_map.get(&rpc_op).cloned();
let sender = sender.unwrap_or_else(|| self.subscribe(connection, rpc_op));
match sender.send(request).await {
Ok(_) => Ok(()),
Err(_) => Err(GrpcServerError::ClosedHandler(rpc_op)),
match method.routing_policy() {
RoutingPolicy::Enqueue => match sender.send(request).await {
Ok(_) => Ok(()),
Err(_) => Err(GrpcServerError::ClosedHandler(rpc_op)),
},
RoutingPolicy::DropIfFull => match sender.try_send(request) {
Ok(_) => Ok(()),
Err(MpmcTrySendError::Full(_)) => Err(GrpcServerError::RouteIsFull(rpc_op)),
Err(MpmcTrySendError::Closed(_)) => Err(GrpcServerError::ClosedHandler(rpc_op)),
},
}
}

Expand Down
3 changes: 2 additions & 1 deletion rpc/grpc/server/src/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const GRPC_SERVER: &str = "grpc-server";

impl ConnectionHandler {
pub(crate) fn new(
bps: u64,
manager_sender: MpscSender<ManagerEvent>,
core_service: DynRpcService,
core_notifier: Arc<Notifier<Notification, ChannelConnection>>,
Expand All @@ -93,7 +94,7 @@ impl ConnectionHandler {
let notifier: Arc<Notifier<Notification, Connection>> =
Arc::new(Notifier::new(GRPC_SERVER, core_events, vec![collector], vec![subscriber], 10));
let server_context = ServerContext::new(core_service, notifier);
let interface = Arc::new(Factory::new_interface(server_context.clone()));
let interface = Arc::new(Factory::new_interface(server_context.clone(), bps));
let running = Default::default();

Self { manager_sender, server_context, interface, running, counters }
Expand Down
3 changes: 3 additions & 0 deletions rpc/grpc/server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub enum GrpcServerError {
#[error("{0:?} handler is closed")]
ClosedHandler(KaspadPayloadOps),

#[error("{0:?} route to handler is full")]
RouteIsFull(KaspadPayloadOps),

#[error("client connection is closed")]
ConnectionClosed,

Expand Down
13 changes: 11 additions & 2 deletions rpc/grpc/server/src/request_handler/factory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use super::{handler::RequestHandler, handler_trait::Handler, interface::Interface, method::Method};
use super::{
handler::RequestHandler,
handler_trait::Handler,
interface::Interface,
method::{Method, RoutingPolicy},
};
use crate::{
connection::{Connection, IncomingRoute},
connection_handler::ServerContext,
Expand All @@ -22,7 +27,7 @@ impl Factory {
Box::new(RequestHandler::new(rpc_op, incoming_route, server_context, interface, connection))
}

pub fn new_interface(server_ctx: ServerContext) -> Interface {
pub fn new_interface(server_ctx: ServerContext, bps: u64) -> Interface {
// The array as last argument in the macro call below must exactly match the full set of
// KaspadPayloadOps variants.
let mut interface = build_grpc_server_interface!(
Expand Down Expand Up @@ -122,6 +127,10 @@ impl Factory {
});
interface.replace_method(KaspadPayloadOps::NotifyFinalityConflict, method);

// Methods with special properties
let bps = bps as usize;
interface.replace_method_properties(KaspadPayloadOps::SubmitBlock, bps, 10.max(bps * 2), RoutingPolicy::DropIfFull);

interface
}
}
2 changes: 1 addition & 1 deletion rpc/grpc/server/src/request_handler/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl RequestHandler {
impl Handler for RequestHandler {
async fn start(&mut self) {
debug!("GRPC, Starting request handler {:?} for client {}", self.rpc_op, self.connection);
while let Some(request) = self.incoming_route.recv().await {
while let Ok(request) = self.incoming_route.recv().await {
let response = self.handle_request(request).await;
match response {
Ok(response) => {
Expand Down
11 changes: 10 additions & 1 deletion rpc/grpc/server/src/request_handler/interface.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::method::{Method, MethodTrait};
use super::method::{Method, MethodTrait, RoutingPolicy};
use crate::{
connection::Connection,
connection_handler::ServerContext,
Expand Down Expand Up @@ -55,6 +55,15 @@ impl Interface {
let _ = self.methods.insert(op, method);
}

pub fn replace_method_properties(&mut self, op: KaspadPayloadOps, tasks: usize, queue_size: usize, routing_policy: RoutingPolicy) {
self.methods.entry(op).and_modify(|x| {
let method: Method<ServerContext, Connection, KaspadRequest, KaspadResponse> =
Method::with_properties(x.method_fn(), tasks, queue_size, routing_policy);
let method: Arc<dyn MethodTrait<ServerContext, Connection, KaspadRequest, KaspadResponse>> = Arc::new(method);
*x = method;
});
}

pub async fn call(
&self,
op: &KaspadPayloadOps,
Expand Down
57 changes: 56 additions & 1 deletion rpc/grpc/server/src/request_handler/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,21 @@ use async_trait::async_trait;
use futures::Future;
use std::{pin::Pin, sync::Arc};

#[derive(Clone, Copy, Eq, PartialEq)]
pub enum RoutingPolicy {
Enqueue,
DropIfFull,
}

#[async_trait]
pub trait MethodTrait<ServerContext, ConnectionContext, Request, Response>: Send + Sync + 'static {
async fn call(&self, server_ctx: ServerContext, connection_ctx: ConnectionContext, request: Request)
-> GrpcServerResult<Response>;

fn method_fn(&self) -> MethodFn<ServerContext, ConnectionContext, Request, Response>;
fn tasks(&self) -> usize;
fn queue_size(&self) -> usize;
fn routing_policy(&self) -> RoutingPolicy;
}

/// RPC method function type
Expand All @@ -24,7 +35,17 @@ where
Request: Send + Sync + 'static,
Response: Send + Sync + 'static,
{
/// Function called when executing the method
method: MethodFn<ServerContext, ConnectionContext, Request, Response>,

/// Number of connection concurrent request handlers
tasks: usize,

/// Size of the request queue
queue_size: usize,

/// Policy applied when the routing channel is full
routing_policy: RoutingPolicy,
}

impl<ServerContext, ConnectionContext, Request, Response> Method<ServerContext, ConnectionContext, Request, Response>
Expand All @@ -38,7 +59,25 @@ where
where
FN: Send + Sync + Fn(ServerContext, ConnectionContext, Request) -> MethodFnReturn<Response> + 'static,
{
Method { method: Arc::new(Box::new(method_fn)) }
Method {
method: Arc::new(Box::new(method_fn)),
tasks: 1,
queue_size: Self::default_queue_size(),
routing_policy: RoutingPolicy::Enqueue,
}
}

pub fn with_properties(
method_fn: MethodFn<ServerContext, ConnectionContext, Request, Response>,
tasks: usize,
queue_size: usize,
routing_policy: RoutingPolicy,
) -> Method<ServerContext, ConnectionContext, Request, Response> {
Method { method: method_fn, tasks, queue_size, routing_policy }
}

pub fn default_queue_size() -> usize {
256
}
}

Expand All @@ -59,4 +98,20 @@ where
) -> GrpcServerResult<Response> {
(self.method)(server_ctx, connection_ctx, request).await
}

fn method_fn(&self) -> MethodFn<ServerContext, ConnectionContext, Request, Response> {
self.method.clone()
}

fn tasks(&self) -> usize {
self.tasks
}

fn queue_size(&self) -> usize {
self.queue_size
}

fn routing_policy(&self) -> RoutingPolicy {
self.routing_policy
}
}
14 changes: 11 additions & 3 deletions rpc/grpc/server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const GRPC_SERVICE: &str = "grpc-service";

pub struct GrpcService {
net_address: NetAddress,
bps: u64,
core_service: Arc<RpcCoreService>,
rpc_max_clients: usize,
shutdown: SingleTrigger,
Expand All @@ -22,11 +23,12 @@ pub struct GrpcService {
impl GrpcService {
pub fn new(
address: NetAddress,
bps: u64,
core_service: Arc<RpcCoreService>,
rpc_max_clients: usize,
counters: Arc<TowerConnectionCounters>,
) -> Self {
Self { net_address: address, core_service, rpc_max_clients, shutdown: Default::default(), counters }
Self { net_address: address, bps, core_service, rpc_max_clients, shutdown: Default::default(), counters }
}
}

Expand All @@ -42,8 +44,14 @@ impl AsyncService for GrpcService {
let shutdown_signal = self.shutdown.listener.clone();

let manager = Manager::new(self.rpc_max_clients);
let grpc_adaptor =
Adaptor::server(self.net_address, manager, self.core_service.clone(), self.core_service.notifier(), self.counters.clone());
let grpc_adaptor = Adaptor::server(
self.net_address,
self.bps,
manager,
self.core_service.clone(),
self.core_service.notifier(),
self.counters.clone(),
);

// Launch the service and wait for a shutdown signal
Box::pin(async move {
Expand Down
2 changes: 1 addition & 1 deletion rpc/grpc/server/src/tests/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async fn test_client_server_notifications() {

fn create_server(core_service: Arc<RpcCoreMock>) -> Arc<Adaptor> {
let manager = Manager::new(128);
Adaptor::server(get_free_net_address(), manager, core_service.clone(), core_service.core_notifier(), Default::default())
Adaptor::server(get_free_net_address(), 1, manager, core_service.clone(), core_service.core_notifier(), Default::default())
}

async fn create_client(server_address: NetAddress) -> GrpcClient {
Expand Down

0 comments on commit 726043f

Please sign in to comment.