Skip to content

Commit

Permalink
Merge remote-tracking branch 'tiram-origin/parallel_block_submits' in…
Browse files Browse the repository at this point in the history
…to tn11-compiled
  • Loading branch information
coderofstuff committed Dec 14, 2023
2 parents 50d5233 + 39f79c3 commit 50c8c96
Show file tree
Hide file tree
Showing 16 changed files with 190 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
mining_manager,
flow_context,
index_service.as_ref().map(|x| x.utxoindex().unwrap()),
config,
config.clone(),
core.clone(),
processing_counters,
wrpc_borsh_counters.clone(),
Expand All @@ -451,7 +451,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
grpc_tower_counters.clone(),
));
let grpc_service =
Arc::new(GrpcService::new(grpc_server_addr, rpc_core_service.clone(), args.rpc_max_clients, grpc_tower_counters));
Arc::new(GrpcService::new(grpc_server_addr, config, rpc_core_service.clone(), args.rpc_max_clients, grpc_tower_counters));

// Create an async runtime and register the top-level async services
let async_runtime = Arc::new(AsyncRuntime::new(args.async_threads));
Expand Down
8 changes: 2 additions & 6 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,8 @@ impl FlowContext {
// Broadcast as soon as the block has been validated and inserted into the DAG
self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await;

let ctx = self.clone();
let consensus = consensus.clone();
tokio::spawn(async move {
ctx.on_new_block(&consensus, block, virtual_state_task).await;
ctx.log_block_acceptance(hash, BlockSource::Submit);
});
self.on_new_block(consensus, block, virtual_state_task).await;
self.log_block_acceptance(hash, BlockSource::Submit);

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ impl SubmitBlockRequest {
pub enum SubmitBlockRejectReason {
BlockInvalid = 1,
IsInIBD = 2,
RouteIsFull = 3,
}
impl SubmitBlockRejectReason {
fn as_str(&self) -> &'static str {
// see app\appmessage\rpc_submit_block.go, line 35
match self {
SubmitBlockRejectReason::BlockInvalid => "Block is invalid",
SubmitBlockRejectReason::IsInIBD => "Node is in IBD",
SubmitBlockRejectReason::RouteIsFull => "Route is full",
}
}
}
Expand Down
1 change: 1 addition & 0 deletions rpc/grpc/core/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ message SubmitBlockResponseMessage{
NONE = 0;
BLOCK_INVALID = 1;
IS_IN_IBD = 2;
ROUTE_IS_FULL = 3;
}
RejectReason rejectReason = 1;
RPCError error = 1000;
Expand Down
2 changes: 2 additions & 0 deletions rpc/grpc/core/src/convert/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ from!(item: &kaspa_rpc_core::SubmitBlockReport, RejectReason, {
kaspa_rpc_core::SubmitBlockReport::Success => RejectReason::None,
kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::BlockInvalid) => RejectReason::BlockInvalid,
kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::IsInIBD) => RejectReason::IsInIbd,
kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::RouteIsFull) => RejectReason::RouteIsFull,
}
});

Expand Down Expand Up @@ -451,6 +452,7 @@ from!(item: RejectReason, kaspa_rpc_core::SubmitBlockReport, {
RejectReason::None => kaspa_rpc_core::SubmitBlockReport::Success,
RejectReason::BlockInvalid => kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::BlockInvalid),
RejectReason::IsInIbd => kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::IsInIBD),
RejectReason::RouteIsFull => kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::RouteIsFull),
}
});

Expand Down
3 changes: 2 additions & 1 deletion rpc/grpc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ include.workspace = true
license.workspace = true

[dependencies]
kaspa-consensus-core.workspace = true
kaspa-core.workspace = true
kaspa-grpc-core.workspace = true
kaspa-notify.workspace = true
kaspa-rpc-core.workspace = true
kaspa-rpc-macros.workspace = true
kaspa-rpc-service.workspace = true
kaspa-utils.workspace = true
kaspa-utils-tower.workspace = true
kaspa-utils.workspace = true

async-channel.workspace = true
async-stream.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion rpc/grpc/server/src/adaptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ impl Adaptor {

pub fn server(
serve_address: NetAddress,
network_bps: u64,
manager: Manager,
core_service: DynRpcService,
core_notifier: Arc<Notifier<Notification, ChannelConnection>>,
counters: Arc<TowerConnectionCounters>,
) -> Arc<Self> {
let (manager_sender, manager_receiver) = mpsc_channel(Self::manager_channel_size());
let connection_handler = ConnectionHandler::new(manager_sender, core_service.clone(), core_notifier, counters);
let connection_handler = ConnectionHandler::new(network_bps, manager_sender, core_service.clone(), core_notifier, counters);
let server_termination = connection_handler.serve(serve_address);
let adaptor = Arc::new(Adaptor::new(Some(server_termination), connection_handler, manager, serve_address));
adaptor.manager.clone().start_event_loop(manager_receiver);
Expand Down
102 changes: 83 additions & 19 deletions rpc/grpc/server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::{
connection_handler::ServerContext,
error::{GrpcServerError, GrpcServerResult},
manager::ManagerEvent,
request_handler::{factory::Factory, interface::Interface},
request_handler::{factory::Factory, interface::Interface, method::RoutingPolicy},
};
use async_channel::{bounded, Receiver as MpmcReceiver, Sender as MpmcSender, TrySendError as MpmcTrySendError};
use itertools::Itertools;
use kaspa_core::{debug, info, trace, warn};
use kaspa_grpc_core::{
ops::KaspadPayloadOps,
Expand All @@ -12,32 +14,30 @@ use kaspa_grpc_core::{
use kaspa_notify::{
connection::Connection as ConnectionT, error::Error as NotificationError, listener::ListenerId, notifier::Notifier,
};
use kaspa_rpc_core::Notification;
use kaspa_rpc_core::{Notification, SubmitBlockRejectReason, SubmitBlockReport, SubmitBlockResponse};
use parking_lot::Mutex;
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Display,
net::SocketAddr,
ops::Deref,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::mpsc::{channel as mpsc_channel, Receiver as MpscReceiver, Sender as MpscSender};
use tokio::sync::mpsc::Sender as MpscSender;
use tokio::sync::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
use tokio::{select, sync::mpsc::error::TrySendError};
use tonic::Streaming;
use uuid::Uuid;

pub type IncomingRoute = MpscReceiver<KaspadRequest>;
pub type IncomingRoute = MpmcReceiver<KaspadRequest>;
pub type GrpcNotifier = Notifier<Notification, Connection>;
pub type GrpcSender = MpscSender<KaspadResponse>;
pub type StatusResult<T> = Result<T, tonic::Status>;
pub type ConnectionId = Uuid;

type RequestSender = MpscSender<KaspadRequest>;
type RoutingMap = HashMap<KaspadPayloadOps, RequestSender>;

#[derive(Debug, Default)]
struct InnerMutableState {
/// Used on connection close to signal the connection receive loop to exit
Expand Down Expand Up @@ -85,6 +85,30 @@ impl Drop for Inner {
}
}

type RequestSender = MpmcSender<KaspadRequest>;

#[derive(Clone)]
struct Route {
sender: RequestSender,
policy: RoutingPolicy,
}

impl Route {
fn new(sender: RequestSender, policy: RoutingPolicy) -> Self {
Self { sender, policy }
}
}

impl Deref for Route {
type Target = RequestSender;

fn deref(&self) -> &Self::Target {
&self.sender
}
}

type RoutingMap = HashMap<KaspadPayloadOps, Route>;

struct Router {
/// Routing map for mapping messages to RPC op handlers
routing_map: RoutingMap,
Expand All @@ -101,15 +125,39 @@ impl Router {
Self { routing_map: Default::default(), server_context, interface }
}

fn subscribe(&mut self, connection: &Connection, rpc_op: KaspadPayloadOps) -> RequestSender {
fn subscribe(&mut self, connection: &Connection, rpc_op: KaspadPayloadOps) -> Route {
match self.routing_map.entry(rpc_op) {
Entry::Vacant(entry) => {
let (sender, receiver) = mpsc_channel(Connection::request_channel_size());
let handler = Factory::new_handler(rpc_op, receiver, self.server_context.clone(), &self.interface, connection.clone());
handler.launch();
entry.insert(sender.clone());
trace!("GRPC, Connection::subscribe - {:?} route is registered, client:{:?}", rpc_op, connection.identity());
sender
let method = self.interface.get_method(&rpc_op);
let (sender, receiver) = bounded(method.queue_size());
let handlers = (0..method.tasks())
.map(|_| {
Factory::new_handler(
rpc_op,
receiver.clone(),
self.server_context.clone(),
&self.interface,
connection.clone(),
)
})
.collect_vec();
handlers.into_iter().for_each(|x| x.launch());
let route = Route::new(sender, method.routing_policy());
entry.insert(route.clone());
match method.tasks() {
1 => {
trace!("GRPC, Connection::subscribe - {:?} route is registered, client:{:?}", rpc_op, connection.identity());
}
n => {
trace!(
"GRPC, Connection::subscribe - {:?} route is registered with {} workers, client:{:?}",
rpc_op,
n,
connection.identity()
);
}
}
route
}
Entry::Occupied(entry) => entry.get().clone(),
}
Expand All @@ -121,11 +169,27 @@ impl Router {
return Err(GrpcServerError::InvalidRequestPayload);
}
let rpc_op = request.payload.as_ref().unwrap().into();
let sender = self.routing_map.get(&rpc_op).cloned();
let sender = sender.unwrap_or_else(|| self.subscribe(connection, rpc_op));
match sender.send(request).await {
Ok(_) => Ok(()),
Err(_) => Err(GrpcServerError::ClosedHandler(rpc_op)),
let route = self.routing_map.get(&rpc_op).cloned();
let route = route.unwrap_or_else(|| self.subscribe(connection, rpc_op));
match route.policy {
RoutingPolicy::Enqueue => match route.send(request).await {
Ok(_) => Ok(()),
Err(_) => Err(GrpcServerError::ClosedHandler(rpc_op)),
},
RoutingPolicy::DropIfFull => match route.try_send(request) {
Ok(_) => Ok(()),
Err(MpmcTrySendError::Full(_)) => match rpc_op {
KaspadPayloadOps::SubmitBlock => {
let response = SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::RouteIsFull) };
connection.enqueue(Ok(response).into()).await?;
Ok(())
}
_ => {
panic!("RPC op {:?} is lacking support for the DropIfFull routing policy", rpc_op);
}
},
Err(MpmcTrySendError::Closed(_)) => Err(GrpcServerError::ClosedHandler(rpc_op)),
},
}
}

Expand Down
3 changes: 2 additions & 1 deletion rpc/grpc/server/src/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const GRPC_SERVER: &str = "grpc-server";

impl ConnectionHandler {
pub(crate) fn new(
network_bps: u64,
manager_sender: MpscSender<ManagerEvent>,
core_service: DynRpcService,
core_notifier: Arc<Notifier<Notification, ChannelConnection>>,
Expand All @@ -93,7 +94,7 @@ impl ConnectionHandler {
let notifier: Arc<Notifier<Notification, Connection>> =
Arc::new(Notifier::new(GRPC_SERVER, core_events, vec![collector], vec![subscriber], 10));
let server_context = ServerContext::new(core_service, notifier);
let interface = Arc::new(Factory::new_interface(server_context.clone()));
let interface = Arc::new(Factory::new_interface(server_context.clone(), network_bps));
let running = Default::default();

Self { manager_sender, server_context, interface, running, counters }
Expand Down
13 changes: 11 additions & 2 deletions rpc/grpc/server/src/request_handler/factory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use super::{handler::RequestHandler, handler_trait::Handler, interface::Interface, method::Method};
use super::{
handler::RequestHandler,
handler_trait::Handler,
interface::Interface,
method::{Method, RoutingPolicy},
};
use crate::{
connection::{Connection, IncomingRoute},
connection_handler::ServerContext,
Expand All @@ -22,7 +27,7 @@ impl Factory {
Box::new(RequestHandler::new(rpc_op, incoming_route, server_context, interface, connection))
}

pub fn new_interface(server_ctx: ServerContext) -> Interface {
pub fn new_interface(server_ctx: ServerContext, network_bps: u64) -> Interface {
// The array as last argument in the macro call below must exactly match the full set of
// KaspadPayloadOps variants.
let mut interface = build_grpc_server_interface!(
Expand Down Expand Up @@ -122,6 +127,10 @@ impl Factory {
});
interface.replace_method(KaspadPayloadOps::NotifyFinalityConflict, method);

// Methods with special properties
let bps = network_bps as usize;
interface.replace_method_properties(KaspadPayloadOps::SubmitBlock, bps, 10.max(bps * 2), RoutingPolicy::DropIfFull);

interface
}
}
2 changes: 1 addition & 1 deletion rpc/grpc/server/src/request_handler/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl RequestHandler {
impl Handler for RequestHandler {
async fn start(&mut self) {
debug!("GRPC, Starting request handler {:?} for client {}", self.rpc_op, self.connection);
while let Some(request) = self.incoming_route.recv().await {
while let Ok(request) = self.incoming_route.recv().await {
let response = self.handle_request(request).await;
match response {
Ok(response) => {
Expand Down
11 changes: 10 additions & 1 deletion rpc/grpc/server/src/request_handler/interface.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::method::{Method, MethodTrait};
use super::method::{Method, MethodTrait, RoutingPolicy};
use crate::{
connection::Connection,
connection_handler::ServerContext,
Expand Down Expand Up @@ -55,6 +55,15 @@ impl Interface {
let _ = self.methods.insert(op, method);
}

pub fn replace_method_properties(&mut self, op: KaspadPayloadOps, tasks: usize, queue_size: usize, routing_policy: RoutingPolicy) {
self.methods.entry(op).and_modify(|x| {
let method: Method<ServerContext, Connection, KaspadRequest, KaspadResponse> =
Method::with_properties(x.method_fn(), tasks, queue_size, routing_policy);
let method: Arc<dyn MethodTrait<ServerContext, Connection, KaspadRequest, KaspadResponse>> = Arc::new(method);
*x = method;
});
}

pub async fn call(
&self,
op: &KaspadPayloadOps,
Expand Down
Loading

0 comments on commit 50c8c96

Please sign in to comment.