Skip to content

Commit

Permalink
Resolver for Python wRPC client prototype (kaspanet#70)
Browse files Browse the repository at this point in the history
* wRPC minor cleanup

* resolver scaffolding

* wRPC client failing to init due to resolver network issue

* wRPC client constructor accept Resolver, network id

* network null when using url

* connect options and resolver methods

* lints

* rpc.py example tn11

* set network id on client

* bubble errors where possible

* lint
  • Loading branch information
smartgoo committed Sep 17, 2024
1 parent 08f43d8 commit 289c0a3
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 59 deletions.
25 changes: 13 additions & 12 deletions python/examples/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
import time
import os

from kaspa import RpcClient
from kaspa import RpcClient, Resolver


def subscription_callback(event, name, **kwargs):
print(f'{name} | {event}')
print(f"{name} | {event}")

async def rpc_subscriptions(client):
# client.add_event_listener('all', subscription_callback, callback_id=1, kwarg1='Im a kwarg!!')
client.add_event_listener('all', subscription_callback, name="all")
# client.add_event_listener("all", subscription_callback, callback_id=1, kwarg1="Im a kwarg!!")
client.add_event_listener("all", subscription_callback, name="all")

await client.subscribe_virtual_daa_score_changed()
await client.subscribe_virtual_chain_changed(True)
Expand All @@ -20,8 +20,8 @@ async def rpc_subscriptions(client):

await asyncio.sleep(5)

client.remove_event_listener('all')
print('Removed all event listeners. Sleeping for 5 seconds before unsubscribing. Should see nothing print.')
client.remove_event_listener("all")
print("Removed all event listeners. Sleeping for 5 seconds before unsubscribing. Should see nothing print.")

await asyncio.sleep(5)

Expand All @@ -38,20 +38,21 @@ async def rpc_calls(client):
block_dag_info_response = await client.get_block_dag_info()
print(block_dag_info_response)

tip_hash = block_dag_info_response['tipHashes'][0]
get_block_request = {'hash': tip_hash, 'includeTransactions': True}
tip_hash = block_dag_info_response["tipHashes"][0]
get_block_request = {"hash": tip_hash, "includeTransactions": True}
get_block_response = await client.get_block_call(get_block_request)
print(get_block_response)

get_balances_by_addresses_request = {'addresses': ['kaspa:qqxn4k5dchwk3m207cmh9ewagzlwwvfesngkc8l90tj44mufcgmujpav8hakt', 'kaspa:qr5ekyld6j4zn0ngennj9nx5gpt3254fzs77ygh6zzkvyy8scmp97de4ln8v5']}
get_balances_by_addresses_request = {"addresses": ["kaspa:qqxn4k5dchwk3m207cmh9ewagzlwwvfesngkc8l90tj44mufcgmujpav8hakt", "kaspa:qr5ekyld6j4zn0ngennj9nx5gpt3254fzs77ygh6zzkvyy8scmp97de4ln8v5"]}
get_balances_by_addresses_response = await client.get_balances_by_addresses_call(get_balances_by_addresses_request)
print(get_balances_by_addresses_response)

async def main():
rpc_host = os.environ.get("KASPA_RPC_HOST")
client = RpcClient(url = f"ws://{rpc_host}:17210")
# rpc_host = os.environ.get("KASPA_RPC_HOST")
# client = RpcClient(url=f"ws://{rpc_host}:17210")
client = RpcClient(resolver=Resolver(), network="testnet", network_suffix=11)
await client.connect()
print(f'Client is connected: {client.is_connected()}')
print(f"Client is connected: {client.is_connected()}")

await rpc_calls(client)
await rpc_subscriptions(client)
Expand Down
1 change: 1 addition & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ cfg_if::cfg_if! {
m.add_class::<kaspa_wallet_keys::publickey::PublicKey>()?;

m.add_class::<kaspa_wrpc_python::client::RpcClient>()?;
m.add_class::<kaspa_wrpc_python::resolver::Resolver>()?;

Ok(())
}
Expand Down
160 changes: 113 additions & 47 deletions rpc/wrpc/python/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::resolver::{into_network_id, Resolver};
use ahash::AHashMap;
use futures::*;
use kaspa_addresses::Address;
Expand All @@ -10,13 +11,7 @@ use kaspa_rpc_core::api::rpc::RpcApi;
use kaspa_rpc_core::model::*;
use kaspa_rpc_core::notify::connection::ChannelConnection;
use kaspa_rpc_macros::{build_wrpc_python_interface, build_wrpc_python_subscriptions};
use kaspa_wrpc_client::{
client::{ConnectOptions, ConnectStrategy},
error::Error,
prelude::*,
result::Result,
KaspaRpcClient, WrpcEncoding,
};
use kaspa_wrpc_client::{client::ConnectOptions, error::Error, prelude::*, result::Result, KaspaRpcClient, WrpcEncoding};
use pyo3::{
exceptions::PyException,
prelude::*,
Expand Down Expand Up @@ -79,22 +74,21 @@ impl PyCallback {
}

fn execute(&self, py: Python, event: Bound<PyDict>) -> PyResult<PyObject> {
let args = self.append_to_args(py, event).unwrap();
let args = self.append_to_args(py, event)?;
let kwargs = self.kwargs.as_ref().map(|kw| kw.bind(py));

let result = self
.callback
.call_bound(py, args.bind(py), kwargs)
.map_err(|e| pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e)))
.unwrap();
.map_err(|e| pyo3::exceptions::PyException::new_err(format!("Error while executing RPC notification callback: {}", e)))?;

Ok(result)
}
}

pub struct Inner {
client: Arc<KaspaRpcClient>,
// resolver TODO
resolver: Option<Resolver>,
notification_task: Arc<AtomicBool>,
notification_ctl: DuplexChannel,
callbacks: Arc<Mutex<AHashMap<NotificationEvent, Vec<PyCallback>>>>,
Expand Down Expand Up @@ -126,14 +120,24 @@ pub struct RpcClient {
}

impl RpcClient {
fn new(url: Option<String>, encoding: Option<WrpcEncoding>) -> Result<RpcClient> {
let encoding = encoding.unwrap_or(WrpcEncoding::Borsh);

let client = Arc::new(KaspaRpcClient::new(encoding, url.as_deref(), None, None, None).unwrap());
pub fn new(
resolver: Option<Resolver>,
url: Option<String>,
encoding: WrpcEncoding,
network_id: Option<NetworkId>,
) -> Result<RpcClient> {
let client = Arc::new(KaspaRpcClient::new(
encoding,
url.as_deref(),
Some(resolver.as_ref().unwrap().clone().into()),
network_id,
None,
)?);

let rpc_client = RpcClient {
inner: Arc::new(Inner {
client,
resolver,
notification_task: Arc::new(AtomicBool::new(false)),
notification_ctl: DuplexChannel::oneshot(),
callbacks: Arc::new(Default::default()),
Expand All @@ -149,16 +153,43 @@ impl RpcClient {
#[pymethods]
impl RpcClient {
#[new]
fn ctor(url: Option<String>) -> PyResult<RpcClient> {
fn ctor(
resolver: Option<Resolver>,
url: Option<String>,
encoding: Option<String>,
network: Option<String>,
network_suffix: Option<u32>,
) -> PyResult<RpcClient> {
// TODO expose args to Python similar to WASM wRPC Client IRpcConfig
let resolver = resolver.unwrap_or(Resolver::ctor(None)?);
let encoding = WrpcEncoding::from_str(encoding.unwrap_or(String::from("borsh")).as_str()).unwrap();
let network = network.unwrap_or(String::from("mainnet"));

// TODO find better way of accepting NetworkId type from Python
let network_id = into_network_id(&network, network_suffix)?;

Ok(Self::new(url, None)?)
Ok(Self::new(Some(resolver), url, encoding, Some(network_id))?)
}

fn url(&self) -> Option<String> {
self.inner.client.url()
}

fn resolver(&self) -> Option<Resolver> {
self.inner.resolver.clone()
}

fn set_resolver(&self, resolver: Resolver) -> PyResult<()> {
self.inner.client.set_resolver(resolver.into())?;
Ok(())
}

fn set_network_id(&self, network: String, network_suffix: Option<u32>) -> PyResult<()> {
let network_id = into_network_id(&network, network_suffix)?;
self.inner.client.set_network_id(&network_id)?;
Ok(())
}

fn is_connected(&self) -> bool {
self.inner.client.is_connected()
}
Expand All @@ -167,16 +198,40 @@ impl RpcClient {
self.inner.client.encoding().to_string()
}

fn connect(&self, py: Python) -> PyResult<Py<PyAny>> {
// TODO expose args to Python similar to WASM wRPC Client IConnectOptions
let options = ConnectOptions {
block_async_connect: true,
connect_timeout: Some(Duration::from_millis(5_000)),
strategy: ConnectStrategy::Fallback,
..Default::default()
fn resolver_node_id(&self) -> Option<String> {
self.inner.client.node_descriptor().map(|node| node.id.clone())
}

fn resolver_node_provider_name(&self) -> Option<String> {
self.inner.client.node_descriptor().and_then(|node| node.provider_name.clone())
}

fn resolver_node_provider_url(&self) -> Option<String> {
self.inner.client.node_descriptor().and_then(|node| node.provider_url.clone())
}

pub fn connect(
&self,
py: Python,
block_async_connect: Option<bool>,
strategy: Option<String>,
url: Option<String>,
connect_timeout: Option<u64>,
retry_interval: Option<u64>,
) -> PyResult<Py<PyAny>> {
// TODO expose args to Python similar to WASM wRPC Client IConnectOptions?

let block_async_connect = block_async_connect.unwrap_or(true);
let strategy = match strategy {
Some(strategy) => ConnectStrategy::from_str(&strategy).unwrap(),
None => ConnectStrategy::Retry,
};
let connect_timeout: Option<Duration> = connect_timeout.and_then(|ms| Some(Duration::from_millis(ms)));
let retry_interval: Option<Duration> = retry_interval.and_then(|ms| Some(Duration::from_millis(ms)));

self.start_notification_task(py).unwrap();
let options = ConnectOptions { block_async_connect, strategy, url, connect_timeout, retry_interval };

self.start_notification_task(py)?;

let client = self.inner.client.clone();
py_async! {py, async move {
Expand All @@ -195,25 +250,9 @@ impl RpcClient {
}}
}

fn get_server_info(&self, py: Python) -> PyResult<Py<PyAny>> {
let client = self.inner.client.clone();
py_async! {py, async move {
let response = client.get_server_info_call(GetServerInfoRequest { }).await?;
Python::with_gil(|py| {
Ok(serde_pyobject::to_pyobject(py, &response).unwrap().to_object(py))
})
}}
}

fn get_block_dag_info(&self, py: Python) -> PyResult<Py<PyAny>> {
let client = self.inner.client.clone();
py_async! {py, async move {
let response = client.get_block_dag_info_call(GetBlockDagInfoRequest { }).await?;
Python::with_gil(|py| {
Ok(serde_pyobject::to_pyobject(py, &response).unwrap().to_object(py))
})
}}
}
// fn start() TODO
// fn stop() TODO
// fn trigger_abort() TODO

#[pyo3(signature = (event, callback, *args, **kwargs))]
fn add_event_listener(
Expand All @@ -226,8 +265,8 @@ impl RpcClient {
) -> PyResult<()> {
let event = NotificationEvent::from_str(event.as_str()).unwrap();

let args = args.to_object(py).extract::<Py<PyTuple>>(py).unwrap();
let kwargs = kwargs.unwrap().to_object(py).extract::<Py<PyDict>>(py).unwrap();
let args = args.to_object(py).extract::<Py<PyTuple>>(py)?;
let kwargs = kwargs.unwrap().to_object(py).extract::<Py<PyDict>>(py)?;

let py_callback = PyCallback { callback, args: Some(args), kwargs: Some(kwargs) };

Expand Down Expand Up @@ -281,10 +320,14 @@ impl RpcClient {
}

impl RpcClient {
// fn new_with_rpc_client() TODO

pub fn listener_id(&self) -> Option<ListenerId> {
*self.inner.listener_id.lock().unwrap()
}

// fn client() TODO

async fn stop_notification_task(&self) -> Result<()> {
if self.inner.notification_task.load(Ordering::SeqCst) {
self.inner.notification_ctl.signal(()).await?;
Expand Down Expand Up @@ -338,7 +381,7 @@ impl RpcClient {
Python::with_gil(|py| {
let event = PyDict::new_bound(py);
event.set_item("type", ctl.to_string()).unwrap();
// objectdict.set_item("rpc", ).unwrap(); TODO
event.set_item("rpc", this.url()).unwrap();

handler.execute(py, event).unwrap();
});
Expand Down Expand Up @@ -411,6 +454,29 @@ impl RpcClient {
}
}

#[pymethods]
impl RpcClient {
fn get_server_info(&self, py: Python) -> PyResult<Py<PyAny>> {
let client = self.inner.client.clone();
py_async! {py, async move {
let response = client.get_server_info_call(GetServerInfoRequest { }).await?;
Python::with_gil(|py| {
Ok(serde_pyobject::to_pyobject(py, &response)?.to_object(py))
})
}}
}

fn get_block_dag_info(&self, py: Python) -> PyResult<Py<PyAny>> {
let client = self.inner.client.clone();
py_async! {py, async move {
let response = client.get_block_dag_info_call(GetBlockDagInfoRequest { }).await?;
Python::with_gil(|py| {
Ok(serde_pyobject::to_pyobject(py, &response)?.to_object(py))
})
}}
}
}

#[pymethods]
impl RpcClient {
fn subscribe_utxos_changed(&self, py: Python, addresses: Vec<Address>) -> PyResult<Py<PyAny>> {
Expand Down
1 change: 1 addition & 0 deletions rpc/wrpc/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ use cfg_if::cfg_if;
cfg_if! {
if #[cfg(feature = "py-sdk")] {
pub mod client;
pub mod resolver;
}
}
Loading

0 comments on commit 289c0a3

Please sign in to comment.