diff --git a/python/examples/rpc.py b/python/examples/rpc.py index a7fa8c3c6..3a0f85444 100644 --- a/python/examples/rpc.py +++ b/python/examples/rpc.py @@ -3,15 +3,15 @@ import time import os -from kaspa import RpcClient +from kaspa import RpcClient, Resolver def subscription_callback(event, name, **kwargs): - print(f'{name} | {event}') + print(f"{name} | {event}") async def rpc_subscriptions(client): - # client.add_event_listener('all', subscription_callback, callback_id=1, kwarg1='Im a kwarg!!') - client.add_event_listener('all', subscription_callback, name="all") + # client.add_event_listener("all", subscription_callback, callback_id=1, kwarg1="Im a kwarg!!") + client.add_event_listener("all", subscription_callback, name="all") await client.subscribe_virtual_daa_score_changed() await client.subscribe_virtual_chain_changed(True) @@ -20,8 +20,8 @@ async def rpc_subscriptions(client): await asyncio.sleep(5) - client.remove_event_listener('all') - print('Removed all event listeners. Sleeping for 5 seconds before unsubscribing. Should see nothing print.') + client.remove_event_listener("all") + print("Removed all event listeners. Sleeping for 5 seconds before unsubscribing. Should see nothing print.") await asyncio.sleep(5) @@ -38,20 +38,21 @@ async def rpc_calls(client): block_dag_info_response = await client.get_block_dag_info() print(block_dag_info_response) - tip_hash = block_dag_info_response['tipHashes'][0] - get_block_request = {'hash': tip_hash, 'includeTransactions': True} + tip_hash = block_dag_info_response["tipHashes"][0] + get_block_request = {"hash": tip_hash, "includeTransactions": True} get_block_response = await client.get_block_call(get_block_request) print(get_block_response) - get_balances_by_addresses_request = {'addresses': ['kaspa:qqxn4k5dchwk3m207cmh9ewagzlwwvfesngkc8l90tj44mufcgmujpav8hakt', 'kaspa:qr5ekyld6j4zn0ngennj9nx5gpt3254fzs77ygh6zzkvyy8scmp97de4ln8v5']} + get_balances_by_addresses_request = {"addresses": ["kaspa:qqxn4k5dchwk3m207cmh9ewagzlwwvfesngkc8l90tj44mufcgmujpav8hakt", "kaspa:qr5ekyld6j4zn0ngennj9nx5gpt3254fzs77ygh6zzkvyy8scmp97de4ln8v5"]} get_balances_by_addresses_response = await client.get_balances_by_addresses_call(get_balances_by_addresses_request) print(get_balances_by_addresses_response) async def main(): - rpc_host = os.environ.get("KASPA_RPC_HOST") - client = RpcClient(url = f"ws://{rpc_host}:17210") + # rpc_host = os.environ.get("KASPA_RPC_HOST") + # client = RpcClient(url=f"ws://{rpc_host}:17210") + client = RpcClient(resolver=Resolver(), network="testnet", network_suffix=11) await client.connect() - print(f'Client is connected: {client.is_connected()}') + print(f"Client is connected: {client.is_connected()}") await rpc_calls(client) await rpc_subscriptions(client) diff --git a/python/src/lib.rs b/python/src/lib.rs index ee8ce0642..e742c2b05 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -11,6 +11,7 @@ cfg_if::cfg_if! { m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/rpc/wrpc/python/src/client.rs b/rpc/wrpc/python/src/client.rs index a48cfb927..a7a527fb2 100644 --- a/rpc/wrpc/python/src/client.rs +++ b/rpc/wrpc/python/src/client.rs @@ -1,3 +1,4 @@ +use crate::resolver::{into_network_id, Resolver}; use ahash::AHashMap; use futures::*; use kaspa_addresses::Address; @@ -10,13 +11,7 @@ use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_rpc_core::model::*; use kaspa_rpc_core::notify::connection::ChannelConnection; use kaspa_rpc_macros::{build_wrpc_python_interface, build_wrpc_python_subscriptions}; -use kaspa_wrpc_client::{ - client::{ConnectOptions, ConnectStrategy}, - error::Error, - prelude::*, - result::Result, - KaspaRpcClient, WrpcEncoding, -}; +use kaspa_wrpc_client::{client::ConnectOptions, error::Error, prelude::*, result::Result, KaspaRpcClient, WrpcEncoding}; use pyo3::{ exceptions::PyException, prelude::*, @@ -79,14 +74,13 @@ impl PyCallback { } fn execute(&self, py: Python, event: Bound) -> PyResult { - let args = self.append_to_args(py, event).unwrap(); + let args = self.append_to_args(py, event)?; let kwargs = self.kwargs.as_ref().map(|kw| kw.bind(py)); let result = self .callback .call_bound(py, args.bind(py), kwargs) - .map_err(|e| pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e))) - .unwrap(); + .map_err(|e| pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e)))?; Ok(result) } @@ -94,7 +88,7 @@ impl PyCallback { pub struct Inner { client: Arc, - // resolver TODO + resolver: Option, notification_task: Arc, notification_ctl: DuplexChannel, callbacks: Arc>>>, @@ -126,14 +120,24 @@ pub struct RpcClient { } impl RpcClient { - fn new(url: Option, encoding: Option) -> Result { - let encoding = encoding.unwrap_or(WrpcEncoding::Borsh); - - let client = Arc::new(KaspaRpcClient::new(encoding, url.as_deref(), None, None, None).unwrap()); + pub fn new( + resolver: Option, + url: Option, + encoding: WrpcEncoding, + network_id: Option, + ) -> Result { + let client = Arc::new(KaspaRpcClient::new( + encoding, + url.as_deref(), + Some(resolver.as_ref().unwrap().clone().into()), + network_id, + None, + )?); let rpc_client = RpcClient { inner: Arc::new(Inner { client, + resolver, notification_task: Arc::new(AtomicBool::new(false)), notification_ctl: DuplexChannel::oneshot(), callbacks: Arc::new(Default::default()), @@ -149,16 +153,43 @@ impl RpcClient { #[pymethods] impl RpcClient { #[new] - fn ctor(url: Option) -> PyResult { + fn ctor( + resolver: Option, + url: Option, + encoding: Option, + network: Option, + network_suffix: Option, + ) -> PyResult { // TODO expose args to Python similar to WASM wRPC Client IRpcConfig + let resolver = resolver.unwrap_or(Resolver::ctor(None)?); + let encoding = WrpcEncoding::from_str(encoding.unwrap_or(String::from("borsh")).as_str()).unwrap(); + let network = network.unwrap_or(String::from("mainnet")); + + // TODO find better way of accepting NetworkId type from Python + let network_id = into_network_id(&network, network_suffix)?; - Ok(Self::new(url, None)?) + Ok(Self::new(Some(resolver), url, encoding, Some(network_id))?) } fn url(&self) -> Option { self.inner.client.url() } + fn resolver(&self) -> Option { + self.inner.resolver.clone() + } + + fn set_resolver(&self, resolver: Resolver) -> PyResult<()> { + self.inner.client.set_resolver(resolver.into())?; + Ok(()) + } + + fn set_network_id(&self, network: String, network_suffix: Option) -> PyResult<()> { + let network_id = into_network_id(&network, network_suffix)?; + self.inner.client.set_network_id(&network_id)?; + Ok(()) + } + fn is_connected(&self) -> bool { self.inner.client.is_connected() } @@ -167,16 +198,40 @@ impl RpcClient { self.inner.client.encoding().to_string() } - fn connect(&self, py: Python) -> PyResult> { - // TODO expose args to Python similar to WASM wRPC Client IConnectOptions - let options = ConnectOptions { - block_async_connect: true, - connect_timeout: Some(Duration::from_millis(5_000)), - strategy: ConnectStrategy::Fallback, - ..Default::default() + fn resolver_node_id(&self) -> Option { + self.inner.client.node_descriptor().map(|node| node.id.clone()) + } + + fn resolver_node_provider_name(&self) -> Option { + self.inner.client.node_descriptor().and_then(|node| node.provider_name.clone()) + } + + fn resolver_node_provider_url(&self) -> Option { + self.inner.client.node_descriptor().and_then(|node| node.provider_url.clone()) + } + + pub fn connect( + &self, + py: Python, + block_async_connect: Option, + strategy: Option, + url: Option, + connect_timeout: Option, + retry_interval: Option, + ) -> PyResult> { + // TODO expose args to Python similar to WASM wRPC Client IConnectOptions? + + let block_async_connect = block_async_connect.unwrap_or(true); + let strategy = match strategy { + Some(strategy) => ConnectStrategy::from_str(&strategy).unwrap(), + None => ConnectStrategy::Retry, }; + let connect_timeout: Option = connect_timeout.and_then(|ms| Some(Duration::from_millis(ms))); + let retry_interval: Option = retry_interval.and_then(|ms| Some(Duration::from_millis(ms))); - self.start_notification_task(py).unwrap(); + let options = ConnectOptions { block_async_connect, strategy, url, connect_timeout, retry_interval }; + + self.start_notification_task(py)?; let client = self.inner.client.clone(); py_async! {py, async move { @@ -195,25 +250,9 @@ impl RpcClient { }} } - fn get_server_info(&self, py: Python) -> PyResult> { - let client = self.inner.client.clone(); - py_async! {py, async move { - let response = client.get_server_info_call(GetServerInfoRequest { }).await?; - Python::with_gil(|py| { - Ok(serde_pyobject::to_pyobject(py, &response).unwrap().to_object(py)) - }) - }} - } - - fn get_block_dag_info(&self, py: Python) -> PyResult> { - let client = self.inner.client.clone(); - py_async! {py, async move { - let response = client.get_block_dag_info_call(GetBlockDagInfoRequest { }).await?; - Python::with_gil(|py| { - Ok(serde_pyobject::to_pyobject(py, &response).unwrap().to_object(py)) - }) - }} - } + // fn start() TODO + // fn stop() TODO + // fn trigger_abort() TODO #[pyo3(signature = (event, callback, *args, **kwargs))] fn add_event_listener( @@ -226,8 +265,8 @@ impl RpcClient { ) -> PyResult<()> { let event = NotificationEvent::from_str(event.as_str()).unwrap(); - let args = args.to_object(py).extract::>(py).unwrap(); - let kwargs = kwargs.unwrap().to_object(py).extract::>(py).unwrap(); + let args = args.to_object(py).extract::>(py)?; + let kwargs = kwargs.unwrap().to_object(py).extract::>(py)?; let py_callback = PyCallback { callback, args: Some(args), kwargs: Some(kwargs) }; @@ -281,10 +320,14 @@ impl RpcClient { } impl RpcClient { + // fn new_with_rpc_client() TODO + pub fn listener_id(&self) -> Option { *self.inner.listener_id.lock().unwrap() } + // fn client() TODO + async fn stop_notification_task(&self) -> Result<()> { if self.inner.notification_task.load(Ordering::SeqCst) { self.inner.notification_ctl.signal(()).await?; @@ -338,7 +381,7 @@ impl RpcClient { Python::with_gil(|py| { let event = PyDict::new_bound(py); event.set_item("type", ctl.to_string()).unwrap(); - // objectdict.set_item("rpc", ).unwrap(); TODO + event.set_item("rpc", this.url()).unwrap(); handler.execute(py, event).unwrap(); }); @@ -411,6 +454,29 @@ impl RpcClient { } } +#[pymethods] +impl RpcClient { + fn get_server_info(&self, py: Python) -> PyResult> { + let client = self.inner.client.clone(); + py_async! {py, async move { + let response = client.get_server_info_call(GetServerInfoRequest { }).await?; + Python::with_gil(|py| { + Ok(serde_pyobject::to_pyobject(py, &response)?.to_object(py)) + }) + }} + } + + fn get_block_dag_info(&self, py: Python) -> PyResult> { + let client = self.inner.client.clone(); + py_async! {py, async move { + let response = client.get_block_dag_info_call(GetBlockDagInfoRequest { }).await?; + Python::with_gil(|py| { + Ok(serde_pyobject::to_pyobject(py, &response)?.to_object(py)) + }) + }} + } +} + #[pymethods] impl RpcClient { fn subscribe_utxos_changed(&self, py: Python, addresses: Vec
) -> PyResult> { diff --git a/rpc/wrpc/python/src/lib.rs b/rpc/wrpc/python/src/lib.rs index 5c952e13e..b76dd4b88 100644 --- a/rpc/wrpc/python/src/lib.rs +++ b/rpc/wrpc/python/src/lib.rs @@ -3,5 +3,6 @@ use cfg_if::cfg_if; cfg_if! { if #[cfg(feature = "py-sdk")] { pub mod client; + pub mod resolver; } } diff --git a/rpc/wrpc/python/src/resolver.rs b/rpc/wrpc/python/src/resolver.rs new file mode 100644 index 000000000..fd8907611 --- /dev/null +++ b/rpc/wrpc/python/src/resolver.rs @@ -0,0 +1,81 @@ +use kaspa_consensus_core::network::{NetworkId, NetworkType}; +use kaspa_python_macros::py_async; +use kaspa_wrpc_client::{Resolver as NativeResolver, WrpcEncoding}; +use pyo3::exceptions::PyException; +use pyo3::prelude::*; +use std::{str::FromStr, sync::Arc}; + +#[derive(Debug, Clone)] +#[pyclass] +pub struct Resolver { + resolver: NativeResolver, +} + +impl Resolver { + pub fn new(resolver: NativeResolver) -> Self { + Self { resolver } + } +} + +#[pymethods] +impl Resolver { + #[new] + pub fn ctor(urls: Option>) -> PyResult { + if let Some(urls) = urls { + Ok(Self { resolver: NativeResolver::new(urls.into_iter().map(|url| Arc::new(url)).collect::>()) }) + } else { + Ok(Self { resolver: NativeResolver::default() }) + } + } +} + +#[pymethods] +impl Resolver { + fn urls(&self) -> Vec { + self.resolver.urls().into_iter().map(|url| String::clone(&url)).collect::>() + } + + fn get_node(&self, py: Python, encoding: String, network: String, network_suffix: Option) -> PyResult> { + let encoding = WrpcEncoding::from_str(encoding.as_str()).unwrap(); + + // TODO find better way of accepting NetworkId type from Python + let network_id = into_network_id(&network, network_suffix)?; + + let resolver = self.resolver.clone(); + py_async! {py, async move { + resolver.get_node(encoding, network_id).await?; + Ok(()) + }} + } + + fn get_url(&self, py: Python, encoding: String, network: String, network_suffix: Option) -> PyResult> { + let encoding = WrpcEncoding::from_str(encoding.as_str()).unwrap(); + + // TODO find better way of accepting NetworkId type from Python + let network_id = into_network_id(&network, network_suffix)?; + + let resolver = self.resolver.clone(); + py_async! {py, async move { + resolver.get_node(encoding, network_id).await?; + Ok(()) + }} + } + + // fn connect() TODO +} + +impl From for NativeResolver { + fn from(resolver: Resolver) -> Self { + resolver.resolver + } +} + +pub fn into_network_id(network: &str, network_suffix: Option) -> Result { + let network_type = NetworkType::from_str(network).map_err(|_| PyErr::new::("Invalid network type"))?; + NetworkId::try_from(network_type).or_else(|_| { + network_suffix.map_or_else( + || Err(PyErr::new::("Network suffix required for this network")), + |suffix| Ok(NetworkId::with_suffix(network_type, suffix)), + ) + }) +}