diff --git a/cln-rpc/src/lib.rs b/cln-rpc/src/lib.rs index 99dbf5cb757e..6d47b4c1366c 100644 --- a/cln-rpc/src/lib.rs +++ b/cln-rpc/src/lib.rs @@ -1,39 +1,39 @@ //! # A Core Lightning RPC-client -//! +//! //! Core Lightning exposes a JSON-RPC interface over unix-domain sockets. -//! The unix-domain socket appears like file and located by default in +//! The unix-domain socket appears like file and located by default in //! `~/.lightning//lightning-rpc`. //! //! This crate contains an RPC-client called [ClnRpc] and models //! for most [requests](crate::model::requests) and [responses](crate::model::responses). -//! +//! //! The example below shows how to initiate the client and celss the `getinfo`-rpc method. -//! +//! //! ```no_run //! use std::path::Path; //! use tokio_test; //! use cln_rpc::{ClnRpc, TypedRequest}; //! use cln_rpc::model::requests::GetinfoRequest; //! use cln_rpc::model::responses::GetinfoResponse; -//! +//! //! tokio_test::block_on( async { //! let path = Path::new("path_to_lightning_dir"); //! let mut rpc = ClnRpc::new(path).await.unwrap(); //! let request = GetinfoRequest {}; -//! let response : GetinfoResponse = rpc.call_typed(request).await.unwrap(); +//! let response : GetinfoResponse = rpc.call_typed(&request).await.unwrap(); //! }); //! ``` -//! -//! If the required model is not available you can implement [`TypedRequest`] +//! +//! If the required model is not available you can implement [`TypedRequest`] //! and use [`ClnRpc::call_typed`] without a problem. -//! +//! //! ```no_run //! use std::path::Path; //! use tokio_test; //! use cln_rpc::{ClnRpc, TypedRequest}; //! use serde::{Serialize, Deserialize}; //! -//! #[derive(Serialize, Debug)] +//! #[derive(Serialize, Debug)] //! struct CustomMethodRequest { //! param_a : String //! }; @@ -41,43 +41,43 @@ //! struct CustomMethodResponse { //! field_a : String //! }; -//! +//! //! impl TypedRequest for CustomMethodRequest { //! type Response = CustomMethodResponse; -//! +//! //! fn method(&self) -> &str { //! "custommethod" //! } //! } -//! +//! //! tokio_test::block_on( async { //! let path = Path::new("path_to_lightning_dir"); //! let mut rpc = ClnRpc::new(path).await.unwrap(); -//! +//! //! let request = CustomMethodRequest { param_a : String::from("example")}; -//! let response = rpc.call_typed(request).await.unwrap(); +//! let response = rpc.call_typed(&request).await.unwrap(); //! }) //! ``` -//! -//! An alternative is to use [`ClnRpc::call_raw`]. -//! +//! +//! An alternative is to use [`ClnRpc::call_raw`]. +//! //! ```no_run //! use std::path::Path; //! use tokio_test; //! use cln_rpc::{ClnRpc, TypedRequest}; -//! +//! //! tokio_test::block_on( async { //! let path = Path::new("path_to_lightning_dir"); //! let mut rpc = ClnRpc::new(path).await.unwrap(); //! let method = "custommethod"; //! let request = serde_json::json!({"param_a" : "example"}); -//! let response : serde_json::Value = rpc.call_raw(method, request).await.unwrap(); +//! let response : serde_json::Value = rpc.call_raw(method, &request).await.unwrap(); //! }) //! ``` -//! +//! use crate::codec::JsonCodec; pub use anyhow::Error; -use anyhow::Result; +use anyhow::Result; use core::fmt::Debug; use futures_util::sink::SinkExt; use futures_util::StreamExt; @@ -104,9 +104,9 @@ pub use crate::{ }; /// An RPC-client for Core Lightning -/// -/// -/// +/// +/// +/// pub struct ClnRpc { next_id: AtomicUsize, @@ -141,10 +141,10 @@ impl ClnRpc { /// /// An interesting choice of `R` and `P` is [`serde_json::Value`] because it allows /// ad-hoc calls to custom RPC-methods - /// - /// If you are using a model such as [`crate::model::requests::GetinfoRequest`] you'd - /// probably want to use [`Self::call_typed`] instead. - /// + /// + /// If you are using a model such as [`crate::model::requests::GetinfoRequest`] you'd + /// probably want to use [`Self::call_typed`] instead. + /// /// Example: /// ```no_run /// use cln_rpc::ClnRpc; @@ -156,15 +156,15 @@ impl ClnRpc { /// // Call using json-values /// let mut cln = ClnRpc::new(Path::new("./lightningd/rpc")).await.unwrap(); /// let request = serde_json::json!({}); - /// let response : serde_json::Value = cln.call_raw("getinfo", request).await.unwrap(); + /// let response : serde_json::Value = cln.call_raw("getinfo", &request).await.unwrap(); /// /// // Using a model /// // Prefer to use call_typed instead /// let request = GetinfoRequest {}; - /// let response : GetinfoResponse = cln.call_raw("getinfo", request.clone()).await.unwrap(); - /// }) + /// let response : GetinfoResponse = cln.call_raw("getinfo", &request).await.unwrap(); + /// }) /// ``` - pub async fn call_raw(&mut self, method: &str, params: P) -> Result + pub async fn call_raw(&mut self, method: &str, params: &P) -> Result where P: Serialize + Debug, R: DeserializeOwned + Debug, @@ -181,39 +181,25 @@ impl ClnRpc { "params" : params, }); - let mut response: serde_json::Value = self.call_raw_request(&req).await?; - trace!("Read response {:?}", response); + let response: serde_json::Value = self.call_raw_request(req).await?; - // Annotate the response with the method from the request, so - // serde_json knows which variant of [`Request`] should be - // used. - response["method"] = serde_json::Value::String(method.into()); - if let Some(_) = response.get("result") { - serde_json::from_value(response).map_err(|e| RpcError { - code: None, - message: format!("Malformed response from lightningd: {}", e), - data: None, - }) - } else if let Some(e) = response.get("error") { - let e: RpcError = serde_json::from_value(e.clone()).unwrap(); - Err(e) - } else { - Err(RpcError { - code: None, - message: format!("Malformed response from lightningd: {}", response), - data: None, - }) - } + serde_json::from_value(response).map_err(|e| RpcError { + code: None, + message: format!("Failed to parse response {:?}", e), + data: None, + }) } - /// A low level method to call raw reqeusts + /// A low level method to call raw requests /// /// This method is private by intention. /// The caller is (implicitly) providing the `id` of the JsonRpcRequest. /// This is dangerous because the caller might pick a non-unique id. /// - /// The request should serialize to a valid JsonRpcMessage and the response - /// should be able to deserialize any successful JsonRpcResponse. + /// The request should serialize to a valid JsonRpcMessage. + /// If the response is succesful the content of the "result" field is returned + /// If the response is an error the content of the "error" field is returned + /// /// ```no_run /// use std::path::Path; /// use cln_rpc::ClnRpc; @@ -231,20 +217,19 @@ impl ClnRpc { /// }) /// ``` /// - async fn call_raw_request(&mut self, request: &Req) -> Result - where - Req: Serialize + Debug, - Resp: DeserializeOwned, - { + async fn call_raw_request( + &mut self, + request: serde_json::Value, + ) -> Result +where { trace!("Sending request {:?}", request); - let request = serde_json::to_value(request).unwrap(); self.write.send(request).await.map_err(|e| RpcError { code: None, message: format!("Error passing request to lightningd: {}", e), data: None, })?; - let response = self + let mut response: serde_json::Value = self .read .next() .await @@ -259,11 +244,27 @@ impl ClnRpc { data: None, })?; - serde_json::from_value(response).map_err(|_| RpcError { - code: None, - message: "Failed to parse response".to_string(), - data: None, - }) + match response.get("result") { + Some(_) => Ok(response["result"].take()), + None => { + let _ = response.get("error").ok_or( + RpcError { + code : None, + message : "Invalid response from lightningd. Neither `result` or `error` field is present".to_string(), + data : None + })?; + let rpc_error: RpcError = serde_json::from_value(response["error"].take()) + .map_err(|e| RpcError { + code: None, + message: format!( + "Invalid response from lightningd. Failed to parse `error`. {:?}", + e + ), + data: None, + })?; + Err(rpc_error) + } + } } pub async fn call(&mut self, req: Request) -> Result { @@ -273,32 +274,32 @@ impl ClnRpc { /// Performs an rpc-call pub async fn call_enum(&mut self, req: Request) -> Result { trace!("call : Serialize and deserialize request {:?}", req); - // Construct the full JsonRpcRequest - let id = self.next_id.fetch_add(1, Ordering::SeqCst); - let mut value = serde_json::to_value(req).map_err(|e| RpcError { - code: None, - message: format!("Failed to serialize request: {}", e), - data: None, - })?; - value["jsonrpc"] = "2.0".into(); - value["id"] = id.into(); - let method = value["method"].clone(); + // A little bit hacky. But serialize the request to get the method name + let mut ser = serde_json::to_value(&req).unwrap(); + let method: String = if let serde_json::Value::String(method) = ser["method"].take() { + method + } else { + panic!("Method should be string") + }; + let params: serde_json::Value = ser["params"].take(); - // - let mut response: serde_json::Value = self.call_raw_request(&value).await?; + let response: serde_json::Value = self.call_raw(&method, ¶ms).await?; + let response = serde_json::json!({ + "method" : method, + "result" : response + }); // Parse the response // We add the `method` here because the Response-enum uses it to determine the type - response["method"] = method; serde_json::from_value(response).map_err(|e| RpcError { code: None, - message: format!("Failed to deserializer response : {}", e), + message: format!("Failed to deserialize response : {}", e), data: None, }) } /// Performs an rpc-call and performs type-checking. - /// + /// /// ```no_run /// use cln_rpc::ClnRpc; /// use cln_rpc::model::requests::GetinfoRequest; @@ -307,16 +308,16 @@ impl ClnRpc { /// tokio_test::block_on( async { /// let mut rpc = ClnRpc::new(Path::new("path_to_rpc")).await.unwrap(); /// let request = GetinfoRequest {}; - /// let response = rpc.call_typed(request); + /// let response = rpc.call_typed(&request); /// }) /// ``` - pub async fn call_typed(&mut self, request: R) -> Result + pub async fn call_typed(&mut self, request: &R) -> Result where R: TypedRequest + Serialize + std::fmt::Debug, R::Response: DeserializeOwned + std::fmt::Debug, { let method = request.method(); - self.call_raw(method, &request).await? + self.call_raw::(method, request).await } } @@ -332,12 +333,20 @@ where mod test { use super::*; use crate::model::*; + use crate::primitives::PublicKey; use futures_util::StreamExt; use serde_json::json; + use std::str::FromStr; use tokio_util::codec::{Framed, FramedRead}; #[tokio::test] async fn call_raw_request() { + // Set up a pair of unix-streams + // The frame is a mock rpc-server + let (uds1, uds2) = UnixStream::pair().unwrap(); + let mut cln = ClnRpc::from_stream(uds1).unwrap(); + let mut frame = Framed::new(uds2, JsonCodec::default()); + // Define the request and response send in the RPC-message let rpc_request = serde_json::json!({ "id" : 1, @@ -352,28 +361,21 @@ mod test { "id" : "1", "result" : {"field_6" : 6} }); - let rpc_response2 = rpc_response.clone(); - - // Set up a pair of unix-streams - // The ClnRpc will read and write from usd1 - // Im our test will read and write from usd2 and emulate Core Lightning behavior - let (uds1, uds2) = UnixStream::pair().unwrap(); - let mut cln = ClnRpc::from_stream(uds1).unwrap(); - - // Open the test dummy reader - let mut frame = Framed::new(uds2, JsonCodec::default()); // Spawn the task that performs the RPC-call - tokio::task::spawn(async move { - let returned: serde_json::Value = cln.call_raw_request(&rpc_request2).await.unwrap(); - assert_eq!(&returned, &rpc_response2) - }); + // Check that it reads the response correctly + let handle = tokio::task::spawn(async move { cln.call_raw_request(rpc_request2).await }); // Verify that our emulated server received a request + // and sendt the response let read_req = dbg!(frame.next().await.unwrap().unwrap()); assert_eq!(&rpc_request, &read_req); - frame.send(rpc_response).await.unwrap(); + + // Get the result from `call_raw_request` and verify + let actual_response: Result = handle.await.unwrap(); + let actual_response = actual_response.unwrap(); + assert_eq!(actual_response, json!({"field_6" : 6})); } #[tokio::test] @@ -384,7 +386,7 @@ mod test { let mut read = FramedRead::new(uds2, JsonCodec::default()); tokio::task::spawn(async move { - let _: serde_json::Value = cln.call_raw("getinfo", req).await.unwrap(); + let _: serde_json::Value = cln.call_raw("getinfo", &req).await.unwrap(); }); let read_req = dbg!(read.next().await.unwrap().unwrap()); @@ -396,40 +398,159 @@ mod test { } #[tokio::test] - async fn test_call() { - let req = Request::Getinfo(requests::GetinfoRequest {}); + async fn test_call_enum_remote_error() { + // Set up the rpc-connection + // The frame represents a Mock rpc-server let (uds1, uds2) = UnixStream::pair().unwrap(); let mut cln = ClnRpc::from_stream(uds1).unwrap(); + let mut frame = Framed::new(uds2, JsonCodec::default()); - let mut read = FramedRead::new(uds2, JsonCodec::default()); - tokio::task::spawn(async move { - cln.call(req).await.unwrap(); + // Construct the request and response + let req = Request::Ping(requests::PingRequest { + id: PublicKey::from_str( + "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b", + ) + .unwrap(), + len: None, + pongbytes: None, }); - let read_req = dbg!(read.next().await.unwrap().unwrap()); + let mock_resp = json!({ + "id" : 1, + "jsonrpc" : "2.0", + "error" : { + "code" : 666, + "message" : "MOCK_ERROR" + } + }); - assert_eq!( - json!({"id": 1, "method": "getinfo", "params": {}, "jsonrpc": "2.0"}), - read_req - ); + // Spawn the task which calls the rpc + let handle = tokio::task::spawn(async move { cln.call(req).await }); + + // Ensure the mock receives the request and returns a response + let _ = dbg!(frame.next().await.unwrap().unwrap()); + frame.send(mock_resp).await.unwrap(); + + let rpc_response: Result<_, RpcError> = handle.await.unwrap(); + let rpc_error: RpcError = rpc_response.unwrap_err(); + + println!("RPC_ERROR : {:?}", rpc_error); + assert_eq!(rpc_error.code.unwrap(), 666); + assert_eq!(rpc_error.message, "MOCK_ERROR"); } #[tokio::test] - async fn test_typed_call() { - let req = requests::GetinfoRequest {}; + async fn test_call_enum() { + // Set up the rpc-connection + // The frame represents a Mock rpc-server let (uds1, uds2) = UnixStream::pair().unwrap(); let mut cln = ClnRpc::from_stream(uds1).unwrap(); + let mut frame = Framed::new(uds2, JsonCodec::default()); - let mut read = FramedRead::new(uds2, JsonCodec::default()); - tokio::task::spawn(async move { - let _: responses::GetinfoResponse = cln.call_typed(req).await.unwrap(); + // We'll use the Ping request here because both the request + // and response have few arguments + let req = Request::Ping(requests::PingRequest { + id: PublicKey::from_str( + "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b", + ) + .unwrap(), + len: None, + pongbytes: None, + }); + let mock_resp = json!({ + "id" : 1, + "jsonrpc" : "2.0", + "result" : { "totlen" : 123 } }); - let read_req = dbg!(read.next().await.unwrap().unwrap()); + // we create a task that sends the response and returns the response + let handle = tokio::task::spawn(async move { cln.call(req).await }); + // Ensure our mock receives the request and sends the response + let read_req = dbg!(frame.next().await.unwrap().unwrap()); assert_eq!( - json!({"id": 1, "method": "getinfo", "params": {}, "jsonrpc": "2.0"}), - read_req + read_req, + json!({"id" : 1, "jsonrpc" : "2.0", "method" : "ping", "params" : {"id" : "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b"}}) ); + frame.send(mock_resp).await.unwrap(); + + // Verify that the error response is correct + let rpc_response: Result<_, RpcError> = handle.await.unwrap(); + match rpc_response.unwrap() { + Response::Ping(ping) => { + assert_eq!(ping.totlen, 123); + } + _ => panic!("A Request::Getinfo should return Response::Getinfo"), + } + } + + #[tokio::test] + async fn test_call_typed() { + // Set up the rpc-connection + // The frame represents a Mock rpc-server + let (uds1, uds2) = UnixStream::pair().unwrap(); + let mut cln = ClnRpc::from_stream(uds1).unwrap(); + let mut frame = Framed::new(uds2, JsonCodec::default()); + + // We'll use the Ping request here because both the request + // and response have few arguments + let req = requests::PingRequest { + id: PublicKey::from_str( + "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b", + ) + .unwrap(), + len: None, + pongbytes: None, + }; + let mock_resp = json!({ + "id" : 1, + "jsonrpc" : "2.0", + "result" : { "totlen" : 123 } + }); + + // we create a task that sends the response and returns the response + let handle = tokio::task::spawn(async move { cln.call_typed(&req).await }); + + // Ensure our mock receives the request and sends the response + _ = dbg!(frame.next().await.unwrap().unwrap()); + frame.send(mock_resp).await.unwrap(); + + // Verify that the error response is correct + let rpc_response: Result<_, RpcError> = handle.await.unwrap(); + let ping_response = rpc_response.unwrap(); + assert_eq!(ping_response.totlen, 123); + } + + #[tokio::test] + async fn test_call_typed_remote_error() { + // Create a dummy rpc-request + let req = requests::GetinfoRequest {}; + + // Create a dummy error response + let response = json!({ + "id" : 1, + "jsonrpc" : "2.0", + "error" : { + "code" : 666, + "message" : "MOCK_ERROR", + }}); + + let (uds1, uds2) = UnixStream::pair().unwrap(); + let mut cln = ClnRpc::from_stream(uds1).unwrap(); + + // Send out the request + let mut frame = Framed::new(uds2, JsonCodec::default()); + + let handle = tokio::task::spawn(async move { cln.call_typed(&req).await }); + + // Dummy-server ensures the request has been received and send the error response + let _ = dbg!(frame.next().await.unwrap().unwrap()); + frame.send(response).await.unwrap(); + + let rpc_response = handle.await.unwrap(); + let rpc_error = rpc_response.expect_err("Must be an RPC-error response"); + + assert_eq!(rpc_error.code.unwrap(), 666); + assert_eq!(rpc_error.message, "MOCK_ERROR"); } }