Skip to content

Commit

Permalink
tmp commit
Browse files Browse the repository at this point in the history
  • Loading branch information
biryukovmaxim committed Jun 19, 2024
1 parent 22730f5 commit 3612f3b
Show file tree
Hide file tree
Showing 56 changed files with 1,273 additions and 692 deletions.
56 changes: 35 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ dirs = "5.0.1"
downcast = "0.11.0"
downcast-rs = "1.2.0"
duration-string = "0.3.0"
enum_dispatch = "0.3.13"
enum-primitive-derive = "0.2.2"
event-listener = "2.5.3" # TODO "3.0.1"
evpkdf = "0.2.0"
Expand Down
29 changes: 27 additions & 2 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use kaspa_mining::{
use kaspa_p2p_flows::{flow_context::FlowContext, service::P2pService};

use kaspa_perf_monitor::{builder::Builder as PerfMonitorBuilder, counters::CountersSnapshot};
use kaspa_rpc_core::api::rpc::DummyRpcConnection;
use kaspa_utxoindex::{api::UtxoIndexProxy, UtxoIndex};
use kaspa_wrpc_server::service::{Options as WrpcServerOptions, WebSocketCounters as WrpcServerCounters, WrpcEncoding, WrpcService};

Expand Down Expand Up @@ -171,6 +172,30 @@ impl Runtime {
}
}

use kaspa_rpc_core::api::connection::RpcConnection;
pub enum DaemonConnection {
Wrpc(DummyRpcConnection),
Grpc(DummyRpcConnection),
}

impl RpcConnection for DaemonConnection {
fn id(&self) -> u64 {
match self {
DaemonConnection::Wrpc(conn) => conn.id(),
DaemonConnection::Grpc(conn) => conn.id(),
}
}
}

impl Clone for DaemonConnection {
fn clone(&self) -> Self {
match self {
DaemonConnection::Wrpc(dummy) => Self::Wrpc(DummyRpcConnection {}),
DaemonConnection::Grpc(dummy) => Self::Grpc(DummyRpcConnection {}),
}
}
}

/// Create [`Core`] instance with supplied [`Args`].
/// This function will automatically create a [`Runtime`]
/// instance with the supplied [`Args`] and then
Expand All @@ -182,7 +207,7 @@ impl Runtime {
/// The instance of the [`RpcCoreService`] needs to be released
/// (dropped) before the `Core` is shut down.
///
pub fn create_core(args: Args, fd_total_budget: i32) -> (Arc<Core>, Arc<RpcCoreService>) {
pub fn create_core(args: Args, fd_total_budget: i32) -> (Arc<Core>, Arc<RpcCoreService<DaemonConnection>>) {
let rt = Runtime::from_args(&args);
create_core_with_runtime(&rt, &args, fd_total_budget)
}
Expand All @@ -198,7 +223,7 @@ pub fn create_core(args: Args, fd_total_budget: i32) -> (Arc<Core>, Arc<RpcCoreS
/// The instance of the [`RpcCoreService`] needs to be released
/// (dropped) before the `Core` is shut down.
///
pub fn create_core_with_runtime(runtime: &Runtime, args: &Args, fd_total_budget: i32) -> (Arc<Core>, Arc<RpcCoreService>) {
pub fn create_core_with_runtime(runtime: &Runtime, args: &Args, fd_total_budget: i32) -> (Arc<Core>, Arc<RpcCoreService<DaemonConnection>>) {
let network = args.network();
let mut fd_remaining = fd_total_budget;
let utxo_files_limit = if args.utxoindex {
Expand Down
14 changes: 7 additions & 7 deletions metrics/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ use workflow_log::*;
pub type MetricsSinkFn =
Arc<Box<dyn Send + Sync + Fn(MetricsSnapshot) -> Option<Pin<Box<(dyn Send + 'static + Future<Output = Result<()>>)>>> + 'static>>;

pub struct Metrics {
pub struct Metrics<Rpc: RpcApi> {
task_ctl: DuplexChannel,
rpc: Arc<Mutex<Option<Arc<dyn RpcApi>>>>,
rpc: Arc<Mutex<Option<Arc<Rpc>>>>,
sink: Arc<Mutex<Option<MetricsSinkFn>>>,
data: Arc<Mutex<Option<MetricsData>>>,
}

impl Default for Metrics {
impl<Rpc: RpcApi> Default for Metrics<Rpc> {
fn default() -> Self {
Metrics {
task_ctl: DuplexChannel::oneshot(),
Expand All @@ -40,12 +40,12 @@ impl Default for Metrics {
}
}

impl Metrics {
pub fn bind_rpc(&self, rpc: Option<Arc<dyn RpcApi>>) {
impl<Rpc: RpcApi> Metrics<Rpc> {
pub fn bind_rpc(&self, rpc: Option<Arc<Rpc>>) {
*self.rpc.lock().unwrap() = rpc;
}

fn rpc(&self) -> Option<Arc<dyn RpcApi>> {
fn rpc(&self) -> Option<Arc<Rpc>> {
self.rpc.lock().unwrap().clone()
}

Expand Down Expand Up @@ -114,7 +114,7 @@ impl Metrics {

// --- samplers

async fn sample_metrics(self: &Arc<Self>, rpc: Arc<dyn RpcApi>, data: &mut MetricsData) -> Result<()> {
async fn sample_metrics(self: &Arc<Self>, rpc: Arc<Rpc>, data: &mut MetricsData) -> Result<()> {
let GetMetricsResponse { server_time: _, consensus_metrics, connection_metrics, bandwidth_metrics, process_metrics } =
rpc.get_metrics(true, true, true, true).await?;

Expand Down
1 change: 1 addition & 0 deletions rpc/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ borsh.workspace = true
cfg-if.workspace = true
derive_more.workspace = true
downcast.workspace = true
enum_dispatch.workspace = true
faster-hex.workspace = true
hex.workspace = true
js-sys.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion rpc/core/src/api/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::sync::Arc;
use enum_dispatch::enum_dispatch;

#[enum_dispatch]
pub trait RpcConnection: Send + Sync {
fn id(&self) -> u64;
}

pub type DynRpcConnection = Arc<dyn RpcConnection>;
// pub type DynRpcConnection = Arc<dyn RpcConnection>;
Loading

0 comments on commit 3612f3b

Please sign in to comment.