diff --git a/Cargo.lock b/Cargo.lock index 78d9494c4..acb9df072 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1480,6 +1480,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "env_logger" version = "0.9.3" @@ -3201,6 +3213,7 @@ dependencies = [ "cfg-if 1.0.0", "derive_more", "downcast", + "enum_dispatch", "faster-hex 0.6.1", "hex", "js-sys", @@ -3694,6 +3707,7 @@ version = "0.14.1" dependencies = [ "async-trait", "borsh", + "enum_dispatch", "futures", "kaspa-consensus-core", "kaspa-core", @@ -6835,7 +6849,7 @@ dependencies = [ [[package]] name = "workflow-chrome" -version = "0.13.0" +version = "0.13.2" dependencies = [ "cfg-if 1.0.0", "chrome-sys", @@ -6848,7 +6862,7 @@ dependencies = [ [[package]] name = "workflow-core" -version = "0.13.0" +version = "0.13.2" dependencies = [ "async-channel 2.2.1", "async-std", @@ -6878,7 +6892,7 @@ dependencies = [ [[package]] name = "workflow-core-macros" -version = "0.13.0" +version = "0.13.2" dependencies = [ "convert_case 0.6.0", "parse-variants", @@ -6893,7 +6907,7 @@ dependencies = [ [[package]] name = "workflow-dom" -version = "0.13.0" +version = "0.13.2" dependencies = [ "futures", "js-sys", @@ -6909,7 +6923,7 @@ dependencies = [ [[package]] name = "workflow-http" -version = "0.13.0" +version = "0.13.2" dependencies = [ "cfg-if 1.0.0", "reqwest", @@ -6923,7 +6937,7 @@ dependencies = [ [[package]] name = "workflow-log" -version = "0.13.0" +version = "0.13.2" dependencies = [ "cfg-if 1.0.0", "console", @@ -6937,7 +6951,7 @@ dependencies = [ [[package]] name = "workflow-macro-tools" -version = "0.13.0" +version = "0.13.2" dependencies = [ "convert_case 0.6.0", "parse-variants", @@ -6948,7 +6962,7 @@ dependencies = [ [[package]] name = "workflow-node" -version = "0.13.0" +version = "0.13.2" dependencies = [ "borsh", "futures", @@ -6967,7 +6981,7 @@ dependencies = [ [[package]] name = "workflow-nw" -version = "0.13.0" +version = "0.13.2" dependencies = [ "ahash", "async-trait", @@ -6989,7 +7003,7 @@ dependencies = [ [[package]] name = "workflow-panic-hook" -version = "0.13.0" +version = "0.13.2" dependencies = [ "cfg-if 1.0.0", "wasm-bindgen", @@ -7012,7 +7026,7 @@ dependencies = [ [[package]] name = "workflow-rpc" -version = "0.13.0" +version = "0.13.2" dependencies = [ "ahash", "async-std", @@ -7040,7 +7054,7 @@ dependencies = [ [[package]] name = "workflow-rpc-macros" -version = "0.13.0" +version = "0.13.2" dependencies = [ "parse-variants", "proc-macro-error", @@ -7051,7 +7065,7 @@ dependencies = [ [[package]] name = "workflow-serializer" -version = "0.13.0" +version = "0.13.2" dependencies = [ "ahash", "borsh", @@ -7060,7 +7074,7 @@ dependencies = [ [[package]] name = "workflow-store" -version = "0.13.0" +version = "0.13.2" dependencies = [ "async-std", "base64 0.21.7", @@ -7086,7 +7100,7 @@ dependencies = [ [[package]] name = "workflow-task" -version = "0.13.0" +version = "0.13.2" dependencies = [ "futures", "thiserror", @@ -7096,7 +7110,7 @@ dependencies = [ [[package]] name = "workflow-task-macros" -version = "0.13.0" +version = "0.13.2" dependencies = [ "convert_case 0.6.0", "parse-variants", @@ -7110,7 +7124,7 @@ dependencies = [ [[package]] name = "workflow-terminal" -version = "0.13.0" +version = "0.13.2" dependencies = [ "async-std", "async-trait", @@ -7137,7 +7151,7 @@ dependencies = [ [[package]] name = "workflow-terminal-macros" -version = "0.13.0" +version = "0.13.2" dependencies = [ "convert_case 0.6.0", "parse-variants", @@ -7151,7 +7165,7 @@ dependencies = [ [[package]] name = "workflow-wasm" -version = "0.13.0" +version = "0.13.2" dependencies = [ "cfg-if 1.0.0", "faster-hex 0.9.0", @@ -7170,7 +7184,7 @@ dependencies = [ [[package]] name = "workflow-wasm-macros" -version = "0.13.0" +version = "0.13.2" dependencies = [ "js-sys", "proc-macro-error", @@ -7182,7 +7196,7 @@ dependencies = [ [[package]] name = "workflow-websocket" -version = "0.13.0" +version = "0.13.2" dependencies = [ "ahash", "async-channel 2.2.1", diff --git a/Cargo.toml b/Cargo.toml index dc0d81acc..d0da86413 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -168,6 +168,7 @@ dirs = "5.0.1" downcast = "0.11.0" downcast-rs = "1.2.0" duration-string = "0.3.0" +enum_dispatch = "0.3.13" enum-primitive-derive = "0.2.2" event-listener = "2.5.3" # TODO "3.0.1" evpkdf = "0.2.0" diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index 0950ad8fa..ddae4bcd5 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -32,6 +32,7 @@ use kaspa_mining::{ use kaspa_p2p_flows::{flow_context::FlowContext, service::P2pService}; use kaspa_perf_monitor::{builder::Builder as PerfMonitorBuilder, counters::CountersSnapshot}; +use kaspa_rpc_core::api::rpc::DummyRpcConnection; use kaspa_utxoindex::{api::UtxoIndexProxy, UtxoIndex}; use kaspa_wrpc_server::service::{Options as WrpcServerOptions, WebSocketCounters as WrpcServerCounters, WrpcEncoding, WrpcService}; @@ -171,6 +172,30 @@ impl Runtime { } } +use kaspa_rpc_core::api::connection::RpcConnection; +pub enum DaemonConnection { + Wrpc(DummyRpcConnection), + Grpc(DummyRpcConnection), +} + +impl RpcConnection for DaemonConnection { + fn id(&self) -> u64 { + match self { + DaemonConnection::Wrpc(conn) => conn.id(), + DaemonConnection::Grpc(conn) => conn.id(), + } + } +} + +impl Clone for DaemonConnection { + fn clone(&self) -> Self { + match self { + DaemonConnection::Wrpc(dummy) => Self::Wrpc(DummyRpcConnection {}), + DaemonConnection::Grpc(dummy) => Self::Grpc(DummyRpcConnection {}), + } + } +} + /// Create [`Core`] instance with supplied [`Args`]. /// This function will automatically create a [`Runtime`] /// instance with the supplied [`Args`] and then @@ -182,7 +207,7 @@ impl Runtime { /// The instance of the [`RpcCoreService`] needs to be released /// (dropped) before the `Core` is shut down. /// -pub fn create_core(args: Args, fd_total_budget: i32) -> (Arc, Arc) { +pub fn create_core(args: Args, fd_total_budget: i32) -> (Arc, Arc>) { let rt = Runtime::from_args(&args); create_core_with_runtime(&rt, &args, fd_total_budget) } @@ -198,7 +223,7 @@ pub fn create_core(args: Args, fd_total_budget: i32) -> (Arc, Arc (Arc, Arc) { +pub fn create_core_with_runtime(runtime: &Runtime, args: &Args, fd_total_budget: i32) -> (Arc, Arc>) { let network = args.network(); let mut fd_remaining = fd_total_budget; let utxo_files_limit = if args.utxoindex { diff --git a/metrics/core/src/lib.rs b/metrics/core/src/lib.rs index 4a3ca2a0f..fda4771d3 100644 --- a/metrics/core/src/lib.rs +++ b/metrics/core/src/lib.rs @@ -22,14 +22,14 @@ use workflow_log::*; pub type MetricsSinkFn = Arc Option>)>>> + 'static>>; -pub struct Metrics { +pub struct Metrics { task_ctl: DuplexChannel, - rpc: Arc>>>, + rpc: Arc>>>, sink: Arc>>, data: Arc>>, } -impl Default for Metrics { +impl Default for Metrics { fn default() -> Self { Metrics { task_ctl: DuplexChannel::oneshot(), @@ -40,12 +40,12 @@ impl Default for Metrics { } } -impl Metrics { - pub fn bind_rpc(&self, rpc: Option>) { +impl Metrics { + pub fn bind_rpc(&self, rpc: Option>) { *self.rpc.lock().unwrap() = rpc; } - fn rpc(&self) -> Option> { + fn rpc(&self) -> Option> { self.rpc.lock().unwrap().clone() } @@ -114,7 +114,7 @@ impl Metrics { // --- samplers - async fn sample_metrics(self: &Arc, rpc: Arc, data: &mut MetricsData) -> Result<()> { + async fn sample_metrics(self: &Arc, rpc: Arc, data: &mut MetricsData) -> Result<()> { let GetMetricsResponse { server_time: _, consensus_metrics, connection_metrics, bandwidth_metrics, process_metrics } = rpc.get_metrics(true, true, true, true).await?; diff --git a/rpc/core/Cargo.toml b/rpc/core/Cargo.toml index ef010a020..3115cf5b0 100644 --- a/rpc/core/Cargo.toml +++ b/rpc/core/Cargo.toml @@ -37,6 +37,7 @@ borsh.workspace = true cfg-if.workspace = true derive_more.workspace = true downcast.workspace = true +enum_dispatch.workspace = true faster-hex.workspace = true hex.workspace = true js-sys.workspace = true diff --git a/rpc/core/src/api/connection.rs b/rpc/core/src/api/connection.rs index 5b4254288..79f1334b8 100644 --- a/rpc/core/src/api/connection.rs +++ b/rpc/core/src/api/connection.rs @@ -1,7 +1,9 @@ use std::sync::Arc; +use enum_dispatch::enum_dispatch; +#[enum_dispatch] pub trait RpcConnection: Send + Sync { fn id(&self) -> u64; } -pub type DynRpcConnection = Arc; +// pub type DynRpcConnection = Arc; diff --git a/rpc/core/src/api/rpc.rs b/rpc/core/src/api/rpc.rs index 0096afcb5..437f69d80 100644 --- a/rpc/core/src/api/rpc.rs +++ b/rpc/core/src/api/rpc.rs @@ -4,30 +4,37 @@ //! All data provided by the RCP server can be trusted by the client //! No data submitted by the client to the server can be trusted -use crate::api::connection::DynRpcConnection; +use crate::api::connection::{RpcConnection}; use crate::{model::*, notify::connection::ChannelConnection, RpcResult}; use async_trait::async_trait; use downcast::{downcast_sync, AnySync}; +use enum_dispatch::enum_dispatch; use kaspa_notify::{listener::ListenerId, scope::Scope, subscription::Command}; +use std::future::Future; +use std::ops::Deref; use std::sync::Arc; pub const MAX_SAFE_WINDOW_SIZE: u32 = 10_000; -/// Client RPC Api -/// -/// The [`RpcApi`] trait defines RPC calls taking a request message as unique parameter. -/// -/// For each RPC call a matching readily implemented function taking detailed parameters is also provided. -#[async_trait] -pub trait RpcApi: Sync + Send + AnySync { - /// +#[derive(Clone, Default, Debug)] +pub struct DummyRpcConnection; + +impl RpcConnection for DummyRpcConnection { + fn id(&self) -> u64 { + panic!("don't call me"); + } +} + +impl RpcApi for Arc { + type RpcConnection = T::RpcConnection; + async fn ping(&self) -> RpcResult<()> { - self.ping_call(None, PingRequest {}).await?; - Ok(()) + self.deref().ping().await } - async fn ping_call(&self, connection: Option<&DynRpcConnection>, request: PingRequest) -> RpcResult; - // --- + async fn ping_call(&self, connection: Option, request: PingRequest) -> RpcResult { + self.deref().ping_call(connection, request).await + } async fn get_metrics( &self, @@ -36,359 +43,857 @@ pub trait RpcApi: Sync + Send + AnySync { bandwidth_metrics: bool, consensus_metrics: bool, ) -> RpcResult { - self.get_metrics_call(None, GetMetricsRequest { process_metrics, connection_metrics, bandwidth_metrics, consensus_metrics }) - .await + self.deref().get_metrics(process_metrics, connection_metrics, bandwidth_metrics, consensus_metrics).await } + async fn get_metrics_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetMetricsRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_metrics_call(connection, request).await + } - // get_info alternative that carries only version, network_id (full), is_synced, virtual_daa_score - // these are the only variables needed to negotiate a wRPC connection (besides the wRPC handshake) async fn get_server_info(&self) -> RpcResult { - self.get_server_info_call(None, GetServerInfoRequest {}).await + self.deref().get_server_info().await } + async fn get_server_info_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetServerInfoRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_server_info_call(connection, request).await + } - // Get current sync status of the node (should be converted to a notification subscription) async fn get_sync_status(&self) -> RpcResult { - Ok(self.get_sync_status_call(None, GetSyncStatusRequest {}).await?.is_synced) + self.deref().get_sync_status().await } + async fn get_sync_status_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetSyncStatusRequest, - ) -> RpcResult; - - // --- + ) -> RpcResult { + self.deref().get_sync_status_call(connection, request).await + } - /// Requests the network the node is currently running against. async fn get_current_network(&self) -> RpcResult { - Ok(self.get_current_network_call(None, GetCurrentNetworkRequest {}).await?.network) + self.deref().get_current_network().await } + async fn get_current_network_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetCurrentNetworkRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_current_network_call(connection, request).await + } - /// Submit a block into the DAG. - /// - /// Blocks are generally expected to have been generated using the get_block_template call. async fn submit_block(&self, block: RpcBlock, allow_non_daa_blocks: bool) -> RpcResult { - self.submit_block_call(None, SubmitBlockRequest::new(block, allow_non_daa_blocks)).await + self.deref().submit_block(block, allow_non_daa_blocks).await } + async fn submit_block_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: SubmitBlockRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().submit_block_call(connection, request).await + } - /// Request a current block template. - /// - /// Callers are expected to solve the block template and submit it using the submit_block call. async fn get_block_template(&self, pay_address: RpcAddress, extra_data: RpcExtraData) -> RpcResult { - self.get_block_template_call(None, GetBlockTemplateRequest::new(pay_address, extra_data)).await + self.deref().get_block_template(pay_address, extra_data).await } + async fn get_block_template_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetBlockTemplateRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_block_template_call(connection, request).await + } - /// Requests the list of known kaspad addresses in the current network (mainnet, testnet, etc.) async fn get_peer_addresses(&self) -> RpcResult { - self.get_peer_addresses_call(None, GetPeerAddressesRequest {}).await + self.deref().get_peer_addresses().await } + async fn get_peer_addresses_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetPeerAddressesRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_peer_addresses_call(connection, request).await + } - /// requests the hash of the current virtual's selected parent. async fn get_sink(&self) -> RpcResult { - self.get_sink_call(None, GetSinkRequest {}).await + self.deref().get_sink().await + } + + async fn get_sink_call(&self, connection: Option, request: GetSinkRequest) -> RpcResult { + self.deref().get_sink_call(connection, request).await } - async fn get_sink_call(&self, connection: Option<&DynRpcConnection>, request: GetSinkRequest) -> RpcResult; - /// Requests information about a specific transaction in the mempool. async fn get_mempool_entry( &self, transaction_id: RpcTransactionId, include_orphan_pool: bool, filter_transaction_pool: bool, ) -> RpcResult { - Ok(self - .get_mempool_entry_call(None, GetMempoolEntryRequest::new(transaction_id, include_orphan_pool, filter_transaction_pool)) - .await? - .mempool_entry) + self.deref().get_mempool_entry(transaction_id, include_orphan_pool, filter_transaction_pool).await } + async fn get_mempool_entry_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetMempoolEntryRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_mempool_entry_call(connection, request).await + } - /// Requests information about all the transactions currently in the mempool. async fn get_mempool_entries(&self, include_orphan_pool: bool, filter_transaction_pool: bool) -> RpcResult> { - Ok(self - .get_mempool_entries_call(None, GetMempoolEntriesRequest::new(include_orphan_pool, filter_transaction_pool)) - .await? - .mempool_entries) + self.deref().get_mempool_entries(include_orphan_pool, filter_transaction_pool).await } + async fn get_mempool_entries_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetMempoolEntriesRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_mempool_entries_call(connection, request).await + } - /// requests information about all the p2p peers currently connected to this node. async fn get_connected_peer_info(&self) -> RpcResult { - self.get_connected_peer_info_call(None, GetConnectedPeerInfoRequest {}).await + self.deref().get_connected_peer_info().await } + async fn get_connected_peer_info_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetConnectedPeerInfoRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_connected_peer_info_call(connection, request).await + } - /// Adds a peer to the node's outgoing connection list. - /// - /// This will, in most cases, result in the node connecting to said peer. async fn add_peer(&self, peer_address: RpcContextualPeerAddress, is_permanent: bool) -> RpcResult<()> { - self.add_peer_call(None, AddPeerRequest::new(peer_address, is_permanent)).await?; - Ok(()) + self.deref().add_peer(peer_address, is_permanent).await + } + + async fn add_peer_call(&self, connection: Option, request: AddPeerRequest) -> RpcResult { + self.deref().add_peer_call(connection, request).await } - async fn add_peer_call(&self, connection: Option<&DynRpcConnection>, request: AddPeerRequest) -> RpcResult; - /// Submits a transaction to the mempool. async fn submit_transaction(&self, transaction: RpcTransaction, allow_orphan: bool) -> RpcResult { - Ok(self.submit_transaction_call(None, SubmitTransactionRequest { transaction, allow_orphan }).await?.transaction_id) + self.deref().submit_transaction(transaction, allow_orphan).await } + async fn submit_transaction_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: SubmitTransactionRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().submit_transaction_call(connection, request).await + } - /// Requests information about a specific block. async fn get_block(&self, hash: RpcHash, include_transactions: bool) -> RpcResult { - Ok(self.get_block_call(None, GetBlockRequest::new(hash, include_transactions)).await?.block) + self.deref().get_block(hash, include_transactions).await + } + + async fn get_block_call(&self, connection: Option, request: GetBlockRequest) -> RpcResult { + self.deref().get_block_call(connection, request).await } - async fn get_block_call(&self, connection: Option<&DynRpcConnection>, request: GetBlockRequest) -> RpcResult; - /// Requests information about a specific subnetwork. async fn get_subnetwork(&self, subnetwork_id: RpcSubnetworkId) -> RpcResult { - self.get_subnetwork_call(None, GetSubnetworkRequest::new(subnetwork_id)).await + self.deref().get_subnetwork(subnetwork_id).await } + async fn get_subnetwork_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetSubnetworkRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_subnetwork_call(connection, request).await + } - /// Requests the virtual selected parent chain from some `start_hash` to this node's current virtual. async fn get_virtual_chain_from_block( &self, start_hash: RpcHash, include_accepted_transaction_ids: bool, ) -> RpcResult { - self.get_virtual_chain_from_block_call( - None, - GetVirtualChainFromBlockRequest::new(start_hash, include_accepted_transaction_ids), - ) - .await + self.deref().get_virtual_chain_from_block(start_hash, include_accepted_transaction_ids).await } + async fn get_virtual_chain_from_block_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetVirtualChainFromBlockRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_virtual_chain_from_block_call(connection, request).await + } - /// Requests blocks between a certain block `low_hash` up to this node's current virtual. async fn get_blocks( &self, low_hash: Option, include_blocks: bool, include_transactions: bool, ) -> RpcResult { - self.get_blocks_call(None, GetBlocksRequest::new(low_hash, include_blocks, include_transactions)).await + self.deref().get_blocks(low_hash, include_blocks, include_transactions).await + } + + async fn get_blocks_call( + &self, + connection: Option, + request: GetBlocksRequest, + ) -> RpcResult { + self.deref().get_blocks_call(connection, request).await } - async fn get_blocks_call(&self, connection: Option<&DynRpcConnection>, request: GetBlocksRequest) -> RpcResult; - /// Requests the current number of blocks in this node. - /// - /// Note that this number may decrease as pruning occurs. async fn get_block_count(&self) -> RpcResult { - self.get_block_count_call(None, GetBlockCountRequest {}).await + self.deref().get_block_count().await } + async fn get_block_count_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetBlockCountRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_block_count_call(connection, request).await + } - /// Requests general information about the current state of this node's DAG. async fn get_block_dag_info(&self) -> RpcResult { - self.get_block_dag_info_call(None, GetBlockDagInfoRequest {}).await + self.deref().get_block_dag_info().await } + async fn get_block_dag_info_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetBlockDagInfoRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_block_dag_info_call(connection, request).await + } - /// async fn resolve_finality_conflict(&self, finality_block_hash: RpcHash) -> RpcResult<()> { - self.resolve_finality_conflict_call(None, ResolveFinalityConflictRequest::new(finality_block_hash)).await?; - Ok(()) + self.deref().resolve_finality_conflict(finality_block_hash).await } + async fn resolve_finality_conflict_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: ResolveFinalityConflictRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().resolve_finality_conflict_call(connection, request).await + } - /// Shuts down this node. async fn shutdown(&self) -> RpcResult<()> { - self.shutdown_call(None, ShutdownRequest {}).await?; - Ok(()) + self.deref().shutdown().await + } + + async fn shutdown_call(&self, connection: Option, request: ShutdownRequest) -> RpcResult { + self.deref().shutdown_call(connection, request).await } - async fn shutdown_call(&self, connection: Option<&DynRpcConnection>, request: ShutdownRequest) -> RpcResult; - /// Requests headers between the given `start_hash` and the current virtual, up to the given limit. async fn get_headers(&self, start_hash: RpcHash, limit: u64, is_ascending: bool) -> RpcResult> { - Ok(self.get_headers_call(None, GetHeadersRequest::new(start_hash, limit, is_ascending)).await?.headers) + self.deref().get_headers(start_hash, limit, is_ascending).await } + async fn get_headers_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetHeadersRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_headers_call(connection, request).await + } - /// Returns the total balance in unspent transactions towards a given address. - /// - /// This call is only available when this node was started with `--utxoindex`. async fn get_balance_by_address(&self, address: RpcAddress) -> RpcResult { - Ok(self.get_balance_by_address_call(None, GetBalanceByAddressRequest::new(address)).await?.balance) + self.deref().get_balance_by_address(address).await } + async fn get_balance_by_address_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetBalanceByAddressRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_balance_by_address_call(connection, request).await + } - /// async fn get_balances_by_addresses(&self, addresses: Vec) -> RpcResult> { - Ok(self.get_balances_by_addresses_call(None, GetBalancesByAddressesRequest::new(addresses)).await?.entries) + self.deref().get_balances_by_addresses(addresses).await } + async fn get_balances_by_addresses_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetBalancesByAddressesRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_balances_by_addresses_call(connection, request).await + } - /// Requests all current UTXOs for the given node addresses. - /// - /// This call is only available when this node was started with `--utxoindex`. async fn get_utxos_by_addresses(&self, addresses: Vec) -> RpcResult> { - Ok(self.get_utxos_by_addresses_call(None, GetUtxosByAddressesRequest::new(addresses)).await?.entries) + self.deref().get_utxos_by_addresses(addresses).await } + async fn get_utxos_by_addresses_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetUtxosByAddressesRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_utxos_by_addresses_call(connection, request).await + } - /// Requests the blue score of the current selected parent of the virtual block. async fn get_sink_blue_score(&self) -> RpcResult { - Ok(self.get_sink_blue_score_call(None, GetSinkBlueScoreRequest {}).await?.blue_score) + self.deref().get_sink_blue_score().await } + async fn get_sink_blue_score_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetSinkBlueScoreRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_sink_blue_score_call(connection, request).await + } - /// Bans the given ip. async fn ban(&self, ip: RpcIpAddress) -> RpcResult<()> { - self.ban_call(None, BanRequest::new(ip)).await?; - Ok(()) + self.deref().ban(ip).await + } + + async fn ban_call(&self, connection: Option, request: BanRequest) -> RpcResult { + self.deref().ban_call(connection, request).await } - async fn ban_call(&self, connection: Option<&DynRpcConnection>, request: BanRequest) -> RpcResult; - /// Unbans the given ip. async fn unban(&self, ip: RpcIpAddress) -> RpcResult<()> { - self.unban_call(None, UnbanRequest::new(ip)).await?; - Ok(()) + self.deref().unban(ip).await + } + + async fn unban_call(&self, connection: Option, request: UnbanRequest) -> RpcResult { + self.deref().unban_call(connection, request).await } - async fn unban_call(&self, connection: Option<&DynRpcConnection>, request: UnbanRequest) -> RpcResult; - /// Returns info about the node. async fn get_info(&self) -> RpcResult { - self.get_info_call(None, GetInfoRequest {}).await + self.deref().get_info().await + } + + async fn get_info_call(&self, connection: Option, request: GetInfoRequest) -> RpcResult { + self.deref().get_info_call(connection, request).await } - async fn get_info_call(&self, connection: Option<&DynRpcConnection>, request: GetInfoRequest) -> RpcResult; - /// async fn estimate_network_hashes_per_second(&self, window_size: u32, start_hash: Option) -> RpcResult { - Ok(self - .estimate_network_hashes_per_second_call(None, EstimateNetworkHashesPerSecondRequest::new(window_size, start_hash)) - .await? - .network_hashes_per_second) + self.deref().estimate_network_hashes_per_second(window_size, start_hash).await } + async fn estimate_network_hashes_per_second_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: EstimateNetworkHashesPerSecondRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().estimate_network_hashes_per_second_call(connection, request).await + } - /// async fn get_mempool_entries_by_addresses( &self, addresses: Vec, include_orphan_pool: bool, filter_transaction_pool: bool, ) -> RpcResult> { - Ok(self - .get_mempool_entries_by_addresses_call( - None, - GetMempoolEntriesByAddressesRequest::new(addresses, include_orphan_pool, filter_transaction_pool), - ) - .await? - .entries) + self.deref().get_mempool_entries_by_addresses(addresses, include_orphan_pool, filter_transaction_pool).await } + async fn get_mempool_entries_by_addresses_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetMempoolEntriesByAddressesRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_mempool_entries_by_addresses_call(connection, request).await + } - /// async fn get_coin_supply(&self) -> RpcResult { - self.get_coin_supply_call(None, GetCoinSupplyRequest {}).await + self.deref().get_coin_supply().await } + async fn get_coin_supply_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, request: GetCoinSupplyRequest, - ) -> RpcResult; + ) -> RpcResult { + self.deref().get_coin_supply_call(connection, request).await + } async fn get_daa_score_timestamp_estimate(&self, daa_scores: Vec) -> RpcResult> { - Ok(self.get_daa_score_timestamp_estimate_call(None, GetDaaScoreTimestampEstimateRequest { daa_scores }).await?.timestamps) + self.deref().get_daa_score_timestamp_estimate(daa_scores).await } + async fn get_daa_score_timestamp_estimate_call( &self, - connection: Option<&DynRpcConnection>, + connection: Option, + request: GetDaaScoreTimestampEstimateRequest, + ) -> RpcResult { + self.deref().get_daa_score_timestamp_estimate_call(connection, request).await + } + + fn register_new_listener(&self, connection: ChannelConnection) -> ListenerId { + self.deref().register_new_listener(connection) + } + + async fn unregister_listener(&self, id: ListenerId) -> RpcResult<()> { + self.deref().unregister_listener(id).await + } + + async fn start_notify(&self, id: ListenerId, scope: Scope) -> RpcResult<()> { + self.deref().start_notify(id, scope).await + } + + async fn stop_notify(&self, id: ListenerId, scope: Scope) -> RpcResult<()> { + self.deref().stop_notify(id, scope).await + } + + async fn execute_subscribe_command(&self, id: ListenerId, scope: Scope, command: Command) -> RpcResult<()> { + self.deref().execute_subscribe_command(id, scope, command).await + } +} + +/// Client RPC Api +/// +/// The [`RpcApi`] trait defines RPC calls taking a request message as unique parameter. +/// +/// For each RPC call a matching readily implemented function taking detailed parameters is also provided. +#[enum_dispatch] +pub trait RpcApi: Sync + Send + AnySync { + type RpcConnection: RpcConnection; + + fn ping(&self) -> impl Future> + Send { + async move { + self.ping_call(Default::default(), PingRequest {}).await?; + Ok(()) + } + } + fn ping_call( + &self, + connection: Option, + request: PingRequest, + ) -> impl Future> + Send; + + fn get_metrics( + &self, + process_metrics: bool, + connection_metrics: bool, + bandwidth_metrics: bool, + consensus_metrics: bool, + ) -> impl Future> + Send { + async move { + self.get_metrics_call( + Default::default(), + GetMetricsRequest { process_metrics, connection_metrics, bandwidth_metrics, consensus_metrics }, + ) + .await + } + } + fn get_metrics_call( + &self, + connection: Option, + request: GetMetricsRequest, + ) -> impl Future> + Send; + + fn get_server_info(&self) -> impl Future> + Send { + async move { self.get_server_info_call(Default::default(), GetServerInfoRequest {}).await } + } + fn get_server_info_call( + &self, + connection: Option, + request: GetServerInfoRequest, + ) -> impl Future> + Send; + + fn get_sync_status(&self) -> impl Future> + Send { + async move { Ok(self.get_sync_status_call(Default::default(), GetSyncStatusRequest {}).await?.is_synced) } + } + fn get_sync_status_call( + &self, + connection: Option, + request: GetSyncStatusRequest, + ) -> impl Future> + Send; + + fn get_current_network(&self) -> impl Future> + Send { + async move { Ok(self.get_current_network_call(Default::default(), GetCurrentNetworkRequest {}).await?.network) } + } + fn get_current_network_call( + &self, + connection: Option, + request: GetCurrentNetworkRequest, + ) -> impl Future> + Send; + + fn submit_block( + &self, + block: RpcBlock, + allow_non_daa_blocks: bool, + ) -> impl Future> + Send { + async move { self.submit_block_call(Default::default(), SubmitBlockRequest::new(block, allow_non_daa_blocks)).await } + } + fn submit_block_call( + &self, + connection: Option, + request: SubmitBlockRequest, + ) -> impl Future> + Send; + + fn get_block_template( + &self, + pay_address: RpcAddress, + extra_data: RpcExtraData, + ) -> impl Future> + Send { + async move { self.get_block_template_call(Default::default(), GetBlockTemplateRequest::new(pay_address, extra_data)).await } + } + fn get_block_template_call( + &self, + connection: Option, + request: GetBlockTemplateRequest, + ) -> impl Future> + Send; + + fn get_peer_addresses(&self) -> impl Future> + Send { + async move { self.get_peer_addresses_call(Default::default(), GetPeerAddressesRequest {}).await } + } + fn get_peer_addresses_call( + &self, + connection: Option, + request: GetPeerAddressesRequest, + ) -> impl Future> + Send; + + fn get_sink(&self) -> impl Future> + Send { + async move { self.get_sink_call(Default::default(), GetSinkRequest {}).await } + } + fn get_sink_call( + &self, + connection: Option, + request: GetSinkRequest, + ) -> impl Future> + Send; + + fn get_mempool_entry( + &self, + transaction_id: RpcTransactionId, + include_orphan_pool: bool, + filter_transaction_pool: bool, + ) -> impl Future> + Send { + async move { + Ok(self + .get_mempool_entry_call( + Default::default(), + GetMempoolEntryRequest::new(transaction_id, include_orphan_pool, filter_transaction_pool), + ) + .await? + .mempool_entry) + } + } + fn get_mempool_entry_call( + &self, + connection: Option, + request: GetMempoolEntryRequest, + ) -> impl Future> + Send; + + fn get_mempool_entries( + &self, + include_orphan_pool: bool, + filter_transaction_pool: bool, + ) -> impl Future>> + Send { + async move { + Ok(self + .get_mempool_entries_call( + Default::default(), + GetMempoolEntriesRequest::new(include_orphan_pool, filter_transaction_pool), + ) + .await? + .mempool_entries) + } + } + fn get_mempool_entries_call( + &self, + connection: Option, + request: GetMempoolEntriesRequest, + ) -> impl Future> + Send; + + fn get_connected_peer_info(&self) -> impl Future> + Send { + async move { self.get_connected_peer_info_call(Default::default(), GetConnectedPeerInfoRequest {}).await } + } + fn get_connected_peer_info_call( + &self, + connection: Option, + request: GetConnectedPeerInfoRequest, + ) -> impl Future> + Send; + + fn add_peer(&self, peer_address: RpcContextualPeerAddress, is_permanent: bool) -> impl Future> + Send { + async move { + self.add_peer_call(Default::default(), AddPeerRequest::new(peer_address, is_permanent)).await?; + Ok(()) + } + } + fn add_peer_call( + &self, + connection: Option, + request: AddPeerRequest, + ) -> impl Future> + Send; + + fn submit_transaction( + &self, + transaction: RpcTransaction, + allow_orphan: bool, + ) -> impl Future> + Send { + async move { + Ok(self + .submit_transaction_call(Default::default(), SubmitTransactionRequest { transaction, allow_orphan }) + .await? + .transaction_id) + } + } + fn submit_transaction_call( + &self, + connection: Option, + request: SubmitTransactionRequest, + ) -> impl Future> + Send; + + fn get_block(&self, hash: RpcHash, include_transactions: bool) -> impl Future> + Send { + async move { Ok(self.get_block_call(Default::default(), GetBlockRequest::new(hash, include_transactions)).await?.block) } + } + fn get_block_call( + &self, + connection: Option, + request: GetBlockRequest, + ) -> impl Future> + Send; + + fn get_subnetwork(&self, subnetwork_id: RpcSubnetworkId) -> impl Future> + Send { + async move { self.get_subnetwork_call(Default::default(), GetSubnetworkRequest::new(subnetwork_id)).await } + } + fn get_subnetwork_call( + &self, + connection: Option, + request: GetSubnetworkRequest, + ) -> impl Future> + Send; + + fn get_virtual_chain_from_block( + &self, + start_hash: RpcHash, + include_accepted_transaction_ids: bool, + ) -> impl Future> + Send { + async move { + self.get_virtual_chain_from_block_call( + Default::default(), + GetVirtualChainFromBlockRequest::new(start_hash, include_accepted_transaction_ids), + ) + .await + } + } + fn get_virtual_chain_from_block_call( + &self, + connection: Option, + request: GetVirtualChainFromBlockRequest, + ) -> impl Future> + Send; + + fn get_blocks( + &self, + low_hash: Option, + include_blocks: bool, + include_transactions: bool, + ) -> impl Future> + Send { + async move { self.get_blocks_call(Default::default(), GetBlocksRequest::new(low_hash, include_blocks, include_transactions)).await } + } + fn get_blocks_call( + &self, + connection: Option, + request: GetBlocksRequest, + ) -> impl Future> + Send; + + fn get_block_count(&self) -> impl Future> + Send { + async move { self.get_block_count_call(Default::default(), GetBlockCountRequest {}).await } + } + fn get_block_count_call( + &self, + connection: Option, + request: GetBlockCountRequest, + ) -> impl Future> + Send; + + fn get_block_dag_info(&self) -> impl Future> + Send { + async move { self.get_block_dag_info_call(Default::default(), GetBlockDagInfoRequest {}).await } + } + fn get_block_dag_info_call( + &self, + connection: Option, + request: GetBlockDagInfoRequest, + ) -> impl Future> + Send; + + fn resolve_finality_conflict(&self, finality_block_hash: RpcHash) -> impl Future> + Send { + async move { + self.resolve_finality_conflict_call(Default::default(), ResolveFinalityConflictRequest::new(finality_block_hash)).await?; + Ok(()) + } + } + fn resolve_finality_conflict_call( + &self, + connection: Option, + request: ResolveFinalityConflictRequest, + ) -> impl Future> + Send; + + fn shutdown(&self) -> impl Future> + Send { + async move { + self.shutdown_call(Default::default(), ShutdownRequest {}).await?; + Ok(()) + } + } + fn shutdown_call( + &self, + connection: Option, + request: ShutdownRequest, + ) -> impl Future> + Send; + + fn get_headers( + &self, + start_hash: RpcHash, + limit: u64, + is_ascending: bool, + ) -> impl Future>> + Send { + async move { Ok(self.get_headers_call(Default::default(), GetHeadersRequest::new(start_hash, limit, is_ascending)).await?.headers) } + } + fn get_headers_call( + &self, + connection: Option, + request: GetHeadersRequest, + ) -> impl Future> + Send; + + fn get_balance_by_address(&self, address: RpcAddress) -> impl Future> + Send { + async move { Ok(self.get_balance_by_address_call(Default::default(), GetBalanceByAddressRequest::new(address)).await?.balance) } + } + fn get_balance_by_address_call( + &self, + connection: Option, + request: GetBalanceByAddressRequest, + ) -> impl Future> + Send; + + fn get_balances_by_addresses( + &self, + addresses: Vec, + ) -> impl Future>> + Send { + async move { + Ok(self.get_balances_by_addresses_call(Default::default(), GetBalancesByAddressesRequest::new(addresses)).await?.entries) + } + } + fn get_balances_by_addresses_call( + &self, + connection: Option, + request: GetBalancesByAddressesRequest, + ) -> impl Future> + Send; + + fn get_utxos_by_addresses( + &self, + addresses: Vec, + ) -> impl Future>> + Send { + async move { Ok(self.get_utxos_by_addresses_call(Default::default(), GetUtxosByAddressesRequest::new(addresses)).await?.entries) } + } + fn get_utxos_by_addresses_call( + &self, + connection: Option, + request: GetUtxosByAddressesRequest, + ) -> impl Future> + Send; + + fn get_sink_blue_score(&self) -> impl Future> + Send { + async move { Ok(self.get_sink_blue_score_call(Default::default(), GetSinkBlueScoreRequest {}).await?.blue_score) } + } + fn get_sink_blue_score_call( + &self, + connection: Option, + request: GetSinkBlueScoreRequest, + ) -> impl Future> + Send; + + fn ban(&self, ip: RpcIpAddress) -> impl Future> + Send { + async move { + self.ban_call(Default::default(), BanRequest::new(ip)).await?; + Ok(()) + } + } + fn ban_call( + &self, + connection: Option, + request: BanRequest, + ) -> impl Future> + Send; + + fn unban(&self, ip: RpcIpAddress) -> impl Future> + Send { + async move { + self.unban_call(Default::default(), UnbanRequest::new(ip)).await?; + Ok(()) + } + } + fn unban_call( + &self, + connection: Option, + request: UnbanRequest, + ) -> impl Future> + Send; + + fn get_info(&self) -> impl Future> + Send { + async move { self.get_info_call(Default::default(), GetInfoRequest {}).await } + } + fn get_info_call( + &self, + connection: Option, + request: GetInfoRequest, + ) -> impl Future> + Send; + + fn estimate_network_hashes_per_second( + &self, + window_size: u32, + start_hash: Option, + ) -> impl Future> + Send { + async move { + Ok(self + .estimate_network_hashes_per_second_call( + Default::default(), + EstimateNetworkHashesPerSecondRequest::new(window_size, start_hash), + ) + .await? + .network_hashes_per_second) + } + } + fn estimate_network_hashes_per_second_call( + &self, + connection: Option, + request: EstimateNetworkHashesPerSecondRequest, + ) -> impl Future> + Send; + + fn get_mempool_entries_by_addresses( + &self, + addresses: Vec, + include_orphan_pool: bool, + filter_transaction_pool: bool, + ) -> impl Future>> + Send { + async move { + Ok(self + .get_mempool_entries_by_addresses_call( + Default::default(), + GetMempoolEntriesByAddressesRequest::new(addresses, include_orphan_pool, filter_transaction_pool), + ) + .await? + .entries) + } + } + fn get_mempool_entries_by_addresses_call( + &self, + connection: Option, + request: GetMempoolEntriesByAddressesRequest, + ) -> impl Future> + Send; + + fn get_coin_supply(&self) -> impl Future> + Send { + async move { self.get_coin_supply_call(Default::default(), GetCoinSupplyRequest {}).await } + } + fn get_coin_supply_call( + &self, + connection: Option, + request: GetCoinSupplyRequest, + ) -> impl Future> + Send; + + fn get_daa_score_timestamp_estimate(&self, daa_scores: Vec) -> impl Future>> + Send { + async move { + Ok(self + .get_daa_score_timestamp_estimate_call(Default::default(), GetDaaScoreTimestampEstimateRequest { daa_scores }) + .await? + .timestamps) + } + } + fn get_daa_score_timestamp_estimate_call( + &self, + connection: Option, request: GetDaaScoreTimestampEstimateRequest, - ) -> RpcResult; + ) -> impl Future> + Send; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // Notification API @@ -399,24 +904,26 @@ pub trait RpcApi: Sync + Send + AnySync { /// Unregister an existing listener. /// /// Stop all notifications for this listener, unregister the id and its associated connection. - async fn unregister_listener(&self, id: ListenerId) -> RpcResult<()>; + + fn unregister_listener(&self, id: ListenerId) -> impl Future> + Send; /// Start sending notifications of some type to a listener. - async fn start_notify(&self, id: ListenerId, scope: Scope) -> RpcResult<()>; + fn start_notify(&self, id: ListenerId, scope: Scope) -> impl Future> + Send; /// Stop sending notifications of some type to a listener. - async fn stop_notify(&self, id: ListenerId, scope: Scope) -> RpcResult<()>; + fn stop_notify(&self, id: ListenerId, scope: Scope) -> impl Future> + Send; /// Execute a subscription command leading to either start or stop sending notifications /// of some type to a listener. - async fn execute_subscribe_command(&self, id: ListenerId, scope: Scope, command: Command) -> RpcResult<()> { - match command { - Command::Start => self.start_notify(id, scope).await, - Command::Stop => self.stop_notify(id, scope).await, + fn execute_subscribe_command(&self, id: ListenerId, scope: Scope, command: Command) -> impl Future> + Send { + async move { + match command { + Command::Start => self.start_notify(id, scope).await, + Command::Stop => self.stop_notify(id, scope).await, + } } } } +pub type DynRpcService = Arc; -pub type DynRpcService = Arc; - -downcast_sync!(dyn RpcApi); +// downcast_sync!(dyn RpcApi); diff --git a/rpc/grpc/client/src/lib.rs b/rpc/grpc/client/src/lib.rs index c7eebd8d1..602993a8b 100644 --- a/rpc/grpc/client/src/lib.rs +++ b/rpc/grpc/client/src/lib.rs @@ -27,6 +27,7 @@ use kaspa_notify::{ UtxosChangedMutationPolicy, }, }; +use kaspa_rpc_core::api::connection::RpcConnection; use kaspa_rpc_core::{ api::rpc::RpcApi, error::RpcError, @@ -230,8 +231,15 @@ impl GrpcClient { } } -#[async_trait] +pub struct DummyConnection {} +impl RpcConnection for DummyConnection { + fn id(&self) -> u64 { + todo!() + } +} + impl RpcApi for GrpcClient { + type RpcConnection = DummyConnection; // this example illustrates the body of the function created by the route!() macro // async fn submit_block_call(&self, request: SubmitBlockRequest) -> RpcResult { // self.inner.call(KaspadPayloadOps::SubmitBlock, request).await?.as_ref().try_into() diff --git a/rpc/grpc/client/src/route.rs b/rpc/grpc/client/src/route.rs index bb5b5ce56..652a54e1c 100644 --- a/rpc/grpc/client/src/route.rs +++ b/rpc/grpc/client/src/route.rs @@ -9,15 +9,11 @@ macro_rules! route { clippy::type_repetition_in_bounds, clippy::used_underscore_binding )] - fn $fn<'life0, 'life1, 'async_trait>( + fn $fn<'life0>( &'life0 self, - _connection : ::core::option::Option<&'life1 Arc>, + _connection : core::option::Option, request: [<$name Request>], - ) -> ::core::pin::Pin]>> + ::core::marker::Send + 'async_trait>> - where - 'life0: 'async_trait, - 'life1: 'async_trait, - Self: 'async_trait, + ) -> impl ::core::future::Future]>> + ::core::marker::Send { Box::pin(async move { if let ::core::option::Option::Some(__ret) = ::core::option::Option::None::]>> { diff --git a/rpc/grpc/server/src/adaptor.rs b/rpc/grpc/server/src/adaptor.rs index c5536c6b6..47e30994c 100644 --- a/rpc/grpc/server/src/adaptor.rs +++ b/rpc/grpc/server/src/adaptor.rs @@ -1,31 +1,33 @@ use crate::{connection_handler::ConnectionHandler, manager::Manager}; use kaspa_core::debug; use kaspa_notify::{notifier::Notifier, subscription::context::SubscriptionContext}; -use kaspa_rpc_core::{api::rpc::DynRpcService, notify::connection::ChannelConnection, Notification, RpcResult}; +use kaspa_rpc_core::api::rpc::RpcApi; +use kaspa_rpc_core::{notify::connection::ChannelConnection, Notification, RpcResult}; use kaspa_utils::networking::NetAddress; use kaspa_utils_tower::counters::TowerConnectionCounters; +use std::fmt::Debug; use std::{ops::Deref, sync::Arc}; use tokio::sync::{mpsc::channel as mpsc_channel, oneshot::Sender as OneshotSender}; -pub struct Adaptor { +pub struct Adaptor { /// If a server was started, it will get cleaned up when this sender is dropped or invoked _server_termination: Option>, /// An object for handling new connections coming from clients - connection_handler: ConnectionHandler, + connection_handler: ConnectionHandler, /// An object for managing a list of active connections - manager: Manager, + manager: Manager, /// The network address of the server serve_address: NetAddress, } -impl Adaptor { +impl Adaptor { fn new( server_termination: Option>, - connection_handler: ConnectionHandler, - manager: Manager, + connection_handler: ConnectionHandler, + manager: Manager, serve_address: NetAddress, ) -> Self { Self { _server_termination: server_termination, connection_handler, manager, serve_address } @@ -34,8 +36,8 @@ impl Adaptor { pub fn server( serve_address: NetAddress, network_bps: u64, - manager: Manager, - core_service: DynRpcService, + manager: Manager, + core_service: RpcApiImpl, core_notifier: Arc>, subscription_context: SubscriptionContext, broadcasters: usize, @@ -80,8 +82,8 @@ impl Adaptor { } /// Expose all public `Manager` methods directly through the `Adaptor` -impl Deref for Adaptor { - type Target = Manager; +impl Deref for Adaptor { + type Target = Manager; fn deref(&self) -> &Self::Target { &self.manager diff --git a/rpc/grpc/server/src/connection.rs b/rpc/grpc/server/src/connection.rs index 442eb2f42..5a7520658 100644 --- a/rpc/grpc/server/src/connection.rs +++ b/rpc/grpc/server/src/connection.rs @@ -23,6 +23,7 @@ use kaspa_notify::{ }; use kaspa_rpc_core::Notification; use parking_lot::Mutex; +use std::fmt::{Debug, Formatter}; use std::{ collections::{hash_map::Entry, HashMap}, fmt::Display, @@ -40,7 +41,7 @@ use tonic::Streaming; use uuid::Uuid; pub type IncomingRoute = MpmcReceiver; -pub type GrpcNotifier = Notifier; +pub type GrpcNotifier = Notifier>; pub type GrpcSender = MpscSender; pub type StatusResult = Result; pub type ConnectionId = Uuid; @@ -63,7 +64,7 @@ impl InnerMutableState { } #[derive(Debug)] -struct Inner { +struct Inner { connection_id: ConnectionId, /// The socket address of this client @@ -74,10 +75,10 @@ struct Inner { /// A channel sender for internal event management. /// Used to send information from each router to a central manager object - manager_sender: MpscSender, + manager_sender: MpscSender>, /// The server RPC core service and notifier - server_context: ServerContext, + server_context: ServerContext, /// Used for managing connection mutable state mutable_state: Mutex, @@ -86,7 +87,9 @@ struct Inner { is_closed: AtomicBool, } -impl Drop for Inner { +impl Drop + for Inner +{ fn drop(&mut self) { debug!("GRPC, Dropping connection {}", self.connection_id); } @@ -116,23 +119,23 @@ impl Deref for Route { type RoutingMap = HashMap; -struct Router { +struct Router { /// Routing map for mapping messages to RPC op handlers routing_map: RoutingMap, /// The server RPC core service and notifier - server_context: ServerContext, + server_context: ServerContext, /// The interface providing the RPC methods to the request handlers - interface: Arc, + interface: Arc>, } -impl Router { - fn new(server_context: ServerContext, interface: Arc) -> Self { +impl Router { + fn new(server_context: ServerContext, interface: Arc>) -> Self { Self { routing_map: Default::default(), server_context, interface } } - fn get_or_subscribe(&mut self, connection: &Connection, rpc_op: KaspadPayloadOps) -> &Route { + fn get_or_subscribe(&mut self, connection: &Connection, rpc_op: KaspadPayloadOps) -> &Route { match self.routing_map.entry(rpc_op) { Entry::Vacant(entry) => { let method = self.interface.get_method(&rpc_op); @@ -170,7 +173,7 @@ impl Router { self.routing_map.get(&rpc_op).unwrap() } - async fn route_to_handler(&mut self, connection: &Connection, request: KaspadRequest) -> GrpcServerResult<()> { + async fn route_to_handler(&mut self, connection: &Connection, request: KaspadRequest) -> GrpcServerResult<()> { if request.payload.is_none() { debug!("GRPC, Route to handler got empty payload, client: {}", connection); return Err(GrpcServerError::InvalidRequestPayload); @@ -201,23 +204,40 @@ impl Router { } } -#[derive(Clone, Debug)] -pub struct Connection { - inner: Arc, +#[derive(Debug)] +pub struct Connection { + inner: Arc>, +} + +impl Clone for Connection { + fn clone(&self) -> Self { + Self { inner: self.inner.clone() } + } } -impl Display for Connection { +impl< + RpcApi: kaspa_rpc_core::api::rpc::RpcApi + + std::clone::Clone + + std::clone::Clone + + std::clone::Clone + + std::clone::Clone + + std::fmt::Debug + + std::fmt::Debug + + std::fmt::Debug + + std::fmt::Debug, + > Display for Connection +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}@{}", self.inner.connection_id, self.inner.net_address) } } -impl Connection { +impl Connection { pub(crate) fn new( net_address: SocketAddr, - server_context: ServerContext, - interface: Arc, - manager_sender: MpscSender, + server_context: ServerContext, + interface: Arc>, + manager_sender: MpscSender>, mut incoming_stream: Streaming, outgoing_route: GrpcSender, ) -> Self { @@ -314,7 +334,7 @@ impl Connection { self.inner.connection_id } - pub fn notifier(&self) -> Arc { + pub fn notifier(&self) -> Arc> { self.inner.server_context.notifier.clone() } @@ -402,7 +422,7 @@ pub enum GrpcEncoding { } #[async_trait::async_trait] -impl ConnectionT for Connection { +impl ConnectionT for Connection { type Notification = Notification; type Message = Arc; type Encoding = GrpcEncoding; diff --git a/rpc/grpc/server/src/connection_handler.rs b/rpc/grpc/server/src/connection_handler.rs index d581ea441..38c6d16e4 100644 --- a/rpc/grpc/server/src/connection_handler.rs +++ b/rpc/grpc/server/src/connection_handler.rs @@ -21,8 +21,8 @@ use kaspa_notify::{ subscriber::Subscriber, subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy}, }; +use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_rpc_core::{ - api::rpc::DynRpcService, notify::{channel::NotificationChannel, connection::ChannelConnection}, Notification, RpcResult, }; @@ -49,20 +49,20 @@ use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{codec::CompressionEncoding, transport::Server as TonicServer, Request, Response}; #[derive(Clone)] -pub struct ServerContext { +pub struct ServerContext { /// The RPC core service API the RPC methods are calling - pub core_service: DynRpcService, + pub core_service: RpcApiImpl, /// The notifier relaying RPC core notifications to connections - pub notifier: Arc>, + pub notifier: Arc>>, } -impl ServerContext { - pub fn new(core_service: DynRpcService, notifier: Arc>) -> Self { +impl ServerContext { + pub fn new(core_service: RpcApiImpl, notifier: Arc>>) -> Self { Self { core_service, notifier } } } -impl Debug for ServerContext { +impl Debug for ServerContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ServerContext").finish() } @@ -70,21 +70,21 @@ impl Debug for ServerContext { /// A protowire gRPC connections handler. #[derive(Clone)] -pub struct ConnectionHandler { - manager_sender: MpscSender, - server_context: ServerContext, - interface: Arc, +pub struct ConnectionHandler { + manager_sender: MpscSender>, + server_context: ServerContext, + interface: Arc>, running: Arc, counters: Arc, } const GRPC_SERVER: &str = "grpc-server"; -impl ConnectionHandler { +impl ConnectionHandler { pub(crate) fn new( network_bps: u64, - manager_sender: MpscSender, - core_service: DynRpcService, + manager_sender: MpscSender>, + core_service: RpcApiImpl, core_notifier: Arc>, subscription_context: SubscriptionContext, broadcasters: usize, @@ -105,7 +105,7 @@ impl ConnectionHandler { let converter = Arc::new(GrpcServiceConverter::new()); let collector = Arc::new(GrpcServiceCollector::new(GRPC_SERVER, core_channel.receiver(), converter)); let subscriber = Arc::new(Subscriber::new(GRPC_SERVER, core_events, core_notifier, core_listener_id)); - let notifier: Arc> = Arc::new(Notifier::new( + let notifier: Arc>> = Arc::new(Notifier::new( GRPC_SERVER, core_events, vec![collector], @@ -173,22 +173,22 @@ impl ConnectionHandler { } #[inline(always)] - fn server_context(&self) -> ServerContext { + fn server_context(&self) -> ServerContext { self.server_context.clone() } #[inline(always)] - fn interface(&self) -> Arc { + fn interface(&self) -> Arc> { self.interface.clone() } #[inline(always)] - fn manager_sender(&self) -> MpscSender { + fn manager_sender(&self) -> MpscSender> { self.manager_sender.clone() } #[inline(always)] - fn notifier(&self) -> Arc> { + fn notifier(&self) -> Arc>> { self.server_context.notifier.clone() } @@ -227,14 +227,18 @@ impl ConnectionHandler { } } -impl Drop for ConnectionHandler { +impl Drop + for ConnectionHandler +{ fn drop(&mut self) { debug!("GRPC, Dropping connection handler, refs {}", Arc::strong_count(&self.running)); } } #[tonic::async_trait] -impl Rpc for ConnectionHandler { +impl Rpc + for ConnectionHandler +{ type MessageStreamStream = Pin> + Send + Sync + 'static>>; /// Handle the new arriving client connection diff --git a/rpc/grpc/server/src/manager.rs b/rpc/grpc/server/src/manager.rs index cae37bc37..a54e0bc49 100644 --- a/rpc/grpc/server/src/manager.rs +++ b/rpc/grpc/server/src/manager.rs @@ -1,7 +1,9 @@ use crate::connection::{Connection, ConnectionId}; use kaspa_core::{debug, info, warn}; use kaspa_notify::connection::Connection as ConnectionT; +use kaspa_rpc_core::api::rpc::RpcApi; use parking_lot::RwLock; +use std::fmt::Debug; use std::{ collections::{hash_map::Entry::Occupied, HashMap, HashSet}, sync::Arc, @@ -18,36 +20,38 @@ pub(crate) enum RegistrationError { } pub(crate) type RegistrationResult = Result<(), RegistrationError>; -pub(crate) struct RegistrationRequest { - connection: Connection, +pub(crate) struct RegistrationRequest { + connection: Connection, response_sender: OneshotSender, } -impl RegistrationRequest { - pub fn new(connection: Connection, response_sender: OneshotSender) -> Self { +impl RegistrationRequest { + pub fn new(connection: Connection, response_sender: OneshotSender) -> Self { Self { connection, response_sender } } } -pub(crate) enum ManagerEvent { - NewConnection(RegistrationRequest), - ConnectionClosing(Connection), +pub(crate) enum ManagerEvent { + NewConnection(RegistrationRequest), + ConnectionClosing(Connection), } #[derive(Clone, Debug)] -pub struct Manager { - connections: Arc>>, +pub struct Manager { + connections: Arc>>>, max_connections: usize, } -impl Manager { +impl + Manager +{ pub fn new(max_connections: usize) -> Self { Self { connections: Default::default(), max_connections } } /// Starts a loop for receiving central manager events from all connections. This mechanism is used for /// managing a collection of active connections. - pub(crate) fn start_event_loop(self, mut manager_receiver: MpscReceiver) { + pub(crate) fn start_event_loop(self, mut manager_receiver: MpscReceiver>) { debug!("GRPC, Manager event loop starting"); tokio::spawn(async move { while let Some(new_event) = manager_receiver.recv().await { @@ -70,7 +74,7 @@ impl Manager { }); } - fn register(&self, connection: Connection) -> RegistrationResult { + fn register(&self, connection: Connection) -> RegistrationResult { let mut connections_write = self.connections.write(); // Check if there is room for a new connection @@ -94,7 +98,7 @@ impl Manager { Ok(()) } - fn unregister(&self, connection: Connection) { + fn unregister(&self, connection: Connection) { let mut connections_write = self.connections.write(); let connection_count = connections_write.len(); if let Occupied(entry) = connections_write.entry(connection.identity()) { @@ -142,7 +146,7 @@ impl Manager { } } -impl Drop for Manager { +impl Drop for Manager { fn drop(&mut self) { debug!("GRPC, Dropping Manager, refs count {}", Arc::strong_count(&self.connections)); } diff --git a/rpc/grpc/server/src/request_handler/factory.rs b/rpc/grpc/server/src/request_handler/factory.rs index 802cb6cd6..356dacea0 100644 --- a/rpc/grpc/server/src/request_handler/factory.rs +++ b/rpc/grpc/server/src/request_handler/factory.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::sync::Arc; use super::{ @@ -14,29 +15,33 @@ use crate::{ use kaspa_grpc_core::protowire::{kaspad_request::Payload, *}; use kaspa_grpc_core::{ops::KaspadPayloadOps, protowire::NotifyFinalityConflictResponseMessage}; use kaspa_notify::{scope::FinalityConflictResolvedScope, subscriber::SubscriptionManager}; +use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_rpc_core::{SubmitBlockRejectReason, SubmitBlockReport, SubmitBlockResponse}; use kaspa_rpc_macros::build_grpc_server_interface; pub struct Factory {} impl Factory { - pub fn new_handler( + pub fn new_handler( rpc_op: KaspadPayloadOps, incoming_route: IncomingRoute, - server_context: ServerContext, - interface: &Interface, - connection: Connection, + server_context: ServerContext, + interface: &Interface, + connection: Connection, ) -> Box { Box::new(RequestHandler::new(rpc_op, incoming_route, server_context, interface, connection)) } - pub fn new_interface(server_ctx: ServerContext, network_bps: u64) -> Interface { + pub fn new_interface( + server_ctx: ServerContext, + network_bps: u64, + ) -> Interface { // The array as last argument in the macro call below must exactly match the full set of // KaspadPayloadOps variants. let mut interface = build_grpc_server_interface!( server_ctx.clone(), - ServerContext, - Connection, + ServerContext, + Connection, KaspadRequest, KaspadResponse, KaspadPayloadOps, @@ -90,43 +95,44 @@ impl Factory { // Manually reimplementing the NotifyFinalityConflictRequest method so subscription // gets mirrored to FinalityConflictResolved notifications as well. - let method: KaspadMethod = Method::new(|server_ctx: ServerContext, connection: Connection, request: KaspadRequest| { - Box::pin(async move { - let mut response: KaspadResponse = match request.payload { - Some(Payload::NotifyFinalityConflictRequest(ref request)) => { - match kaspa_rpc_core::NotifyFinalityConflictRequest::try_from(request) { - Ok(request) => { - let listener_id = connection.get_or_register_listener_id()?; - let command = request.command; - let result = server_ctx - .notifier - .clone() - .execute_subscribe_command(listener_id, request.into(), command) - .await - .and( - server_ctx - .notifier - .clone() - .execute_subscribe_command( - listener_id, - FinalityConflictResolvedScope::default().into(), - command, - ) - .await, - ); - NotifyFinalityConflictResponseMessage::from(result).into() + let method: KaspadMethod = + Method::new(|server_ctx: ServerContext, connection: Connection, request: KaspadRequest| { + Box::pin(async move { + let mut response: KaspadResponse = match request.payload { + Some(Payload::NotifyFinalityConflictRequest(ref request)) => { + match kaspa_rpc_core::NotifyFinalityConflictRequest::try_from(request) { + Ok(request) => { + let listener_id = connection.get_or_register_listener_id()?; + let command = request.command; + let result = server_ctx + .notifier + .clone() + .execute_subscribe_command(listener_id, request.into(), command) + .await + .and( + server_ctx + .notifier + .clone() + .execute_subscribe_command( + listener_id, + FinalityConflictResolvedScope::default().into(), + command, + ) + .await, + ); + NotifyFinalityConflictResponseMessage::from(result).into() + } + Err(err) => NotifyFinalityConflictResponseMessage::from(err).into(), } - Err(err) => NotifyFinalityConflictResponseMessage::from(err).into(), } - } - _ => { - return Err(GrpcServerError::InvalidRequestPayload); - } - }; - response.id = request.id; - Ok(response) - }) - }); + _ => { + return Err(GrpcServerError::InvalidRequestPayload); + } + }; + response.id = request.id; + Ok(response) + }) + }); interface.replace_method(KaspadPayloadOps::NotifyFinalityConflict, method); // Methods with special properties diff --git a/rpc/grpc/server/src/request_handler/handler.rs b/rpc/grpc/server/src/request_handler/handler.rs index 6c1e008ca..bbd620ec2 100644 --- a/rpc/grpc/server/src/request_handler/handler.rs +++ b/rpc/grpc/server/src/request_handler/handler.rs @@ -12,22 +12,23 @@ use kaspa_grpc_core::{ ops::KaspadPayloadOps, protowire::{KaspadRequest, KaspadResponse}, }; +use std::fmt::Debug; -pub struct RequestHandler { +pub struct RequestHandler { rpc_op: KaspadPayloadOps, incoming_route: IncomingRoute, - server_ctx: ServerContext, - method: DynKaspadMethod, - connection: Connection, + server_ctx: ServerContext, + method: DynKaspadMethod, + connection: Connection, } -impl RequestHandler { +impl RequestHandler { pub fn new( rpc_op: KaspadPayloadOps, incoming_route: IncomingRoute, - server_context: ServerContext, - interface: &Interface, - connection: Connection, + server_context: ServerContext, + interface: &Interface, + connection: Connection, ) -> Self { let method = interface.get_method(&rpc_op); Self { rpc_op, incoming_route, server_ctx: server_context, method, connection } @@ -42,7 +43,7 @@ impl RequestHandler { } #[async_trait::async_trait] -impl Handler for RequestHandler { +impl Handler for RequestHandler { async fn start(&mut self) { debug!("GRPC, Starting request handler {:?} for client {}", self.rpc_op, self.connection); while let Ok(request) = self.incoming_route.recv().await { diff --git a/rpc/grpc/server/src/request_handler/interface.rs b/rpc/grpc/server/src/request_handler/interface.rs index 39bd5680f..fe004d585 100644 --- a/rpc/grpc/server/src/request_handler/interface.rs +++ b/rpc/grpc/server/src/request_handler/interface.rs @@ -11,8 +11,9 @@ use kaspa_grpc_core::{ use std::fmt::Debug; use std::{collections::HashMap, sync::Arc}; -pub type KaspadMethod = Method; -pub type DynKaspadMethod = Arc>; +pub type KaspadMethod = Method, Connection, KaspadRequest, KaspadResponse>; +pub type DynKaspadMethod = + Arc, Connection, KaspadRequest, KaspadResponse>>; pub type KaspadDropFn = DropFn; pub type KaspadRoutingPolicy = RoutingPolicy; @@ -24,14 +25,14 @@ pub type KaspadRoutingPolicy = RoutingPolicy; /// /// It is also possible to directly let the interface itself process a request by invoking /// the `call()` method. -pub struct Interface { - server_ctx: ServerContext, - methods: HashMap, - method_not_implemented: DynKaspadMethod, +pub struct Interface { + server_ctx: ServerContext, + methods: HashMap>, + method_not_implemented: DynKaspadMethod, } -impl Interface { - pub fn new(server_ctx: ServerContext) -> Self { +impl Interface { + pub fn new(server_ctx: ServerContext) -> Self { let method_not_implemented = Arc::new(Method::new(|_, _, kaspad_request: KaspadRequest| { Box::pin(async move { match kaspad_request.payload { @@ -46,15 +47,15 @@ impl Interface { Self { server_ctx, methods: Default::default(), method_not_implemented } } - pub fn method(&mut self, op: KaspadPayloadOps, method: KaspadMethod) { - let method: DynKaspadMethod = Arc::new(method); + pub fn method(&mut self, op: KaspadPayloadOps, method: KaspadMethod) { + let method: DynKaspadMethod = Arc::new(method); if self.methods.insert(op, method).is_some() { panic!("RPC method {op:?} is declared multiple times") } } - pub fn replace_method(&mut self, op: KaspadPayloadOps, method: KaspadMethod) { - let method: DynKaspadMethod = Arc::new(method); + pub fn replace_method(&mut self, op: KaspadPayloadOps, method: KaspadMethod) { + let method: DynKaspadMethod = Arc::new(method); let _ = self.methods.insert(op, method); } @@ -66,9 +67,10 @@ impl Interface { routing_policy: KaspadRoutingPolicy, ) { self.methods.entry(op).and_modify(|x| { - let method: Method = + let method: Method, Connection, KaspadRequest, KaspadResponse> = Method::with_properties(x.method_fn(), tasks, queue_size, routing_policy); - let method: Arc> = Arc::new(method); + let method: Arc, Connection, KaspadRequest, KaspadResponse>> = + Arc::new(method); *x = method; }); } @@ -76,18 +78,18 @@ impl Interface { pub async fn call( &self, op: &KaspadPayloadOps, - connection: Connection, + connection: Connection, request: KaspadRequest, ) -> GrpcServerResult { self.methods.get(op).unwrap_or(&self.method_not_implemented).call(self.server_ctx.clone(), connection, request).await } - pub fn get_method(&self, op: &KaspadPayloadOps) -> DynKaspadMethod { + pub fn get_method(&self, op: &KaspadPayloadOps) -> DynKaspadMethod { self.methods.get(op).unwrap_or(&self.method_not_implemented).clone() } } -impl Debug for Interface { +impl Debug for Interface { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Interface").finish() } diff --git a/rpc/grpc/server/src/service.rs b/rpc/grpc/server/src/service.rs index 7d810bf97..19c4a285b 100644 --- a/rpc/grpc/server/src/service.rs +++ b/rpc/grpc/server/src/service.rs @@ -5,16 +5,18 @@ use kaspa_core::{ task::service::{AsyncService, AsyncServiceFuture}, trace, warn, }; +use kaspa_rpc_core::api::connection::RpcConnection; +use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_rpc_service::service::RpcCoreService; use kaspa_utils::{networking::NetAddress, triggers::SingleTrigger}; use kaspa_utils_tower::counters::TowerConnectionCounters; use std::sync::Arc; use triggered::Listener; -pub struct GrpcService { +pub struct GrpcService { net_address: NetAddress, config: Arc, - core_service: Arc, + core_service: Arc>, rpc_max_clients: usize, broadcasters: usize, started: SingleTrigger, @@ -22,13 +24,13 @@ pub struct GrpcService { counters: Arc, } -impl GrpcService { +impl GrpcService { pub const IDENT: &'static str = "grpc-service"; pub fn new( address: NetAddress, config: Arc, - core_service: Arc, + core_service: Arc>, rpc_max_clients: usize, broadcasters: usize, counters: Arc, @@ -50,7 +52,7 @@ impl GrpcService { } } -impl AsyncService for GrpcService { +impl AsyncService for GrpcService { fn ident(self: Arc) -> &'static str { Self::IDENT } diff --git a/rpc/macros/src/grpc/server.rs b/rpc/macros/src/grpc/server.rs index f6f3f33b9..e5929d1e1 100644 --- a/rpc/macros/src/grpc/server.rs +++ b/rpc/macros/src/grpc/server.rs @@ -4,38 +4,30 @@ use quote::{quote, ToTokens}; use std::convert::Into; use syn::{ parse::{Parse, ParseStream}, - parse_macro_input, - punctuated::Punctuated, - Error, Expr, ExprArray, Result, Token, + parse_macro_input, Expr, ExprArray, Result, Token, Type, }; #[derive(Debug)] struct RpcTable { server_ctx: Expr, - server_ctx_type: Expr, - connection_ctx_type: Expr, - kaspad_request_type: Expr, - kaspad_response_type: Expr, - payload_ops: Expr, + server_ctx_type: Type, + connection_ctx_type: Type, + kaspad_request_type: Type, + kaspad_response_type: Type, + payload_ops: Type, handlers: ExprArray, } impl Parse for RpcTable { fn parse(input: ParseStream) -> Result { - let parsed = Punctuated::::parse_terminated(input).unwrap(); - if parsed.len() != 7 { - return Err(Error::new_spanned(parsed, - "usage: build_grpc_server_interface!(server_context, ServerContextType, ConnectionType, KaspadRequestType, KaspadResponseType, KaspadPayloadOps, [GetInfo, ..])".to_string())); - } - - let mut iter = parsed.iter(); - let server_ctx = iter.next().unwrap().clone(); - let server_ctx_type = iter.next().unwrap().clone(); - let connection_ctx_type = iter.next().unwrap().clone(); - let kaspad_request_type = iter.next().unwrap().clone(); - let kaspad_response_type = iter.next().unwrap().clone(); - let payload_ops = iter.next().unwrap().clone(); - let handlers = get_handlers(iter.next().unwrap().clone())?; + let server_ctx: Expr = input.parse()?; + let [server_ctx_type, connection_ctx_type, kaspad_request_type, kaspad_response_type, payload_ops] = + core::array::from_fn(|_| { + input.parse::().unwrap(); + input.parse().unwrap() + }); + input.parse::()?; + let handlers: ExprArray = input.parse()?; Ok(RpcTable { server_ctx, @@ -73,7 +65,7 @@ impl ToTokens for RpcTable { let mut response: #kaspad_response_type = match request.payload { Some(Payload::#request_type(ref request)) => match request.try_into() { // TODO: RPC-CONNECTION - Ok(request) => server_ctx.core_service.#fn_call(None,request).await.into(), + Ok(request) => server_ctx.core_service.#fn_call(core::default::Default::default(), request).await.into(), Err(err) => #response_message_type::from(err).into(), }, _ => { diff --git a/rpc/macros/src/wrpc/client.rs b/rpc/macros/src/wrpc/client.rs index 12f41687a..1fb2256da 100644 --- a/rpc/macros/src/wrpc/client.rs +++ b/rpc/macros/src/wrpc/client.rs @@ -52,18 +52,13 @@ impl ToTokens for RpcTable { // the async implementation of the RPC caller is inlined targets.push(quote! { - fn #fn_call<'life0, 'life1, 'async_trait>( - &'life0 self, - _connection : ::core::option::Option<&'life1 Arc>, + async fn #fn_call( + &self, + _connection : ::core::option::Option, request: #request_type, - ) -> ::core::pin::Pin> + ::core::marker::Send + 'async_trait>> - where - 'life0: 'async_trait, - 'life1: 'async_trait, - Self: 'async_trait, + ) -> RpcResult<#response_type> { use workflow_serializer::prelude::*; - Box::pin(async move { if let ::core::option::Option::Some(__ret) = ::core::option::Option::None::> { return __ret; } @@ -75,7 +70,6 @@ impl ToTokens for RpcTable { }; #[allow(unreachable_code)] __ret.map(Serializable::into_inner) - }) } }); diff --git a/rpc/macros/src/wrpc/server.rs b/rpc/macros/src/wrpc/server.rs index ee2afc551..0a17395cd 100644 --- a/rpc/macros/src/wrpc/server.rs +++ b/rpc/macros/src/wrpc/server.rs @@ -4,34 +4,28 @@ use quote::{quote, ToTokens}; use std::convert::Into; use syn::{ parse::{Parse, ParseStream}, - parse_macro_input, - punctuated::Punctuated, - Error, Expr, ExprArray, Result, Token, + parse_macro_input, Expr, ExprArray, Result, Token, Type, }; #[derive(Debug)] struct RpcTable { server_ctx: Expr, - server_ctx_type: Expr, - connection_ctx_type: Expr, - rpc_api_ops: Expr, + server_ctx_type: Type, + connection_ctx_type: Type, + rpc_api_ops: Type, handlers: ExprArray, } impl Parse for RpcTable { fn parse(input: ParseStream) -> Result { - let parsed = Punctuated::::parse_terminated(input).unwrap(); - if parsed.len() != 5 { - return Err(Error::new_spanned(parsed, - "usage: build_wrpc_server_interface!(server_instance,router_instance,ServerType,ConnectionType,RpcApiOps,[getInfo, ..])".to_string())); - } + let server_ctx: Expr = input.parse()?; + let [server_ctx_type, connection_ctx_type, rpc_api_ops] = core::array::from_fn(|_| { + input.parse::().unwrap(); + input.parse().unwrap() + }); + input.parse::()?; - let mut iter = parsed.iter(); - let server_ctx = iter.next().unwrap().clone(); - let server_ctx_type = iter.next().unwrap().clone(); - let connection_ctx_type = iter.next().unwrap().clone(); - let rpc_api_ops = iter.next().unwrap().clone(); - let handlers = get_handlers(iter.next().unwrap().clone())?; + let handlers = input.parse()?; Ok(RpcTable { server_ctx, server_ctx_type, connection_ctx_type, rpc_api_ops, handlers }) } @@ -54,7 +48,7 @@ impl ToTokens for RpcTable { let verbose = server_ctx.verbose(); if verbose { workflow_log::log_info!("request: {:?}",request); } // TODO: RPC-CONNECT - let response: #response_type = server_ctx.rpc_service(&connection_ctx).#fn_call(None, request.into_inner()).await + let response: #response_type = server_ctx.rpc_service(&connection_ctx).#fn_call(core::default::Default::default(), request.into_inner()).await .map_err(|e|ServerError::Text(e.to_string()))?; if verbose { workflow_log::log_info!("response: {:?}",response); } Ok(Serializable(response)) diff --git a/rpc/macros/src/wrpc/wasm.rs b/rpc/macros/src/wrpc/wasm.rs index 30af3e74a..34a25c8a4 100644 --- a/rpc/macros/src/wrpc/wasm.rs +++ b/rpc/macros/src/wrpc/wasm.rs @@ -59,7 +59,7 @@ impl ToTokens for RpcHandlers { pub async fn #fn_no_suffix(&self, request : Option<#ts_request_type>) -> Result<#ts_response_type> { let request: #request_type = request.unwrap_or_default().try_into()?; // log_info!("request: {:#?}",request); - let result: RpcResult<#response_type> = self.inner.client.#fn_call(None, request).await; + let result: RpcResult<#response_type> = self.inner.client.#fn_call(core::default::Default::default(), request).await; // log_info!("result: {:#?}",result); let response: #response_type = result.map_err(|err|wasm_bindgen::JsError::new(&err.to_string()))?; //log_info!("response: {:#?}",response); @@ -83,7 +83,7 @@ impl ToTokens for RpcHandlers { #[wasm_bindgen(js_name = #fn_camel)] pub async fn #fn_no_suffix(&self, request: #ts_request_type) -> Result<#ts_response_type> { let request: #request_type = request.try_into()?; - let result: RpcResult<#response_type> = self.inner.client.#fn_call(None, request).await; + let result: RpcResult<#response_type> = self.inner.client.#fn_call(core::default::Default::default(), request).await; let response: #response_type = result.map_err(|err|wasm_bindgen::JsError::new(&err.to_string()))?; Ok(response.try_into()?) } diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs index c1beff244..50c37f443 100644 --- a/rpc/service/src/service.rs +++ b/rpc/service/src/service.rs @@ -51,9 +51,9 @@ use kaspa_notify::{ use kaspa_p2p_flows::flow_context::FlowContext; use kaspa_p2p_lib::common::ProtocolError; use kaspa_perf_monitor::{counters::CountersSnapshot, Monitor as PerfMonitor}; +use kaspa_rpc_core::api::connection::RpcConnection; use kaspa_rpc_core::{ api::{ - connection::DynRpcConnection, ops::RPC_API_VERSION, rpc::{RpcApi, MAX_SAFE_WINDOW_SIZE}, }, @@ -65,6 +65,8 @@ use kaspa_txscript::{extract_script_pub_key_address, pay_to_address_script}; use kaspa_utils::{channel::Channel, triggers::SingleTrigger}; use kaspa_utils_tower::counters::TowerConnectionCounters; use kaspa_utxoindex::api::UtxoIndexProxy; +use std::fmt::{Debug, Formatter}; +use std::marker::PhantomData; use std::{ collections::HashMap, iter::once, @@ -91,7 +93,7 @@ use workflow_rpc::server::WebSocketCounters as WrpcServerCounters; /// from this instance to registered services and backwards should occur /// by adding respectively to the registered service a Collector and a /// Subscriber. -pub struct RpcCoreService { +pub struct RpcCoreService { consensus_manager: Arc, notifier: Arc>, mining_manager: MiningManagerProxy, @@ -110,11 +112,18 @@ pub struct RpcCoreService { perf_monitor: Arc>>, p2p_tower_counters: Arc, grpc_tower_counters: Arc, + rpc_conn: PhantomData, +} + +impl Debug for RpcCoreService { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } } const RPC_CORE: &str = "rpc-core"; -impl RpcCoreService { +impl RpcCoreService { pub const IDENT: &'static str = "rpc-core-service"; #[allow(clippy::too_many_arguments)] @@ -209,6 +218,7 @@ impl RpcCoreService { perf_monitor, p2p_tower_counters, grpc_tower_counters, + rpc_conn: Default::default(), } } @@ -274,13 +284,9 @@ impl RpcCoreService { } } -#[async_trait] -impl RpcApi for RpcCoreService { - async fn submit_block_call( - &self, - _connection: Option<&DynRpcConnection>, - request: SubmitBlockRequest, - ) -> RpcResult { +impl RpcApi for RpcCoreService { + type RpcConnection = RpcConn; + async fn submit_block_call(&self, _connection: Option, request: SubmitBlockRequest) -> RpcResult { let session = self.consensus_manager.consensus().unguarded_session(); // TODO: consider adding an error field to SubmitBlockReport to document both the report and error fields @@ -340,7 +346,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_block_template_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: GetBlockTemplateRequest, ) -> RpcResult { trace!("incoming GetBlockTemplate request"); @@ -374,7 +380,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and }) } - async fn get_block_call(&self, _connection: Option<&DynRpcConnection>, request: GetBlockRequest) -> RpcResult { + async fn get_block_call(&self, _connection: Option, request: GetBlockRequest) -> RpcResult { // TODO: test let session = self.consensus_manager.consensus().session().await; let block = session.async_get_block_even_if_header_only(request.hash).await?; @@ -386,11 +392,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and }) } - async fn get_blocks_call( - &self, - _connection: Option<&DynRpcConnection>, - request: GetBlocksRequest, - ) -> RpcResult { + async fn get_blocks_call(&self, _connection: Option, request: GetBlocksRequest) -> RpcResult { // Validate that user didn't set include_transactions without setting include_blocks if !request.include_blocks && request.include_transactions { return Err(RpcError::InvalidGetBlocksRequest); @@ -439,7 +441,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and Ok(GetBlocksResponse { block_hashes, blocks }) } - async fn get_info_call(&self, _connection: Option<&DynRpcConnection>, _request: GetInfoRequest) -> RpcResult { + async fn get_info_call(&self, _connection: Option, _request: GetInfoRequest) -> RpcResult { let is_nearly_synced = self.consensus_manager.consensus().unguarded_session().async_is_nearly_synced().await; Ok(GetInfoResponse { p2p_id: self.flow_context.node_id.to_string(), @@ -454,7 +456,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_mempool_entry_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: GetMempoolEntryRequest, ) -> RpcResult { let query = self.extract_tx_query(request.filter_transaction_pool, request.include_orphan_pool)?; @@ -467,7 +469,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_mempool_entries_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: GetMempoolEntriesRequest, ) -> RpcResult { let query = self.extract_tx_query(request.filter_transaction_pool, request.include_orphan_pool)?; @@ -483,7 +485,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_mempool_entries_by_addresses_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: GetMempoolEntriesByAddressesRequest, ) -> RpcResult { let query = self.extract_tx_query(request.filter_transaction_pool, request.include_orphan_pool)?; @@ -509,7 +511,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn submit_transaction_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: SubmitTransactionRequest, ) -> RpcResult { let allow_orphan = self.config.unsafe_rpc && request.allow_orphan; @@ -534,27 +536,23 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_current_network_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, _: GetCurrentNetworkRequest, ) -> RpcResult { Ok(GetCurrentNetworkResponse::new(*self.config.net)) } - async fn get_subnetwork_call( - &self, - _connection: Option<&DynRpcConnection>, - _: GetSubnetworkRequest, - ) -> RpcResult { + async fn get_subnetwork_call(&self, _connection: Option, _: GetSubnetworkRequest) -> RpcResult { Err(RpcError::NotImplemented) } - async fn get_sink_call(&self, _connection: Option<&DynRpcConnection>, _: GetSinkRequest) -> RpcResult { + async fn get_sink_call(&self, _connection: Option, _: GetSinkRequest) -> RpcResult { Ok(GetSinkResponse::new(self.consensus_manager.consensus().unguarded_session().async_get_sink().await)) } async fn get_sink_blue_score_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, _: GetSinkBlueScoreRequest, ) -> RpcResult { let session = self.consensus_manager.consensus().unguarded_session(); @@ -563,7 +561,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_virtual_chain_from_block_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: GetVirtualChainFromBlockRequest, ) -> RpcResult { let session = self.consensus_manager.consensus().session().await; @@ -576,17 +574,13 @@ NOTE: This error usually indicates an RPC conversion error between the node and Ok(GetVirtualChainFromBlockResponse::new(virtual_chain.removed, virtual_chain.added, accepted_transaction_ids)) } - async fn get_block_count_call( - &self, - _connection: Option<&DynRpcConnection>, - _: GetBlockCountRequest, - ) -> RpcResult { + async fn get_block_count_call(&self, _connection: Option, _: GetBlockCountRequest) -> RpcResult { Ok(self.consensus_manager.consensus().unguarded_session().async_estimate_block_count().await) } async fn get_utxos_by_addresses_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: GetUtxosByAddressesRequest, ) -> RpcResult { if !self.config.utxoindex { @@ -600,7 +594,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_balance_by_address_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: GetBalanceByAddressRequest, ) -> RpcResult { if !self.config.utxoindex { @@ -613,7 +607,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_balances_by_addresses_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: GetBalancesByAddressesRequest, ) -> RpcResult { if !self.config.utxoindex { @@ -632,11 +626,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and Ok(GetBalancesByAddressesResponse::new(entries)) } - async fn get_coin_supply_call( - &self, - _connection: Option<&DynRpcConnection>, - _: GetCoinSupplyRequest, - ) -> RpcResult { + async fn get_coin_supply_call(&self, _connection: Option, _: GetCoinSupplyRequest) -> RpcResult { if !self.config.utxoindex { return Err(RpcError::NoUtxoIndex); } @@ -647,7 +637,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_daa_score_timestamp_estimate_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: GetDaaScoreTimestampEstimateRequest, ) -> RpcResult { let session = self.consensus_manager.consensus().session().await; @@ -704,21 +694,17 @@ NOTE: This error usually indicates an RPC conversion error between the node and Ok(GetDaaScoreTimestampEstimateResponse::new(timestamps)) } - async fn ping_call(&self, _connection: Option<&DynRpcConnection>, _: PingRequest) -> RpcResult { + async fn ping_call(&self, _connection: Option, _: PingRequest) -> RpcResult { Ok(PingResponse {}) } - async fn get_headers_call( - &self, - _connection: Option<&DynRpcConnection>, - _request: GetHeadersRequest, - ) -> RpcResult { + async fn get_headers_call(&self, _connection: Option, _request: GetHeadersRequest) -> RpcResult { Err(RpcError::NotImplemented) } async fn get_block_dag_info_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, _: GetBlockDagInfoRequest, ) -> RpcResult { let session = self.consensus_manager.consensus().unguarded_session(); @@ -740,7 +726,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn estimate_network_hashes_per_second_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, request: EstimateNetworkHashesPerSecondRequest, ) -> RpcResult { if !self.config.unsafe_rpc && request.window_size > MAX_SAFE_WINDOW_SIZE { @@ -770,7 +756,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and )) } - async fn add_peer_call(&self, _connection: Option<&DynRpcConnection>, request: AddPeerRequest) -> RpcResult { + async fn add_peer_call(&self, _connection: Option, request: AddPeerRequest) -> RpcResult { if !self.config.unsafe_rpc { warn!("AddPeer RPC command called while node in safe RPC mode -- ignoring."); return Err(RpcError::UnavailableInSafeMode); @@ -786,14 +772,14 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_peer_addresses_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, _: GetPeerAddressesRequest, ) -> RpcResult { let address_manager = self.flow_context.address_manager.lock(); Ok(GetPeerAddressesResponse::new(address_manager.get_all_addresses(), address_manager.get_all_banned_addresses())) } - async fn ban_call(&self, _connection: Option<&DynRpcConnection>, request: BanRequest) -> RpcResult { + async fn ban_call(&self, _connection: Option, request: BanRequest) -> RpcResult { if !self.config.unsafe_rpc { warn!("Ban RPC command called while node in safe RPC mode -- ignoring."); return Err(RpcError::UnavailableInSafeMode); @@ -810,7 +796,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and Ok(BanResponse {}) } - async fn unban_call(&self, _connection: Option<&DynRpcConnection>, request: UnbanRequest) -> RpcResult { + async fn unban_call(&self, _connection: Option, request: UnbanRequest) -> RpcResult { if !self.config.unsafe_rpc { warn!("Unban RPC command called while node in safe RPC mode -- ignoring."); return Err(RpcError::UnavailableInSafeMode); @@ -826,7 +812,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_connected_peer_info_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, _: GetConnectedPeerInfoRequest, ) -> RpcResult { let peers = self.flow_context.hub().active_peers(); @@ -834,7 +820,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and Ok(GetConnectedPeerInfoResponse::new(peer_info)) } - async fn shutdown_call(&self, _connection: Option<&DynRpcConnection>, _: ShutdownRequest) -> RpcResult { + async fn shutdown_call(&self, _connection: Option, _: ShutdownRequest) -> RpcResult { if !self.config.unsafe_rpc { warn!("Shutdown RPC command called while node in safe RPC mode -- ignoring."); return Err(RpcError::UnavailableInSafeMode); @@ -857,7 +843,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn resolve_finality_conflict_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, _request: ResolveFinalityConflictRequest, ) -> RpcResult { if !self.config.unsafe_rpc { @@ -867,7 +853,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and Err(RpcError::NotImplemented) } - async fn get_metrics_call(&self, _connection: Option<&DynRpcConnection>, req: GetMetricsRequest) -> RpcResult { + async fn get_metrics_call(&self, _connection: Option, req: GetMetricsRequest) -> RpcResult { let CountersSnapshot { resident_set_size, virtual_memory_size, @@ -950,7 +936,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_server_info_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, _request: GetServerInfoRequest, ) -> RpcResult { let session = self.consensus_manager.consensus().unguarded_session(); @@ -969,7 +955,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and async fn get_sync_status_call( &self, - _connection: Option<&DynRpcConnection>, + _connection: Option, _request: GetSyncStatusRequest, ) -> RpcResult { let session = self.consensus_manager.consensus().unguarded_session(); @@ -1022,7 +1008,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and // It might be necessary to opt this out in the context of wasm32 -impl AsyncService for RpcCoreService { +impl AsyncService for RpcCoreService { fn ident(self: Arc) -> &'static str { Self::IDENT } diff --git a/rpc/wrpc/client/src/client.rs b/rpc/wrpc/client/src/client.rs index 0d274069a..c2240b503 100644 --- a/rpc/wrpc/client/src/client.rs +++ b/rpc/wrpc/client/src/client.rs @@ -19,6 +19,8 @@ pub use workflow_rpc::client::{ ConnectOptions, ConnectResult, ConnectStrategy, Resolver as RpcResolver, ResolverResult, WebSocketConfig, WebSocketError, }; use workflow_serializer::prelude::*; +use kaspa_rpc_core::api::rpc::DummyRpcConnection; + type RpcClientNotifier = Arc>; struct Inner { @@ -364,7 +366,7 @@ impl KaspaRpcClient { &self.inner.rpc_client } - pub fn rpc_api(self: &Arc) -> Arc { + pub fn rpc_api(self: &Arc) -> Arc { self.clone() } @@ -571,8 +573,8 @@ impl KaspaRpcClient { } } -#[async_trait] impl RpcApi for KaspaRpcClient { + type RpcConnection = DummyRpcConnection; // // The following proc-macro iterates over the array of enum variants // generating a function for each variant as follows: diff --git a/rpc/wrpc/proxy/src/main.rs b/rpc/wrpc/proxy/src/main.rs index 5450463e6..55d5a59f3 100644 --- a/rpc/wrpc/proxy/src/main.rs +++ b/rpc/wrpc/proxy/src/main.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use workflow_log::*; use workflow_rpc::server::prelude::*; use workflow_rpc::server::WebSocketCounters; +use kaspa_rpc_core::api::rpc::DummyRpcConnection; #[derive(Debug, Parser)] #[clap(name = "proxy")] @@ -85,7 +86,7 @@ async fn main() -> Result<()> { let rpc_handler = Arc::new(KaspaRpcHandler::new(tasks, encoding, None, options.clone())); let router = Arc::new(Router::new(rpc_handler.server.clone())); - let server = RpcServer::new_with_encoding::( + let server = RpcServer::new_with_encoding::, Connection, RpcApiOps, Id64>( encoding, rpc_handler.clone(), router.interface.clone(), @@ -97,7 +98,8 @@ async fn main() -> Result<()> { log_info!("Using `{encoding}` protocol encoding"); let config = WebSocketConfig { max_message_size: Some(1024 * 1024 * 1024), ..Default::default() }; - server.listen(&options.listen_address, Some(config)).await?; + let listener = TcpListener::bind(&options.listen_address).await.unwrap(); + server.listen(listener, Some(config)).await?; Ok(()) } diff --git a/rpc/wrpc/server/Cargo.toml b/rpc/wrpc/server/Cargo.toml index f0f55aed7..e7812a292 100644 --- a/rpc/wrpc/server/Cargo.toml +++ b/rpc/wrpc/server/Cargo.toml @@ -15,6 +15,7 @@ crate-type = ["cdylib", "lib"] [dependencies] async-trait.workspace = true borsh = { workspace = true, features = ["rc"] } +enum_dispatch.workspace = true futures.workspace = true kaspa-consensus-core.workspace = true kaspa-core.workspace = true diff --git a/rpc/wrpc/server/src/router.rs b/rpc/wrpc/server/src/router.rs index f734caf03..0b6e0eab2 100644 --- a/rpc/wrpc/server/src/router.rs +++ b/rpc/wrpc/server/src/router.rs @@ -1,5 +1,7 @@ use crate::{connection::*, server::*}; use kaspa_notify::scope::Scope; +use kaspa_rpc_core::api::connection::RpcConnection; +use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_rpc_core::{api::ops::RpcApiOps, prelude::*}; use kaspa_rpc_macros::build_wrpc_server_interface; use std::sync::Arc; @@ -11,13 +13,13 @@ use workflow_serializer::prelude::*; /// is later given to the RpcServer. This wrapper exists to allow /// a single initialization location for both the Kaspad Server and /// the GRPC Proxy. -pub struct Router { - pub interface: Arc>, - pub server_context: Server, +pub struct Router { + pub interface: Arc, Connection, RpcApiOps>>, + pub server_context: Server, } -impl Router { - pub fn new(server_context: Server) -> Self { +impl Router { + pub fn new(server_context: Server) -> Self { // let router_target = server_context.router_target(); // The following macro iterates the supplied enum variants taking the variant @@ -29,7 +31,7 @@ impl Router { #[allow(unreachable_patterns)] let mut interface = build_wrpc_server_interface!( server_context.clone(), - Server, + Server, Connection, RpcApiOps, [ @@ -73,7 +75,7 @@ impl Router { interface.method( RpcApiOps::Subscribe, - workflow_rpc::server::Method::new(move |manager: Server, connection: Connection, scope: Serializable| { + workflow_rpc::server::Method::new(move |manager: Server, connection: Connection, scope: Serializable| { Box::pin(async move { manager.start_notify(&connection, scope.into_inner()).await.map_err(|err| err.to_string())?; Ok(Serializable(SubscribeResponse::new(connection.id()))) @@ -83,7 +85,7 @@ impl Router { interface.method( RpcApiOps::Unsubscribe, - workflow_rpc::server::Method::new(move |manager: Server, connection: Connection, scope: Serializable| { + workflow_rpc::server::Method::new(move |manager: Server, connection: Connection, scope: Serializable| { Box::pin(async move { manager.stop_notify(&connection, scope.into_inner()).await.unwrap_or_else(|err| { workflow_log::log_trace!("wRPC server -> error calling stop_notify(): {err}"); diff --git a/rpc/wrpc/server/src/server.rs b/rpc/wrpc/server/src/server.rs index 562bbf34b..3221eb1b3 100644 --- a/rpc/wrpc/server/src/server.rs +++ b/rpc/wrpc/server/src/server.rs @@ -4,6 +4,7 @@ use crate::{ result::Result, service::Options, }; +use enum_dispatch::enum_dispatch; use kaspa_grpc_client::GrpcClient; use kaspa_notify::{ connection::ChannelType, @@ -14,8 +15,10 @@ use kaspa_notify::{ subscriber::Subscriber, subscription::{MutationPolicies, UtxosChangedMutationPolicy}, }; +use kaspa_rpc_core::api::connection::RpcConnection; +use kaspa_rpc_core::api::rpc::DynRpcService; use kaspa_rpc_core::{ - api::rpc::{DynRpcService, RpcApi}, + api::rpc::RpcApi, notify::{channel::NotificationChannel, connection::ChannelConnection, mode::NotificationMode}, Notification, RpcResult, }; @@ -32,28 +35,28 @@ use workflow_rpc::server::prelude::*; pub type WrpcNotifier = Notifier; -struct RpcCore { - pub service: Arc, +struct RpcCore { + pub service: Arc>, pub wrpc_notifier: Arc, } -struct ServerInner { +struct ServerInner { pub next_connection_id: AtomicU64, pub _encoding: Encoding, pub sockets: Mutex>, - pub rpc_core: Option, + pub rpc_core: Option>, pub options: Arc, } #[derive(Clone)] -pub struct Server { - inner: Arc, +pub struct Server { + inner: Arc>, } const WRPC_SERVER: &str = "wrpc-server"; -impl Server { - pub fn new(tasks: usize, encoding: Encoding, core_service: Option>, options: Arc) -> Self { +impl Server { + pub fn new(tasks: usize, encoding: Encoding, core_service: Option>>, options: Arc) -> Self { // This notifier UTXOs subscription granularity to rpc-core notifier let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet); @@ -167,13 +170,19 @@ impl Server { pub fn notifier(&self) -> Option> { self.inner.rpc_core.as_ref().map(|x| x.wrpc_notifier.clone()) } - - pub fn rpc_service(&self, connection: &Connection) -> DynRpcService { - if let Some(rpc_core) = &self.inner.rpc_core { - rpc_core.service.clone() - } else { - connection.grpc_client() + pub fn rpc_service(&self, connection: &Connection) -> impl RpcApi { + #[enum_dispatch(RpcApi)] + enum RpcApiImpl { + RpcCore(Arc>), + GrpcClient(Arc), } + + // if let Some(rpc_core) = &self.inner.rpc_core { + // RpcApiImpl::RpcCore(rpc_core.service.clone()) + // } else { + connection.grpc_client() + // RpcApiImpl::GrpcClient(connection.grpc_client()) + // } } pub async fn start_notify(&self, connection: &Connection, scope: Scope) -> RpcResult<()> { diff --git a/rpc/wrpc/server/src/service.rs b/rpc/wrpc/server/src/service.rs index 866c4cdd6..bc1a33902 100644 --- a/rpc/wrpc/server/src/service.rs +++ b/rpc/wrpc/server/src/service.rs @@ -5,6 +5,7 @@ use kaspa_core::{ task::service::{AsyncService, AsyncServiceError, AsyncServiceFuture}, trace, warn, }; +use kaspa_rpc_core::api::connection::RpcConnection; use kaspa_rpc_core::api::ops::RpcApiOps; use kaspa_rpc_service::service::RpcCoreService; use kaspa_utils::triggers::SingleTrigger; @@ -43,24 +44,24 @@ impl Default for Options { /// /// RPC method handling is implemented in the [`Router`]. /// -pub struct KaspaRpcHandler { - pub server: Server, +pub struct KaspaRpcHandler { + pub server: Server, pub options: Arc, } -impl KaspaRpcHandler { +impl KaspaRpcHandler { pub fn new( tasks: usize, encoding: WrpcEncoding, - core_service: Option>, + core_service: Option>>, options: Arc, - ) -> KaspaRpcHandler { + ) -> KaspaRpcHandler { KaspaRpcHandler { server: Server::new(tasks, encoding, core_service, options.clone()), options } } } #[async_trait] -impl RpcHandler for KaspaRpcHandler { +impl RpcHandler for KaspaRpcHandler { type Context = Connection; async fn handshake( @@ -94,19 +95,19 @@ impl RpcHandler for KaspaRpcHandler { /// /// wRPC Server - A wrapper around and an initializer of the RpcServer /// -pub struct WrpcService { +pub struct WrpcService { // TODO: see if tha Adapter/ConnectionHandler design of P2P and gRPC can be applied here too options: Arc, server: RpcServer, - rpc_handler: Arc, + rpc_handler: Arc>, shutdown: SingleTrigger, } -impl WrpcService { +impl WrpcService { /// Create and initialize RpcServer pub fn new( tasks: usize, - core_service: Option>, + core_service: Option>>, encoding: &Encoding, counters: Arc, options: Options, @@ -118,7 +119,7 @@ impl WrpcService { // Create router (initializes Interface registering RPC method and notification handlers) let router = Arc::new(Router::new(rpc_handler.server.clone())); // Create a server - let server = RpcServer::new_with_encoding::( + let server = RpcServer::new_with_encoding::, Connection, RpcApiOps, Id64>( *encoding, rpc_handler.clone(), router.interface.clone(), @@ -147,7 +148,8 @@ impl WrpcService { info!("WRPC Server starting on: {}", listen_address); tokio::spawn(async move { let config = WebSocketConfig { max_message_size: Some(MAX_WRPC_MESSAGE_SIZE), ..Default::default() }; - let serve_result = self.server.listen(&listen_address, Some(config)).await; + let listener = TcpListener::bind(&listen_address).await.unwrap(); + let serve_result = self.server.listen(listener, Some(config)).await; match serve_result { Ok(_) => info!("WRPC Server stopped on: {}", listen_address), Err(err) => panic!("WRPC Server {listen_address} stopped with error: {err:?}"), @@ -160,7 +162,7 @@ impl WrpcService { const WRPC_SERVER: &str = "wrpc-service"; -impl AsyncService for WrpcService { +impl AsyncService for WrpcService { fn ident(self: Arc) -> &'static str { WRPC_SERVER } diff --git a/wallet/core/src/account/mod.rs b/wallet/core/src/account/mod.rs index b921bf491..d9f6fb672 100644 --- a/wallet/core/src/account/mod.rs +++ b/wallet/core/src/account/mod.rs @@ -26,7 +26,7 @@ use workflow_core::abortable::Abortable; /// Notification callback type used by [`Account::sweep`] and [`Account::send`]. /// Allows tracking in-flight transactions during transaction generation. -pub type GenerationNotifier = Arc; +pub type GenerationNotifier = Arc) + Send + Sync>; /// Scan notification callback type used by [`DerivationCapableAccount::derivation_scan`]. /// Provides derivation discovery scan progress information. pub type ScanNotifier = Arc) + Send + Sync>; @@ -47,23 +47,23 @@ impl Context { } /// Account `Inner` struct used by most account types. -pub struct Inner { +pub struct Inner { context: Mutex, id: AccountId, storage_key: AccountStorageKey, - wallet: Arc, - utxo_context: UtxoContext, + wallet: Arc>, + utxo_context: UtxoContext, } -impl Inner { - pub fn new(wallet: &Arc, id: AccountId, storage_key: AccountStorageKey, settings: AccountSettings) -> Self { +impl Inner { + pub fn new(wallet: &Arc>, id: AccountId, storage_key: AccountStorageKey, settings: AccountSettings) -> Self { let utxo_context = UtxoContext::new(wallet.utxo_processor(), UtxoContextBinding::AccountId(id)); let context = Context { settings }; Inner { context: Mutex::new(context), id, storage_key, wallet: wallet.clone(), utxo_context: utxo_context.clone() } } - pub fn from_storage(wallet: &Arc, storage: &AccountStorage) -> Self { + pub fn from_storage(wallet: &Arc>, storage: &AccountStorage) -> Self { Self::new(wallet, storage.id, storage.storage_key, storage.settings.clone()) } diff --git a/wallet/core/src/account/variants/bip32.rs b/wallet/core/src/account/variants/bip32.rs index 3a8b3ad40..202eae9a1 100644 --- a/wallet/core/src/account/variants/bip32.rs +++ b/wallet/core/src/account/variants/bip32.rs @@ -82,18 +82,18 @@ impl BorshDeserialize for Payload { } } -pub struct Bip32 { - inner: Arc, +pub struct Bip32 { + inner: Arc>, prv_key_data_id: PrvKeyDataId, account_index: u64, xpub_keys: ExtendedPublicKeys, ecdsa: bool, - derivation: Arc, + derivation: Arc>, } -impl Bip32 { +impl Bip32 { pub async fn try_new( - wallet: &Arc, + wallet: &Arc>, name: Option, prv_key_data_id: PrvKeyDataId, account_index: u64, @@ -120,7 +120,7 @@ impl Bip32 { Ok(Self { inner, prv_key_data_id, account_index, xpub_keys, ecdsa, derivation }) } - pub async fn try_load(wallet: &Arc, storage: &AccountStorage, meta: Option>) -> Result { + pub async fn try_load(wallet: &Arc>, storage: &AccountStorage, meta: Option>) -> Result { let storable = Payload::try_load(storage)?; let prv_key_data_id: PrvKeyDataId = storage.prv_key_data_ids.clone().try_into()?; let inner = Arc::new(Inner::from_storage(wallet, storage)); @@ -160,8 +160,8 @@ impl Bip32 { } #[async_trait] -impl Account for Bip32 { - fn inner(&self) -> &Arc { +impl Account for Bip32 { + fn inner(&self) -> &Arc> { &self.inner } @@ -234,7 +234,7 @@ impl Account for Bip32 { } } -impl DerivationCapableAccount for Bip32 { +impl DerivationCapableAccount for Bip32 { fn derivation(&self) -> Arc { self.derivation.clone() } diff --git a/wallet/core/src/account/variants/keypair.rs b/wallet/core/src/account/variants/keypair.rs index d0d37f74e..7e3c15c4e 100644 --- a/wallet/core/src/account/variants/keypair.rs +++ b/wallet/core/src/account/variants/keypair.rs @@ -85,8 +85,8 @@ impl BorshDeserialize for Payload { } } -pub struct Keypair { - inner: Arc, +pub struct Keypair { + inner: Arc>, prv_key_data_id: PrvKeyDataId, public_key: PublicKey, ecdsa: bool, diff --git a/wallet/core/src/account/variants/legacy.rs b/wallet/core/src/account/variants/legacy.rs index 0ea69554d..0b347bac9 100644 --- a/wallet/core/src/account/variants/legacy.rs +++ b/wallet/core/src/account/variants/legacy.rs @@ -67,14 +67,14 @@ impl BorshDeserialize for Payload { } } -pub struct Legacy { - inner: Arc, +pub struct Legacy { + inner: Arc>, prv_key_data_id: PrvKeyDataId, - derivation: Arc, + derivation: Arc>, } -impl Legacy { - pub async fn try_new(wallet: &Arc, name: Option, prv_key_data_id: PrvKeyDataId) -> Result { +impl Legacy { + pub async fn try_new(wallet: &Arc>, name: Option, prv_key_data_id: PrvKeyDataId) -> Result { let storable = Payload; let settings = AccountSettings { name, ..Default::default() }; diff --git a/wallet/core/src/account/variants/multisig.rs b/wallet/core/src/account/variants/multisig.rs index 253464543..1fba23c0e 100644 --- a/wallet/core/src/account/variants/multisig.rs +++ b/wallet/core/src/account/variants/multisig.rs @@ -83,19 +83,19 @@ impl BorshDeserialize for Payload { } } -pub struct MultiSig { - inner: Arc, +pub struct MultiSig { + inner: Arc>, xpub_keys: ExtendedPublicKeys, prv_key_data_ids: Option>>, cosigner_index: Option, minimum_signatures: u16, ecdsa: bool, - derivation: Arc, + derivation: Arc>, } -impl MultiSig { +impl MultiSig { pub async fn try_new( - wallet: &Arc, + wallet: &Arc>, name: Option, xpub_keys: ExtendedPublicKeys, prv_key_data_ids: Option>>, diff --git a/wallet/core/src/account/variants/resident.rs b/wallet/core/src/account/variants/resident.rs index 74f386896..2b3205d29 100644 --- a/wallet/core/src/account/variants/resident.rs +++ b/wallet/core/src/account/variants/resident.rs @@ -9,8 +9,8 @@ use secp256k1::{PublicKey, SecretKey}; pub const RESIDENT_ACCOUNT_KIND: &str = "kaspa-resident-standard"; -pub struct Resident { - inner: Arc, +pub struct Resident { + inner: Arc>, public_key: PublicKey, #[allow(dead_code)] diff --git a/wallet/core/src/api/transport.rs b/wallet/core/src/api/transport.rs index 4de2d7824..2d5db51c5 100644 --- a/wallet/core/src/api/transport.rs +++ b/wallet/core/src/api/transport.rs @@ -120,9 +120,9 @@ pub trait EventHandler: Send + Sync { /// API methods that can be invoked via Borsh or Serde messages containing /// serializations created using the [`Transport`] interface. The [`WalletServer`] /// is a counter-part to [`WalletClient`]. -pub struct WalletServer { +pub struct WalletServer { // pub wallet_api: Arc, - pub wallet: Arc, + pub wallet: Arc>, pub event_handler: Arc, task_ctl: DuplexChannel, } diff --git a/wallet/core/src/derivation.rs b/wallet/core/src/derivation.rs index 15ad78503..64c6f9aa6 100644 --- a/wallet/core/src/derivation.rs +++ b/wallet/core/src/derivation.rs @@ -48,8 +48,8 @@ pub struct Inner { pub address_to_index_map: HashMap, } -pub struct AddressManager { - pub wallet: Arc, +pub struct AddressManager { + pub wallet: Arc>, pub account_kind: AccountKind, pub pubkey_managers: Vec>, pub ecdsa: bool, @@ -57,9 +57,9 @@ pub struct AddressManager { pub minimum_signatures: usize, } -impl AddressManager { +impl AddressManager { pub fn new( - wallet: Arc, + wallet: Arc>, account_kind: AccountKind, pubkey_managers: Vec>, ecdsa: bool, @@ -171,20 +171,20 @@ impl AddressManager { } } -pub struct AddressDerivationManager { +pub struct AddressDerivationManager { pub account_kind: AccountKind, pub account_index: u64, pub cosigner_index: Option, pub derivators: Vec>, #[allow(dead_code)] - wallet: Arc, - pub receive_address_manager: Arc, - pub change_address_manager: Arc, + wallet: Arc>, + pub receive_address_manager: Arc>, + pub change_address_manager: Arc>, } -impl AddressDerivationManager { +impl AddressDerivationManager { pub async fn new( - wallet: &Arc, + wallet: &Arc>, account_kind: AccountKind, keys: &ExtendedPublicKeys, ecdsa: bool, diff --git a/wallet/core/src/imports.rs b/wallet/core/src/imports.rs index 2d2ce79fd..e111bed97 100644 --- a/wallet/core/src/imports.rs +++ b/wallet/core/src/imports.rs @@ -14,7 +14,7 @@ pub use crate::factory::{factories, Factory}; pub use crate::metrics::{MetricsUpdate, MetricsUpdateKind}; pub use crate::result::Result; pub use crate::rpc::Rpc; -pub use crate::rpc::{DynRpcApi, RpcCtl}; +pub use crate::rpc::{RpcCtl}; pub use crate::serializer::*; pub use crate::storage::*; pub use crate::tx::MassCombinationStrategy; diff --git a/wallet/core/src/prelude.rs b/wallet/core/src/prelude.rs index eb9bb2b1e..57249f7c9 100644 --- a/wallet/core/src/prelude.rs +++ b/wallet/core/src/prelude.rs @@ -10,7 +10,7 @@ pub use crate::deterministic::{AccountId, AccountStorageKey}; pub use crate::encryption::EncryptionKind; pub use crate::events::{Events, SyncState}; pub use crate::metrics::{MetricsUpdate, MetricsUpdateKind}; -pub use crate::rpc::{ConnectOptions, ConnectStrategy, DynRpcApi}; +pub use crate::rpc::{ConnectOptions, ConnectStrategy}; pub use crate::settings::WalletSettings; pub use crate::storage::{IdT, Interface, PrvKeyDataId, PrvKeyDataInfo, TransactionId, TransactionRecord, WalletDescriptor}; pub use crate::tx::{Fees, PaymentDestination, PaymentOutput, PaymentOutputs}; diff --git a/wallet/core/src/rpc.rs b/wallet/core/src/rpc.rs index 999e09e30..f9e0f842e 100644 --- a/wallet/core/src/rpc.rs +++ b/wallet/core/src/rpc.rs @@ -6,7 +6,6 @@ use std::sync::Arc; pub use kaspa_rpc_core::api::ctl::RpcCtl; pub use kaspa_rpc_core::api::rpc::RpcApi; -pub type DynRpcApi = dyn RpcApi; pub type NotificationChannel = kaspa_utils::channel::Channel; pub use kaspa_rpc_core::notify::mode::NotificationMode; pub use kaspa_wrpc_client::client::{ConnectOptions, ConnectStrategy}; @@ -16,17 +15,17 @@ pub use kaspa_wrpc_client::WrpcEncoding; /// RPC adaptor class that holds the [`RpcApi`] /// and [`RpcCtl`] instances. #[derive(Clone)] -pub struct Rpc { - pub rpc_api: Arc, +pub struct Rpc { + pub rpc_api: Arc, pub rpc_ctl: RpcCtl, } -impl Rpc { - pub fn new(rpc_api: Arc, rpc_ctl: RpcCtl) -> Self { +impl Rpc { + pub fn new(rpc_api: RpcImpl, rpc_ctl: RpcCtl) -> Self { Rpc { rpc_api, rpc_ctl } } - pub fn rpc_api(&self) -> &Arc { + pub fn rpc_api(&self) -> &Arc { &self.rpc_api } diff --git a/wallet/core/src/tx/generator/generator.rs b/wallet/core/src/tx/generator/generator.rs index 533f2e046..b071c21de 100644 --- a/wallet/core/src/tx/generator/generator.rs +++ b/wallet/core/src/tx/generator/generator.rs @@ -250,7 +250,7 @@ struct MassDisposition { /// /// Internal Generator settings and references /// -struct Inner { +struct Inner { // Atomic abortable trigger that will cause the processing to halt with `Error::Aborted` abortable: Option, // Optional signer that is passed on to the [`PendingTransaction`] allowing [`PendingTransaction`] to expose signing functions for convenience. @@ -263,9 +263,9 @@ struct Inner { network_params: NetworkParams, // Source Utxo Context (Used for source UtxoEntry aggregation) - source_utxo_context: Option, + source_utxo_context: Option>, // Destination Utxo Context (Used only during transfer transactions) - destination_utxo_context: Option, + destination_utxo_context: Option>, // Event multiplexer multiplexer: Option>>, // typically a number of keys required to sign the transaction @@ -327,11 +327,11 @@ impl std::fmt::Debug for Inner { /// Transaction generator /// #[derive(Clone)] -pub struct Generator { - inner: Arc, +pub struct Generator { + inner: Arc>, } -impl Generator { +impl Generator { /// Create a new [`Generator`] instance using [`GeneratorSettings`]. pub fn try_new(settings: GeneratorSettings, signer: Option>, abortable: Option<&Abortable>) -> Result { let GeneratorSettings { diff --git a/wallet/core/src/tx/generator/iterator.rs b/wallet/core/src/tx/generator/iterator.rs index a7d4e88b6..bd9adbeb6 100644 --- a/wallet/core/src/tx/generator/iterator.rs +++ b/wallet/core/src/tx/generator/iterator.rs @@ -5,17 +5,17 @@ use crate::result::Result; use crate::tx::{Generator, PendingTransaction}; -pub struct PendingTransactionIterator { - generator: Generator, +pub struct PendingTransactionIterator { + generator: Generator, } -impl PendingTransactionIterator { - pub fn new(generator: &Generator) -> Self { +impl PendingTransactionIterator { + pub fn new(generator: &Generator) -> Self { Self { generator: generator.clone() } } } -impl Iterator for PendingTransactionIterator { +impl Iterator for PendingTransactionIterator { type Item = Result; fn next(&mut self) -> Option { self.generator.generate_transaction().transpose() diff --git a/wallet/core/src/tx/generator/pending.rs b/wallet/core/src/tx/generator/pending.rs index cd757e54b..a4289ca26 100644 --- a/wallet/core/src/tx/generator/pending.rs +++ b/wallet/core/src/tx/generator/pending.rs @@ -5,16 +5,16 @@ use crate::imports::*; use crate::result::Result; -use crate::rpc::DynRpcApi; use crate::tx::{DataKind, Generator}; use crate::utxo::{UtxoContext, UtxoEntryId, UtxoEntryReference}; use kaspa_consensus_core::sign::sign_with_multiple_v2; use kaspa_consensus_core::tx::{SignableTransaction, Transaction, TransactionId}; use kaspa_rpc_core::{RpcTransaction, RpcTransactionId}; +use kaspa_rpc_core::api::rpc::RpcApi; -pub(crate) struct PendingTransactionInner { +pub(crate) struct PendingTransactionInner { /// Generator that produced the transaction - pub(crate) generator: Generator, + pub(crate) generator: Generator, /// UtxoEntryReferences of the pending transaction pub(crate) utxo_entries: AHashMap, /// Transaction Id (cached in pending to avoid mutex lock) @@ -62,14 +62,14 @@ impl std::fmt::Debug for PendingTransaction { /// Contains auxiliary information about the transaction such as aggregate /// input/output amounts, fees, etc. #[derive(Clone)] -pub struct PendingTransaction { - pub(crate) inner: Arc, +pub struct PendingTransaction { + pub(crate) inner: Arc>, } -impl PendingTransaction { +impl PendingTransaction { #[allow(clippy::too_many_arguments)] pub fn try_new( - generator: &Generator, + generator: &Generator, transaction: Transaction, utxo_entries: Vec, addresses: Vec
, @@ -175,7 +175,7 @@ impl PendingTransaction { } /// Submit the transaction on the supplied rpc - pub async fn try_submit(&self, rpc: &Arc) -> Result { + pub async fn try_submit(&self, rpc: &Arc) -> Result { // sanity check to prevent multiple invocations (for API use) self.inner.is_submitted.load(Ordering::SeqCst).then(|| { panic!("PendingTransaction::try_submit() called multiple times"); diff --git a/wallet/core/src/tx/generator/settings.rs b/wallet/core/src/tx/generator/settings.rs index 0055d8fb4..439e4943f 100644 --- a/wallet/core/src/tx/generator/settings.rs +++ b/wallet/core/src/tx/generator/settings.rs @@ -11,7 +11,7 @@ use crate::utxo::{UtxoContext, UtxoEntryReference, UtxoIterator}; use kaspa_addresses::Address; use workflow_core::channel::Multiplexer; -pub struct GeneratorSettings { +pub struct GeneratorSettings { // Network type pub network_id: NetworkId, // Event multiplexer @@ -19,7 +19,7 @@ pub struct GeneratorSettings { // Utxo iterator pub utxo_iterator: Box + Send + Sync + 'static>, // Utxo Context - pub source_utxo_context: Option, + pub source_utxo_context: Option>, // typically a number of keys required to sign the transaction pub sig_op_count: u8, // number of minimum signatures required to sign the transaction @@ -33,7 +33,7 @@ pub struct GeneratorSettings { // payload pub final_transaction_payload: Option>, // transaction is a transfer between accounts - pub destination_utxo_context: Option, + pub destination_utxo_context: Option>, } // impl std::fmt::Debug for GeneratorSettings { @@ -54,7 +54,7 @@ pub struct GeneratorSettings { // } // } -impl GeneratorSettings { +impl GeneratorSettings { pub fn try_new_with_account( account: Arc, final_transaction_destination: PaymentDestination, diff --git a/wallet/core/src/tx/generator/stream.rs b/wallet/core/src/tx/generator/stream.rs index c41a930cd..3644aef0b 100644 --- a/wallet/core/src/tx/generator/stream.rs +++ b/wallet/core/src/tx/generator/stream.rs @@ -9,12 +9,12 @@ use crate::result::Result; use crate::tx::{Generator, PendingTransaction}; use futures::Stream; -pub struct PendingTransactionStream { - generator: Generator, +pub struct PendingTransactionStream { + generator: Generator, } -impl PendingTransactionStream { - pub fn new(generator: &Generator) -> Self { +impl PendingTransactionStream { + pub fn new(generator: &Generator) -> Self { Self { generator: generator.clone() } } } diff --git a/wallet/core/src/utxo/context.rs b/wallet/core/src/utxo/context.rs index 51ef0e5ea..2fb88ecb1 100644 --- a/wallet/core/src/utxo/context.rs +++ b/wallet/core/src/utxo/context.rs @@ -79,7 +79,7 @@ pub enum UtxoEntryVariant { Stasis(UtxoEntryReference), } -pub struct Context { +pub struct Context { /// Mature (Confirmed) UTXOs pub(crate) mature: Vec, /// UTXOs that are pending confirmation @@ -91,14 +91,14 @@ pub struct Context { /// Outgoing transactions that have not yet been confirmed. /// Confirmation occurs when the transaction UTXOs are /// removed from the context by the UTXO change notification. - pub(crate) outgoing: AHashMap, + pub(crate) outgoing: AHashMap>, /// Total balance of all UTXOs in this context (mature, pending) balance: Option, /// Addresses monitored by this UTXO context addresses: Arc>>, } -impl Default for Context { +impl Default for Context { fn default() -> Self { Self { mature: vec![], @@ -112,7 +112,7 @@ impl Default for Context { } } -impl Context { +impl Context { fn new_with_mature(mature: Vec) -> Self { Self { mature, ..Default::default() } } @@ -128,19 +128,19 @@ impl Context { } } -struct Inner { +struct Inner { id: UtxoContextId, binding: UtxoContextBinding, - context: Mutex, - processor: UtxoProcessor, + context: Mutex>, + processor: UtxoProcessor, } -impl Inner { - pub fn new(processor: &UtxoProcessor, binding: UtxoContextBinding) -> Self { +impl Inner { + pub fn new(processor: &UtxoProcessor, binding: UtxoContextBinding) -> Self { Self { id: binding.id(), binding, context: Mutex::new(Context::default()), processor: processor.clone() } } - pub fn new_with_mature_entries(processor: &UtxoProcessor, binding: UtxoContextBinding, mature: Vec) -> Self { + pub fn new_with_mature_entries(processor: &UtxoProcessor, binding: UtxoContextBinding, mature: Vec) -> Self { let context = Context::new_with_mature(mature); Self { id: binding.id(), binding, context: Mutex::new(context), processor: processor.clone() } } @@ -161,17 +161,17 @@ impl Inner { /// different types of UtxoEntry updates (regular incoming vs. change). /// #[derive(Clone)] -pub struct UtxoContext { - inner: Arc, +pub struct UtxoContext { + inner: Arc>, } -impl UtxoContext { - pub fn new(processor: &UtxoProcessor, binding: UtxoContextBinding) -> Self { +impl UtxoContext { + pub fn new(processor: &UtxoProcessor, binding: UtxoContextBinding) -> Self { Self { inner: Arc::new(Inner::new(processor, binding)) } } pub fn new_with_mature_entries( - processor: &UtxoProcessor, + processor: &UtxoProcessor, binding: UtxoContextBinding, mature_entries: Vec, ) -> Self { @@ -182,7 +182,7 @@ impl UtxoContext { self.inner.context.lock().unwrap() } - pub fn processor(&self) -> &UtxoProcessor { + pub fn processor(&self) -> &UtxoProcessor { &self.inner.processor } diff --git a/wallet/core/src/utxo/outgoing.rs b/wallet/core/src/utxo/outgoing.rs index 7a8578f4a..bb2a34832 100644 --- a/wallet/core/src/utxo/outgoing.rs +++ b/wallet/core/src/utxo/outgoing.rs @@ -8,11 +8,11 @@ use crate::imports::*; use crate::tx::PendingTransaction; use crate::utxo::{UtxoContext, UtxoEntryId, UtxoEntryReference}; -struct Inner { +struct Inner { pub id: TransactionId, - pub pending_transaction: PendingTransaction, - pub originating_context: UtxoContext, - pub destination_context: Option, + pub pending_transaction: PendingTransaction, + pub originating_context: UtxoContext, + pub destination_context: Option>, #[allow(dead_code)] pub creation_daa_score: u64, pub acceptance_daa_score: AtomicU64, @@ -21,12 +21,12 @@ struct Inner { /// A wrapper around [`PendingTransaction`] that adds additional context and /// convenience methods for handling within [`UtxoContext`]. #[derive(Clone)] -pub struct OutgoingTransaction { - inner: Arc, +pub struct OutgoingTransaction { + inner: Arc>, } -impl OutgoingTransaction { - pub fn new(current_daa_score: u64, originating_context: UtxoContext, pending_transaction: PendingTransaction) -> Self { +impl OutgoingTransaction { + pub fn new(current_daa_score: u64, originating_context: UtxoContext, pending_transaction: PendingTransaction) -> Self { let destination_context = pending_transaction.generator().destination_utxo_context().clone(); let inner = Inner { diff --git a/wallet/core/src/utxo/pending.rs b/wallet/core/src/utxo/pending.rs index 0b9307790..e043338c7 100644 --- a/wallet/core/src/utxo/pending.rs +++ b/wallet/core/src/utxo/pending.rs @@ -6,23 +6,23 @@ use crate::imports::*; use crate::utxo::{Maturity, UtxoContext, UtxoEntryId, UtxoEntryReference, UtxoEntryReferenceExtension}; -pub struct PendingUtxoEntryReferenceInner { +pub struct PendingUtxoEntryReferenceInner { pub entry: UtxoEntryReference, - pub utxo_context: UtxoContext, + pub utxo_context: UtxoContext, } #[derive(Clone)] -pub struct PendingUtxoEntryReference { - pub inner: Arc, +pub struct PendingUtxoEntryReference { + pub inner: Arc>, } -impl PendingUtxoEntryReference { - pub fn new(entry: UtxoEntryReference, utxo_context: UtxoContext) -> Self { +impl PendingUtxoEntryReference { + pub fn new(entry: UtxoEntryReference, utxo_context: UtxoContext) -> Self { Self { inner: Arc::new(PendingUtxoEntryReferenceInner { entry, utxo_context }) } } #[inline(always)] - pub fn inner(&self) -> &PendingUtxoEntryReferenceInner { + pub fn inner(&self) -> &PendingUtxoEntryReferenceInner { &self.inner } @@ -32,7 +32,7 @@ impl PendingUtxoEntryReference { } #[inline(always)] - pub fn utxo_context(&self) -> &UtxoContext { + pub fn utxo_context(&self) -> &UtxoContext { &self.inner().utxo_context } @@ -52,14 +52,14 @@ impl PendingUtxoEntryReference { } } -impl From<(&Arc, UtxoEntryReference)> for PendingUtxoEntryReference { +impl From<(&Arc, UtxoEntryReference)> for PendingUtxoEntryReference { fn from((account, entry): (&Arc, UtxoEntryReference)) -> Self { Self::new(entry, (*account.utxo_context()).clone()) } } -impl From for UtxoEntryReference { - fn from(pending: PendingUtxoEntryReference) -> Self { +impl From> for UtxoEntryReference { + fn from(pending: PendingUtxoEntryReference) -> Self { pending.inner().entry.clone() } } diff --git a/wallet/core/src/utxo/processor.rs b/wallet/core/src/utxo/processor.rs index e788272f2..9039fe821 100644 --- a/wallet/core/src/utxo/processor.rs +++ b/wallet/core/src/utxo/processor.rs @@ -33,40 +33,41 @@ use kaspa_rpc_core::{ notify::connection::{ChannelConnection, ChannelType}, Notification, }; +use kaspa_rpc_core::api::rpc::RpcApi; // use workflow_core::task; // use kaspa_metrics_core::{Metrics,Metric}; -pub struct Inner { +pub struct Inner { /// Coinbase UTXOs in stasis - stasis: DashMap, + stasis: DashMap>, /// UTXOs pending maturity - pending: DashMap, + pending: DashMap>, /// Outgoing Transactions - outgoing: DashMap, + outgoing: DashMap>, /// Address to UtxoContext map (maps all addresses used by /// all UtxoContexts to their respective UtxoContexts) - address_to_utxo_context_map: DashMap, UtxoContext>, + address_to_utxo_context_map: DashMap, UtxoContext>, // --- current_daa_score: Arc, network_id: Arc>>, - rpc: Mutex>, + rpc: Mutex>>, is_connected: AtomicBool, listener_id: Mutex>, task_ctl: DuplexChannel, task_is_running: AtomicBool, notification_channel: Channel, - sync_proc: SyncMonitor, + sync_proc: SyncMonitor, multiplexer: Multiplexer>, wallet_bus: Option>, notification_guard: AsyncMutex<()>, connect_disconnect_guard: AsyncMutex<()>, - metrics: Arc, + metrics: Arc>, metrics_kinds: Mutex>, } -impl Inner { +impl Inner { pub fn new( - rpc: Option, + rpc: Option, network_id: Option, multiplexer: Multiplexer>, wallet_bus: Option>, @@ -96,13 +97,13 @@ impl Inner { } #[derive(Clone)] -pub struct UtxoProcessor { - inner: Arc, +pub struct UtxoProcessor { + inner: Arc>, } -impl UtxoProcessor { +impl UtxoProcessor { pub fn new( - rpc: Option, + rpc: Option>, network_id: Option, multiplexer: Option>>, wallet_bus: Option>, @@ -111,11 +112,11 @@ impl UtxoProcessor { UtxoProcessor { inner: Arc::new(Inner::new(rpc, network_id, multiplexer, wallet_bus)) } } - pub fn rpc_api(&self) -> Arc { + pub fn rpc_api(&self) -> Arc { self.inner.rpc.lock().unwrap().as_ref().expect("UtxoProcessor RPC not initialized").rpc_api().clone() } - pub fn try_rpc_api(&self) -> Option> { + pub fn try_rpc_api(&self) -> Option> { self.inner.rpc.lock().unwrap().as_ref().map(|rpc| rpc.rpc_api()).cloned() } @@ -135,7 +136,7 @@ impl UtxoProcessor { self.rpc_api().clone().downcast_arc::().ok() } - pub async fn bind_rpc(&self, rpc: Option) -> Result<()> { + pub async fn bind_rpc(&self, rpc: Option) -> Result<()> { self.inner.rpc.lock().unwrap().clone_from(&rpc); let rpc_api = rpc.as_ref().map(|rpc| rpc.rpc_api().clone()); self.metrics().bind_rpc(rpc_api); @@ -143,7 +144,7 @@ impl UtxoProcessor { Ok(()) } - pub fn metrics(&self) -> &Arc { + pub fn metrics(&self) -> &Arc> { &self.inner.metrics } @@ -163,7 +164,7 @@ impl UtxoProcessor { self.inner.notification_guard.lock().await } - pub fn sync_proc(&self) -> &SyncMonitor { + pub fn sync_proc(&self) -> &SyncMonitor { &self.inner.sync_proc } @@ -184,7 +185,7 @@ impl UtxoProcessor { Ok(network_id.into()) } - pub fn pending(&self) -> &DashMap { + pub fn pending(&self) -> &DashMap> { &self.inner.pending } @@ -192,7 +193,7 @@ impl UtxoProcessor { &self.inner.outgoing } - pub fn stasis(&self) -> &DashMap { + pub fn stasis(&self) -> &DashMap> { &self.inner.stasis } @@ -200,15 +201,15 @@ impl UtxoProcessor { self.is_connected().then_some(self.inner.current_daa_score.load(Ordering::SeqCst)) } - pub fn address_to_utxo_context_map(&self) -> &DashMap, UtxoContext> { + pub fn address_to_utxo_context_map(&self) -> &DashMap, UtxoContext> { &self.inner.address_to_utxo_context_map } - pub fn address_to_utxo_context(&self, address: &Address) -> Option { + pub fn address_to_utxo_context(&self, address: &Address) -> Option> { self.inner.address_to_utxo_context_map.get(address).map(|v| v.clone()) } - pub async fn register_addresses(&self, addresses: Vec>, utxo_context: &UtxoContext) -> Result<()> { + pub async fn register_addresses(&self, addresses: Vec>, utxo_context: &UtxoContext) -> Result<()> { addresses.iter().for_each(|address| { self.inner.address_to_utxo_context_map.insert(address.clone(), utxo_context.clone()); }); @@ -714,7 +715,7 @@ impl UtxoProcessor { pub(crate) mod mock { use super::*; - impl UtxoProcessor { + impl UtxoProcessor { pub fn mock_set_connected(&self, connected: bool) { self.inner.is_connected.store(connected, Ordering::SeqCst); } diff --git a/wallet/core/src/utxo/scan.rs b/wallet/core/src/utxo/scan.rs index f01257c96..aab784cf0 100644 --- a/wallet/core/src/utxo/scan.rs +++ b/wallet/core/src/utxo/scan.rs @@ -21,30 +21,30 @@ pub enum ScanExtent { Depth(u32), } -enum Provider { - AddressManager(Arc), +enum Provider { + AddressManager(Arc>), AddressSet(HashSet
), } -pub struct Scan { - provider: Provider, +pub struct Scan { + provider: Provider, window_size: Option, extent: Option, balance: Arc, current_daa_score: u64, } -impl Scan { +impl Scan { pub fn new_with_address_manager( - address_manager: Arc, + address_manager: Arc>, balance: &Arc, current_daa_score: u64, window_size: Option, extent: Option, - ) -> Scan { + ) -> Scan { Scan { provider: Provider::AddressManager(address_manager), window_size, extent, balance: balance.clone(), current_daa_score } } - pub fn new_with_address_set(addresses: HashSet
, balance: &Arc, current_daa_score: u64) -> Scan { + pub fn new_with_address_set(addresses: HashSet
, balance: &Arc, current_daa_score: u64) -> Scan { Scan { provider: Provider::AddressSet(addresses), window_size: None, @@ -54,7 +54,7 @@ impl Scan { } } - pub async fn scan(&self, utxo_context: &UtxoContext) -> Result<()> { + pub async fn scan(&self, utxo_context: &UtxoContext) -> Result<()> { match &self.provider { Provider::AddressManager(address_manager) => self.scan_with_address_manager(address_manager, utxo_context).await, Provider::AddressSet(addresses) => self.scan_with_address_set(addresses, utxo_context).await, diff --git a/wallet/core/src/utxo/stream.rs b/wallet/core/src/utxo/stream.rs index cd7e6c54e..e5c8b4a08 100644 --- a/wallet/core/src/utxo/stream.rs +++ b/wallet/core/src/utxo/stream.rs @@ -5,13 +5,13 @@ use super::{UtxoContext, UtxoEntryReference}; use crate::imports::*; -pub struct UtxoStream { - utxo_context: UtxoContext, +pub struct UtxoStream { + utxo_context: UtxoContext, cursor: usize, } -impl UtxoStream { - pub fn new(utxo_context: &UtxoContext) -> Self { +impl UtxoStream { + pub fn new(utxo_context: &UtxoContext) -> Self { Self { utxo_context: utxo_context.clone(), cursor: 0 } } } diff --git a/wallet/core/src/utxo/sync.rs b/wallet/core/src/utxo/sync.rs index e9eece4f0..46342519d 100644 --- a/wallet/core/src/utxo/sync.rs +++ b/wallet/core/src/utxo/sync.rs @@ -8,9 +8,11 @@ use crate::result::Result; use futures::pin_mut; use futures::stream::StreamExt; use regex::Regex; -struct Inner { +use kaspa_rpc_core::api::rpc::RpcApi; + +struct Inner { task_ctl: DuplexChannel, - rpc: Mutex>, + rpc: Mutex>>, multiplexer: Multiplexer>, running: AtomicBool, is_synced: AtomicBool, @@ -18,12 +20,12 @@ struct Inner { } #[derive(Clone)] -pub struct SyncMonitor { - inner: Arc, +pub struct SyncMonitor { + inner: Arc>, } -impl SyncMonitor { - pub fn new(rpc: Option, multiplexer: &Multiplexer>) -> Self { +impl SyncMonitor { + pub fn new(rpc: Option>, multiplexer: &Multiplexer>) -> Self { Self { inner: Arc::new(Inner { rpc: Mutex::new(rpc.clone()), @@ -76,7 +78,7 @@ impl SyncMonitor { Ok(()) } - pub fn rpc_api(&self) -> Arc { + pub fn rpc_api(&self) -> Arc { self.inner.rpc.lock().unwrap().as_ref().expect("SyncMonitor RPC not initialized").rpc_api().clone() } diff --git a/wallet/core/src/wallet/mod.rs b/wallet/core/src/wallet/mod.rs index d20f041ad..7d2230e8d 100644 --- a/wallet/core/src/wallet/mod.rs +++ b/wallet/core/src/wallet/mod.rs @@ -23,6 +23,7 @@ use kaspa_notify::{ }; use kaspa_wrpc_client::{KaspaRpcClient, Resolver, WrpcEncoding}; use workflow_core::task::spawn; +use kaspa_rpc_core::api::rpc::RpcApi; #[derive(Debug)] pub struct EncryptedMnemonic> { @@ -77,7 +78,7 @@ pub enum WalletBusMessage { Discovery { record: TransactionRecord }, } -pub struct Inner { +pub struct Inner { active_accounts: ActiveAccountMap, legacy_accounts: ActiveAccountMap, listener_id: Mutex>, @@ -85,7 +86,7 @@ pub struct Inner { selected_account: Mutex>>, store: Arc, settings: SettingsStore, - utxo_processor: Arc, + utxo_processor: Arc>, multiplexer: Multiplexer>, wallet_bus: Channel, estimation_abortables: Mutex>, @@ -100,11 +101,11 @@ pub struct Inner { /// @category Wallet API /// #[derive(Clone)] -pub struct Wallet { - inner: Arc, +pub struct Wallet{ + inner: Arc>, } -impl Wallet { +impl Wallet { pub fn local_store() -> Result> { Ok(Arc::new(LocalStore::try_new(false)?)) } @@ -130,8 +131,7 @@ impl Wallet { // )?); let rpc_ctl = rpc_client.ctl().clone(); - let rpc_api: Arc = rpc_client; - let rpc = Rpc::new(rpc_api, rpc_ctl); + let rpc = Rpc::new(rpc_client, rpc_ctl); Self::try_with_rpc(Some(rpc), store, network_id) } @@ -462,11 +462,11 @@ impl Wallet { self.try_rpc_api().and_then(|api| api.clone().downcast_arc::().ok()) } - pub fn rpc_api(&self) -> Arc { + pub fn rpc_api(&self) -> Arc { self.utxo_processor().rpc_api() } - pub fn try_rpc_api(&self) -> Option> { + pub fn try_rpc_api(&self) -> Option> { self.utxo_processor().try_rpc_api() } diff --git a/wallet/core/src/wasm/tx/generator/generator.rs b/wallet/core/src/wasm/tx/generator/generator.rs index 5303b4d3e..ee2afacd1 100644 --- a/wallet/core/src/wasm/tx/generator/generator.rs +++ b/wallet/core/src/wasm/tx/generator/generator.rs @@ -135,7 +135,7 @@ extern "C" { /// @category Wallet SDK #[wasm_bindgen] pub struct Generator { - inner: Arc, + inner: Arc, // todo } #[wasm_bindgen] diff --git a/wallet/core/src/wasm/tx/generator/pending.rs b/wallet/core/src/wasm/tx/generator/pending.rs index dfa84c9bd..c285bf39b 100644 --- a/wallet/core/src/wasm/tx/generator/pending.rs +++ b/wallet/core/src/wasm/tx/generator/pending.rs @@ -94,7 +94,7 @@ impl PendingTransaction { /// a failed submission. /// @see {@link RpcClient.submitTransaction} pub async fn submit(&self, wasm_rpc_client: &RpcClient) -> Result { - let rpc: Arc = wasm_rpc_client.client().clone(); + let rpc = wasm_rpc_client.client().clone(); let txid = self.inner.try_submit(&rpc).await?; Ok(txid.to_string()) } diff --git a/wallet/core/src/wasm/utxo/processor.rs b/wallet/core/src/wasm/utxo/processor.rs index 0e41d8f77..4d82c2479 100644 --- a/wallet/core/src/wasm/utxo/processor.rs +++ b/wallet/core/src/wasm/utxo/processor.rs @@ -105,7 +105,7 @@ impl UtxoProcessor { #[wasm_bindgen(constructor)] pub fn ctor(js_value: IUtxoProcessorArgs) -> Result { let UtxoProcessorCreateArgs { rpc, network_id } = js_value.try_into()?; - let rpc_api: Arc = rpc.client().clone(); + let rpc_api = rpc.client().clone(); let rpc_ctl = rpc.client().rpc_ctl().clone(); let rpc_binding = Rpc::new(rpc_api, rpc_ctl); let processor = native::UtxoProcessor::new(Some(rpc_binding), Some(network_id), None, None); diff --git a/wallet/core/src/wasm/wallet/wallet.rs b/wallet/core/src/wasm/wallet/wallet.rs index bd91bedf2..abb4eab26 100644 --- a/wallet/core/src/wasm/wallet/wallet.rs +++ b/wallet/core/src/wasm/wallet/wallet.rs @@ -151,7 +151,7 @@ impl Wallet { let rpc_config = RpcConfig { url, resolver, encoding, network_id }; let rpc = RpcClient::new(Some(rpc_config))?; - let rpc_api: Arc = rpc.client().rpc_api().clone(); + let rpc_api = rpc.client().rpc_api().clone(); let rpc_ctl = rpc.client().rpc_ctl().clone(); let rpc_binding = Rpc::new(rpc_api, rpc_ctl); let wallet = Arc::new(native::Wallet::try_with_rpc(Some(rpc_binding), store, network_id)?);