From f129063b4444538d78017508cf987db97856fc21 Mon Sep 17 00:00:00 2001 From: Pavlo Khrystenko Date: Mon, 9 Sep 2024 11:02:18 +0200 Subject: [PATCH 1/2] add tests for unstable_backend --- subxt/src/backend/mod.rs | 1098 +++++++++++++++++---- subxt/src/backend/unstable/rpc_methods.rs | 51 + 2 files changed, 974 insertions(+), 175 deletions(-) diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index b035d34b49..4e8b9158ac 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -336,54 +336,96 @@ pub struct StorageResponse { #[cfg(test)] mod test { use super::*; + pub use crate::{backend::StorageResponse, error::RpcError}; + pub use futures::StreamExt; + pub use serde::Serialize; + pub use std::collections::{HashMap, VecDeque}; + pub use subxt_core::{config::DefaultExtrinsicParams, Config}; + pub use tokio::sync::{mpsc, Mutex}; + pub type RpcResult = Result; + pub type Item = RpcResult; + pub use crate::backend::rpc::RawRpcSubscription; + pub use rpc::RpcClientT; + pub use serde_json::value::RawValue; + + fn storage_response>, V: Into>>(key: K, value: V) -> StorageResponse + where + Vec: From, + { + StorageResponse { + key: key.into(), + value: value.into(), + } + } + pub mod rpc_client { - mod legacy { - use super::rpc::{RpcClient, RpcClientT}; - use crate::backend::rpc::RawRpcSubscription; - use crate::backend::BackendExt; - use crate::{ - backend::{ - legacy::rpc_methods::Bytes, legacy::rpc_methods::RuntimeVersion, - legacy::LegacyBackend, StorageResponse, - }, - error::RpcError, - }; - use futures::StreamExt; - use serde::Serialize; - use serde_json::value::RawValue; - use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, - }; - use subxt_core::{config::DefaultExtrinsicParams, Config}; - use tokio::sync::{mpsc, Mutex}; + use std::time::Duration; + + use super::*; - type RpcResult = Result; - type Item = RpcResult; + pub type SubscriptionHandler = Box< + dyn for<'a> Fn( + &'a mut MockDataTable, + &'a mut Option, + Option>, + ) + -> super::rpc::RawRpcFuture<'a, super::rpc::RawRpcSubscription> + + Send, + >; + + pub type MethodHandler = Box< + dyn for<'a> Fn( + &'a mut MockDataTable, + &'a mut Option, + Option>, + ) + -> super::rpc::RawRpcFuture<'a, Box> + + Send, + >; - struct MockDataTable { - items: HashMap, VecDeque>, + pub enum Message { + Many(RpcResult>), + Single(T), } - impl MockDataTable { - fn new() -> Self { - MockDataTable { - items: HashMap::new(), + impl Message { + pub fn single(self) -> T { + match self { + Self::Single(s) => s, + _ => todo!(), + } + } + pub fn many(self) -> RpcResult> { + match self { + Self::Many(s) => s, + _ => todo!(), } } + } - fn from_iter<'a, T: Serialize, I: IntoIterator)>>( - item: I, - ) -> Self { - let mut data = Self::new(); - for (key, item) in item.into_iter() { - data.push(key.into(), item); + pub struct MockDataTable { + items: HashMap, VecDeque>>, + } + + impl MockDataTable { + pub fn new() -> Self { + MockDataTable { + items: HashMap::new(), } - data } - fn push(&mut self, key: Vec, item: RpcResult) { - let item = item.map(|x| serde_json::to_string(&x).unwrap()); + pub fn push(&mut self, key: Vec, item: Message>) { + let item = match item { + Message::Many(items) => Message::Many(items.map(|items| { + items + .into_iter() + .map(|item| item.map(|x| serde_json::to_string(&x).unwrap())) + .collect() + })), + Message::Single(item) => { + Message::Single(item.map(|x| serde_json::to_string(&x).unwrap())) + } + }; match self.items.entry(key) { std::collections::hash_map::Entry::Occupied(v) => v.into_mut().push_back(item), std::collections::hash_map::Entry::Vacant(e) => { @@ -392,121 +434,211 @@ mod test { } } - fn pop(&mut self, key: Vec) -> Item { + pub fn pop(&mut self, key: Vec) -> Message { self.items.get_mut(&key).unwrap().pop_front().unwrap() } } - struct Subscription { - sender: mpsc::Sender>>, - receiver: mpsc::Receiver>>, + pub struct Subscription { + sender: mpsc::Sender, } impl Subscription { - fn new() -> Self { + pub fn new() -> (Self, mpsc::Receiver) { let (sender, receiver) = mpsc::channel(32); - Self { sender, receiver } + (Self { sender }, receiver) } - async fn from_iter< - T: Serialize, - S: IntoIterator>>>, - >( - items: S, - ) -> Self { - let sub = Self::new(); - for i in items { - let i: RpcResult> = i.map(|items| { - items - .into_iter() - .map(|item| item.map(|i| serde_json::to_string(&i).unwrap())) - .collect() - }); - sub.write(i).await - } - sub + pub async fn write(&self, items: Message) { + match items { + Message::Many(items) => { + for i in items.unwrap() { + self.sender.send(i).await.unwrap() + } + } + Message::Single(item) => self.sender.send(item).await.unwrap(), + }; } - async fn read(&mut self) -> RpcResult> { - self.receiver.recv().await.unwrap() - } + pub async fn write_delayed(&self, items: Message) { + let sender = self.sender.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(500)).await; - async fn write(&self, items: RpcResult>) { - self.sender.send(items).await.unwrap() + match items { + Message::Many(items) => { + for i in items.unwrap() { + let _ = sender.send(i).await; + } + } + Message::Single(item) => sender.send(item).await.unwrap(), + }; + }); } } struct Data { - request: MockDataTable, - subscription: Subscription, + data_table: MockDataTable, + subscription_channel: Option, + subscription_handlers: HashMap, + method_handlers: HashMap, + } + + impl Data { + async fn call_meth<'a>( + &'a mut self, + meth: &str, + params: Option>, + ) -> super::rpc::RawRpcFuture<'a, Box> { + let method = self.method_handlers.get(meth).expect(&format!( + "no method named {} registered. Params: {:?}", + meth, params + )); + + (*method)(&mut self.data_table, &mut self.subscription_channel, params) + } + + async fn call_subscription<'a>( + &'a mut self, + sub: &str, + params: Option>, + ) -> super::rpc::RawRpcFuture<'a, super::rpc::RawRpcSubscription> { + let sub = self.subscription_handlers.get(sub).expect(&format!( + "no subscription named {} registered. Params: {:?}", + sub, params + )); + + (*sub)(&mut self.data_table, &mut self.subscription_channel, params) + } + } + pub struct MockRpcBuilder { + data: Data, + } + + impl MockRpcBuilder { + pub fn new() -> Self { + Self { + data: Data { + data_table: MockDataTable::new(), + subscription_channel: None, + subscription_handlers: HashMap::new(), + method_handlers: HashMap::new(), + }, + } + } + + pub fn add_method(mut self, method_name: &str, meth: MethodHandler) -> Self { + self.data.method_handlers.insert(method_name.into(), meth); + self + } + + pub fn add_subscription( + mut self, + subscription_name: &str, + subscription_handler: SubscriptionHandler, + ) -> Self { + self.data + .subscription_handlers + .insert(subscription_name.into(), subscription_handler); + self + } + + pub fn add_mock_data_from_iter< + 'a, + T: Serialize, + I: IntoIterator>)>, + >( + mut self, + item: I, + ) -> Self { + let data = &mut self.data.data_table; + for (key, item) in item.into_iter() { + data.push(key.into(), item); + } + self + } + + pub fn build(self) -> MockRpcClient { + MockRpcClient { + data: Arc::new(Mutex::new(self.data)), + } + } } - struct MockRpcClientStorage { + pub struct MockRpcClient { data: Arc>, } - impl RpcClientT for MockRpcClientStorage { + impl RpcClientT for MockRpcClient { fn request_raw<'a>( &'a self, method: &'a str, params: Option>, ) -> super::rpc::RawRpcFuture<'a, Box> { - Box::pin(async move { - match method { - "state_getStorage" => { - let mut data = self.data.lock().await; - let params = params.map(|p| p.get().to_string()); - let rpc_params = jsonrpsee::types::Params::new(params.as_deref()); - let key: sp_core::Bytes = rpc_params.sequence().next().unwrap(); - let value = data.request.pop(key.0); - value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) - } - "chain_getBlockHash" => { - let mut data = self.data.lock().await; - let value = data.request.pop("chain_getBlockHash".into()); - value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) - } - _ => todo!(), - } + Box::pin(async { + let mut data = self.data.lock().await; + data.call_meth(method, params).await.await }) } fn subscribe_raw<'a>( &'a self, - _sub: &'a str, - _params: Option>, + sub: &'a str, + params: Option>, _unsub: &'a str, ) -> super::rpc::RawRpcFuture<'a, super::rpc::RawRpcSubscription> { Box::pin(async { let mut data = self.data.lock().await; - let values: RpcResult>>> = - data.subscription.read().await.map(|v| { - v.into_iter() - .map(|v| { - v.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) - }) - .collect::>>>() - }); - values.map(|v| RawRpcSubscription { - stream: futures::stream::iter(v).boxed(), - id: Some("ID".to_string()), - }) + data.call_subscription(sub, params).await.await }) } } + } - // Define dummy config - enum Conf {} - impl Config for Conf { - type Hash = crate::utils::H256; - type AccountId = crate::utils::AccountId32; - type Address = crate::utils::MultiAddress; - type Signature = crate::utils::MultiSignature; - type Hasher = crate::config::substrate::BlakeTwo256; - type Header = crate::config::substrate::SubstrateHeader; - type ExtrinsicParams = DefaultExtrinsicParams; + // Define dummy config + enum Conf {} + impl Config for Conf { + type Hash = crate::utils::H256; + type AccountId = crate::utils::AccountId32; + type Address = crate::utils::MultiAddress; + type Signature = crate::utils::MultiSignature; + type Hasher = crate::config::substrate::BlakeTwo256; + type Header = crate::config::substrate::SubstrateHeader; + type ExtrinsicParams = DefaultExtrinsicParams; + + type AssetId = u32; + } - type AssetId = u32; + mod legacy { + use super::*; + use crate::backend::{ + legacy::rpc_methods::Bytes, legacy::rpc_methods::RuntimeVersion, legacy::LegacyBackend, + }; + use rpc_client::*; + + pub fn setup_mock_rpc() -> MockRpcBuilder { + MockRpcBuilder::new() + .add_method( + "state_getStorage", + Box::new(|data, _sub, params| { + Box::pin(async move { + let params = params.map(|p| p.get().to_string()); + let rpc_params = jsonrpsee::types::Params::new(params.as_deref()); + let key: sp_core::Bytes = rpc_params.sequence().next().unwrap(); + let value = data.pop(key.0).single(); + value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) + }) + }), + ) + .add_method( + "chain_getBlockHash", + Box::new(|data, _, _| { + Box::pin(async move { + let value = data.pop("chain_getBlockHash".into()).single(); + value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) + }) + }), + ) } use crate::backend::Backend; @@ -530,54 +662,26 @@ mod test { Ok(Some(Bytes(str.into()))) } - fn storage_response>, V: Into>>(key: K, value: V) -> StorageResponse - where - Vec: From, - { - StorageResponse { - key: key.into(), - value: value.into(), - } - } - - async fn build_mock_client< - 'a, - T: Serialize, - D: IntoIterator)>, - S: IntoIterator>>>, - >( - table_data: D, - subscription_data: S, - ) -> RpcClient { - let data = Data { - request: MockDataTable::from_iter(table_data), - subscription: Subscription::from_iter(subscription_data).await, - }; - RpcClient::new(MockRpcClientStorage { - data: Arc::new(Mutex::new(data)), - }) - } - #[tokio::test] async fn storage_fetch_values() { let mock_data = vec![ - ("ID1", bytes("Data1")), + ("ID1", Message::Single(bytes("Data1"))), ( "ID2", - Err(RpcError::DisconnectedWillReconnect( + Message::Single(Err(RpcError::DisconnectedWillReconnect( "Reconnecting".to_string(), - )), + ))), ), - ("ID2", bytes("Data2")), + ("ID2", Message::Single(bytes("Data2"))), ( "ID3", - Err(RpcError::DisconnectedWillReconnect( + Message::Single(Err(RpcError::DisconnectedWillReconnect( "Reconnecting".to_string(), - )), + ))), ), - ("ID3", bytes("Data3")), + ("ID3", Message::Single(bytes("Data3"))), ]; - let rpc_client = build_mock_client(mock_data, vec![]).await; + let rpc_client = setup_mock_rpc().add_mock_data_from_iter(mock_data).build(); let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); // Test @@ -609,13 +713,13 @@ mod test { let mock_data = [ ( "ID1", - Err(RpcError::DisconnectedWillReconnect( + Message::Single(Err(RpcError::DisconnectedWillReconnect( "Reconnecting".to_string(), - )), + ))), ), - ("ID1", bytes("Data1")), + ("ID1", Message::Single(bytes("Data1"))), ]; - let rpc_client = build_mock_client(mock_data, vec![]).await; + let rpc_client = setup_mock_rpc().add_mock_data_from_iter(mock_data).build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); @@ -648,13 +752,13 @@ mod test { let mock_data = vec![ ( "chain_getBlockHash", - Err(RpcError::DisconnectedWillReconnect( + Message::Single(Err(RpcError::DisconnectedWillReconnect( "Reconnecting".to_string(), - )), + ))), ), - ("chain_getBlockHash", Ok(Some(hash))), + ("chain_getBlockHash", Message::Single(Ok(Some(hash)))), ]; - let rpc_client = build_mock_client(mock_data, vec![]).await; + let rpc_client = setup_mock_rpc().add_mock_data_from_iter(mock_data).build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); @@ -688,27 +792,60 @@ mod test { /// ``` async fn stream_simple() { let mock_subscription_data = vec![ - Ok(vec![ - Ok(runtime_version(0)), - Err(RpcError::DisconnectedWillReconnect( - "Reconnecting".to_string(), - )), - Ok(runtime_version(1)), - ]), - Ok(vec![ - Err(RpcError::DisconnectedWillReconnect( - "Reconnecting".to_string(), - )), - Ok(runtime_version(2)), - Ok(runtime_version(3)), - ]), - Ok(vec![ - Ok(runtime_version(4)), - Ok(runtime_version(5)), - Err(RpcError::RequestRejected("Reconnecting".to_string())), - ]), + ( + "state_subscribeRuntimeVersion", + Message::Many(Ok(vec![ + Ok(runtime_version(0)), + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + Ok(runtime_version(1)), + ])), + ), + ( + "state_subscribeRuntimeVersion", + Message::Many(Ok(vec![ + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + Ok(runtime_version(2)), + Ok(runtime_version(3)), + ])), + ), + ( + "state_subscribeRuntimeVersion", + Message::Many(Ok(vec![ + Ok(runtime_version(4)), + Ok(runtime_version(5)), + Err(RpcError::RequestRejected("Reconnecting".to_string())), + ])), + ), ]; - let rpc_client = build_mock_client(vec![], mock_subscription_data).await; + let rpc_client = setup_mock_rpc() + .add_subscription( + "state_subscribeRuntimeVersion", + Box::new(|data, _, _| { + Box::pin(async move { + let values = data.pop("state_subscribeRuntimeVersion".into()).many(); + let values: RpcResult>>> = + values.map(|v| { + v.into_iter() + .map(|v| { + v.map(|v| { + serde_json::value::RawValue::from_string(v).unwrap() + }) + }) + .collect::>>>() + }); + values.map(|v| RawRpcSubscription { + stream: futures::stream::iter(v).boxed(), + id: Some("ID".to_string()), + }) + }) + }), + ) + .add_mock_data_from_iter(mock_subscription_data) + .build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); @@ -734,4 +871,615 @@ mod test { assert!(results.next().await.is_none()) } } + + mod unstable_backend { + + use std::sync::atomic::AtomicBool; + + use futures::task::Poll; + use rpc_client::{Message, MockRpcBuilder, Subscription}; + use rpc_methods::{ + Bytes, Initialized, MethodResponse, MethodResponseStarted, OperationError, OperationId, + OperationStorageItems, RuntimeSpec, RuntimeVersionEvent, + }; + use sp_core::H256; + + use super::unstable::*; + use super::*; + + fn runtime_spec() -> RuntimeSpec { + let spec = serde_json::json!({ + "specName": "westend", + "implName": "parity-westend", + "specVersion": 9122, + "implVersion": 0, + "transactionVersion": 7, + "apis": { + "0xdf6acb689907609b": 3, + "0x37e397fc7c91f5e4": 1, + "0x40fe3ad401f8959a": 5, + "0xd2bc9897eed08f15": 3, + "0xf78b278be53f454c": 2, + "0xaf2c0297a23e6d3d": 1, + "0x49eaaf1b548a0cb0": 1, + "0x91d5df18b0d2cf58": 1, + "0xed99c5acb25eedf5": 3, + "0xcbca25e39f142387": 2, + "0x687ad44ad37f03c2": 1, + "0xab3c0572291feb8b": 1, + "0xbc9d89904f5b923f": 1, + "0x37c8bb1350a9a2a8": 1 + } + }); + serde_json::from_value(spec).unwrap() + } + fn init_hash() -> crate::utils::H256 { + crate::utils::H256::random() + } + + type FollowEvent = unstable::rpc_methods::FollowEvent<::Hash>; + + fn setup_mock_rpc_client(cycle_ids: bool) -> MockRpcBuilder { + let hash = init_hash(); + let mut id = 0; + rpc_client::MockRpcBuilder::new().add_subscription( + "chainHead_v1_follow", + Box::new(move |_, sub, _| { + Box::pin(async move { + if cycle_ids { + id += 1; + } + let follow_event = + FollowEvent::Initialized(Initialized::<::Hash> { + finalized_block_hashes: vec![hash.clone()], + finalized_block_runtime: Some(rpc_methods::RuntimeEvent::Valid( + RuntimeVersionEvent { + spec: runtime_spec(), + }, + )), + }); + let (subscription, mut receiver) = Subscription::new(); + subscription + .write(Message::Single(Ok( + serde_json::to_string(&follow_event).unwrap() + ))) + .await; + sub.replace(subscription); + let read_stream = + futures::stream::poll_fn(move |cx| -> Poll> { + receiver.poll_recv(cx) + }) + .map(|item| item.map(|x| RawValue::from_string(x).unwrap())); + let stream = RawRpcSubscription { + stream: read_stream.boxed(), + id: Some(format!("ID{}", id)), + }; + Ok(stream) + }) + }), + ) + } + + fn response_started(id: &str) -> MethodResponse { + MethodResponse::Started(MethodResponseStarted { + operation_id: id.to_owned(), + discarded_items: None, + }) + } + + fn operation_error(id: &str) -> FollowEvent { + FollowEvent::OperationError(OperationError { + operation_id: id.to_owned(), + error: "errro".to_owned(), + }) + } + + fn storage_done(id: &str) -> FollowEvent { + FollowEvent::OperationStorageDone(OperationId { + operation_id: id.to_owned(), + }) + } + fn storage_result(key: &str, value: &str) -> unstable::rpc_methods::StorageResult { + unstable::rpc_methods::StorageResult { + key: Bytes(key.to_owned().into()), + result: rpc_methods::StorageResultType::Value(Bytes(value.to_owned().into())), + } + } + fn storage_items(id: &str, items: &[unstable::rpc_methods::StorageResult]) -> FollowEvent { + FollowEvent::OperationStorageItems(OperationStorageItems { + operation_id: id.to_owned(), + items: VecDeque::from(items.to_owned()), + }) + } + + fn operation_continue(id: &str) -> FollowEvent { + FollowEvent::OperationWaitingForContinue(OperationId { + operation_id: id.to_owned(), + }) + } + + #[tokio::test] + async fn storage_fetch_values_returns_stream_with_single_error() { + let response_data = vec![( + "method_response", + Message::Single(Ok(response_started("Id1"))), + )]; + let mock_subscription_data = vec![( + "chainHead_v1_storage", + Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), + )]; + let rpc_client = setup_mock_rpc_client(false) + .add_method( + "chainHead_v1_storage", + Box::new(|data, sub, _| { + Box::pin(async move { + let response = data.pop("method_response".into()).single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }), + ) + .add_mock_data_from_iter(mock_subscription_data) + .add_mock_data_from_iter(response_data) + .build(); + + let (backend, mut driver): (UnstableBackend, _) = + UnstableBackend::builder().build(rpc_client); + + let _ = tokio::spawn(async move { + while let Some(val) = driver.next().await { + if let Err(e) = val { + eprintln!("Error driving unstable backend: {e}; terminating client"); + } + } + }); + + // Test + // This request should encounter an error on `request` and do a retry. + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + crate::utils::H256::random(), + ) + .await + .unwrap(); + + // operation returned FollowEvent::OperationError + let response = response + .collect::>>() + .await; + + assert!(matches!( + response.as_slice(), + [Err(Error::Other(s) )] if *s == "errro".to_owned() + )); + } + + #[tokio::test] + /// Tests that the method will retry on failed query + async fn storage_fetch_values_retry_query() { + let response_data = vec![ + ( + "method_response", + Message::Single(Err(RpcError::DisconnectedWillReconnect("Error".into()))), + ), + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ]; + let mock_data = vec![( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items( + "Id1", + &[ + storage_result("ID1", "Data1"), + storage_result("ID2", "Data2"), + storage_result("ID3", "Data3"), + ], + )), + Ok(storage_done("Id1")), + ])), + )]; + let rpc_client = setup_mock_rpc_client(false) + .add_method( + "chainHead_v1_storage", + Box::new(|data, sub, _| { + Box::pin(async move { + let response = data.pop("method_response".into()).single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }), + ) + .add_mock_data_from_iter(mock_data) + .add_mock_data_from_iter(response_data) + .build(); + let (backend, mut driver): (UnstableBackend, _) = + UnstableBackend::builder().build(rpc_client); + + let _ = tokio::spawn(async move { + while let Some(val) = driver.next().await { + if let Err(e) = val { + eprintln!("Error driving unstable backend: {e}; terminating client"); + } + } + }); + + // We try again and should succeed + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + crate::utils::H256::random(), + ) + .await + .unwrap(); + + let response = response + .map(|x| x.unwrap()) + .collect::>() + .await; + + assert_eq!( + vec![ + storage_response("ID1", "Data1"), + storage_response("ID2", "Data2"), + storage_response("ID3", "Data3"), + ], + response + ) + } + #[tokio::test] + /// Tests the error behaviour in streams based on chainHead_v1_continue calls + async fn storage_fetch_values_retry_continue() { + fn compare_storage_responses( + expected_response: &StorageResponse, + received_response: &StorageResponse, + ) -> bool { + expected_response == received_response + } + + let response_data = vec![ + ( + "method_response", + Message::Single(Err::( + RpcError::DisconnectedWillReconnect("Error".into()), + )), + ), + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ]; + let continue_data = vec![ + ("continue_response", Message::Single(Ok(()))), + ("continue_response", Message::Single(Ok(()))), + ( + "continue_response", + Message::Single(Err(RpcError::DisconnectedWillReconnect("Error".into()))), + ), + ("continue_response", Message::Single(Ok(()))), + ("continue_response", Message::Single(Ok(()))), + ]; + let mock_data = vec![ + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID1", "Data1")])), + Ok(operation_continue("Id1")), + ])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID2", "Data2")])), + Ok(operation_continue("Id1")), + ])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID1", "Data1")])), + Ok(operation_continue("Id1")), + ])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID2", "Data2")])), + Ok(operation_continue("Id1")), + ])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID3", "Data3")])), + Ok(storage_done("Id1")), + ])), + ), + ]; + let rpc_client = setup_mock_rpc_client(false) + .add_method( + "chainHead_v1_storage", + Box::new(|data, sub, _| { + Box::pin(async move { + let response = data.pop("method_response".into()).single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }), + ) + .add_method( + "chainHead_v1_continue", + Box::new(|data, sub, _| { + Box::pin(async move { + let response = data.pop("continue_response".into()).single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }), + ) + .add_mock_data_from_iter(mock_data) + .add_mock_data_from_iter(response_data) + .add_mock_data_from_iter(continue_data) + .build(); + let (backend, mut driver): (UnstableBackend, _) = + UnstableBackend::builder().build(rpc_client); + + let _ = tokio::spawn(async move { + while let Some(val) = driver.next().await { + if let Err(e) = val { + eprintln!("Error driving unstable backend: {e}; terminating client"); + } + } + }); + + // We try again and should fail mid way + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + crate::utils::H256::random(), + ) + .await + .unwrap(); + // operation returned FollowEvent::OperationError + let response = response + .collect::>>() + .await; + + assert!(matches!( + response.as_slice(), + [ + Ok(resp1 @ StorageResponse { .. }), + Ok(resp2 @ StorageResponse { .. }), + Err(Error::Other(s)) + ] if *s == "errro".to_owned() + && compare_storage_responses(&storage_response("ID1", "Data1"), resp1) + && compare_storage_responses(&storage_response("ID2", "Data2"), resp2) + )); + + // We try again and should succeed + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + crate::utils::H256::random(), + ) + .await + .unwrap(); + + let response = response + .map(|x| x.unwrap()) + .collect::>() + .await; + + assert_eq!( + vec![ + storage_response("ID1", "Data1"), + storage_response("ID2", "Data2"), + storage_response("ID3", "Data3"), + ], + response + ) + } + + #[tokio::test] + async fn simple_fetch() { + let hash = init_hash(); + + let mock_data = vec![ + ( + "chainSpec_v1_genesisHash", + Message::Single(Err::(RpcError::RequestRejected( + "Error".to_owned(), + ))), + ), + ( + "chainSpec_v1_genesisHash", + Message::Single(Err(RpcError::DisconnectedWillReconnect("Error".to_owned()))), + ), + ("chainSpec_v1_genesisHash", Message::Single(Ok(hash.into()))), + ]; + let rpc_client = setup_mock_rpc_client(false) + .add_method( + "chainSpec_v1_genesisHash", + Box::new(|data, _, _| { + Box::pin(async move { + let response = data.pop("chainSpec_v1_genesisHash".into()).single(); + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }), + ) + .add_mock_data_from_iter(mock_data) + .build(); + + let (backend, mut driver): (UnstableBackend, _) = + UnstableBackend::builder().build(rpc_client); + + let _ = tokio::spawn(async move { + while let Some(val) = driver.next().await { + if let Err(e) = val { + eprintln!("Error driving unstable backend: {e}; terminating client"); + } + } + }); + + // Test + // This request should encounter an error on `request` and do a retry. + let response_hash = backend.genesis_hash().await.unwrap(); + + assert_eq!(hash, response_hash) + } + + #[tokio::test] + // Failure as we do not wait for subscription id to be updated + async fn stale_subscription_id_failure() { + let response_data = vec![ + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ( + "method_response", + Message::Single(Err(RpcError::RequestRejected("stale id".into()))), + ), + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ]; + + let mock_data = vec![ + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items( + "Id1", + &[ + storage_result("ID1", "Data1"), + storage_result("ID2", "Data2"), + storage_result("ID3", "Data3"), + ], + )), + Ok(storage_done("Id1")), + ])), + ), + ]; + let rpc_client = setup_mock_rpc_client(true) + .add_method( + "chainHead_v1_storage", + Box::new({ + let subscription_expired = Arc::new(AtomicBool::new(false)); + move |data, sub, params| { + let subscription_expired = subscription_expired.clone(); + Box::pin(async move { + let subscription_expired = subscription_expired.clone(); + if subscription_expired.load(std::sync::atomic::Ordering::SeqCst) { + let params = params.map(|p| p.get().to_string()); + let rpc_params = + jsonrpsee::types::Params::new(params.as_deref()); + let key: String = rpc_params.sequence().next().unwrap(); + if key == "ID1".to_owned() { + return Err(RpcError::RequestRejected("stale id".into())); + } else { + subscription_expired + .swap(false, std::sync::atomic::Ordering::SeqCst); + } + } + let response = data.pop("method_response".into()).single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } else { + subscription_expired + .swap(true, std::sync::atomic::Ordering::SeqCst); + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + } + }), + ) + .add_mock_data_from_iter(mock_data) + .add_mock_data_from_iter(response_data) + .build(); + let (backend, mut driver): (UnstableBackend, _) = + UnstableBackend::builder().build(rpc_client); + + let _ = driver.next().await.unwrap(); + let _ = driver.next().await.unwrap(); + + // not getting new subscription id and hitting request rejected > 10 times + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + crate::utils::H256::random(), + ) + .await; + + let _ = driver.next().await.unwrap(); + + let binding = response + .unwrap() + .collect::>>() + .await; + let response = binding.last().unwrap(); + + assert!(matches!( + response, + Err(Error::Other(reason)) if reason == "errro" + )); + + // not getting new subscription id and hitting request rejected > 10 times + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + crate::utils::H256::random(), + ) + .await; + + assert!(matches!( + response, + Err(Error::Rpc(RpcError::RequestRejected(reason))) if reason == "stale id" + )) + } + } } diff --git a/subxt/src/backend/unstable/rpc_methods.rs b/subxt/src/backend/unstable/rpc_methods.rs index e72f2cadef..25e3fd1068 100644 --- a/subxt/src/backend/unstable/rpc_methods.rs +++ b/subxt/src/backend/unstable/rpc_methods.rs @@ -321,6 +321,7 @@ impl UnstableRpcMethods { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "event")] +#[cfg_attr(test, derive(Serialize))] pub enum FollowEvent { /// The latest finalized block. /// @@ -363,6 +364,8 @@ pub enum FollowEvent { /// This is the first event generated by the `follow` subscription /// and is submitted only once. #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(test, derive(Serialize))] +#[cfg_attr(test, serde(rename_all = "camelCase"))] pub struct Initialized { /// The hashes of the last finalized blocks. pub finalized_block_hashes: Vec, @@ -404,6 +407,7 @@ impl<'de, Hash: Deserialize<'de>> Deserialize<'de> for Initialized { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "type")] +#[cfg_attr(test, derive(Serialize))] pub enum RuntimeEvent { /// The runtime version of this block. Valid(RuntimeVersionEvent), @@ -418,6 +422,7 @@ pub enum RuntimeEvent { /// - blocks that suffered a change in runtime compared with their parents #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct RuntimeVersionEvent { /// Details about this runtime. pub spec: RuntimeSpec, @@ -427,6 +432,7 @@ pub struct RuntimeVersionEvent { /// the "initialized" event of `chainHead_follow` if the `withRuntime` flag is set. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct RuntimeSpec { /// Opaque string indicating the name of the chain. pub spec_name: String, @@ -460,6 +466,7 @@ pub struct RuntimeSpec { /// The operation could not be processed due to an error. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct ErrorEvent { /// Reason of the error. pub error: String, @@ -468,6 +475,7 @@ pub struct ErrorEvent { /// Indicate a new non-finalized block. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct NewBlock { /// The hash of the new block. pub block_hash: Hash, @@ -485,6 +493,7 @@ pub struct NewBlock { /// Indicate the block hash of the new best block. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct BestBlockChanged { /// The block hash of the new best block. pub best_block_hash: Hash, @@ -493,6 +502,7 @@ pub struct BestBlockChanged { /// Indicate the finalized and pruned block hashes. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct Finalized { /// Block hashes that are finalized. pub finalized_block_hashes: Vec, @@ -503,6 +513,7 @@ pub struct Finalized { /// Indicate the operation id of the event. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationId { /// The operation id of the event. pub operation_id: String, @@ -511,6 +522,7 @@ pub struct OperationId { /// The response of the `chainHead_body` method. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationBodyDone { /// The operation id of the event. pub operation_id: String, @@ -521,6 +533,7 @@ pub struct OperationBodyDone { /// The response of the `chainHead_call` method. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationCallDone { /// The operation id of the event. pub operation_id: String, @@ -531,6 +544,7 @@ pub struct OperationCallDone { /// The response of the `chainHead_call` method. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationStorageItems { /// The operation id of the event. pub operation_id: String, @@ -541,6 +555,7 @@ pub struct OperationStorageItems { /// Indicate a problem during the operation. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationError { /// The operation id of the event. pub operation_id: String, @@ -551,6 +566,7 @@ pub struct OperationError { /// The storage result. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct StorageResult { /// The hex-encoded key of the result. pub key: Bytes, @@ -562,6 +578,7 @@ pub struct StorageResult { /// The type of the storage query. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub enum StorageResultType { /// Fetch the value of the provided key. Value(Bytes), @@ -575,6 +592,7 @@ pub enum StorageResultType { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "result")] +#[cfg_attr(test, derive(Serialize))] pub enum MethodResponse { /// The method has started. Started(MethodResponseStarted), @@ -585,6 +603,7 @@ pub enum MethodResponse { /// The `started` result of a method. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct MethodResponseStarted { /// The operation id of the response. pub operation_id: String, @@ -704,6 +723,7 @@ impl Stream for TransactionSubscription { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "event")] +#[cfg_attr(test, derive(Serialize))] pub enum TransactionStatus { /// Transaction is part of the future queue. Validated, @@ -745,6 +765,7 @@ pub enum TransactionStatus { /// Details of a block that a transaction is seen in. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[cfg_attr(test, derive(Serialize))] pub struct TransactionBlockDetails { /// The block hash. pub hash: Hash, @@ -778,6 +799,9 @@ pub(crate) mod unsigned_number_as_string { use serde::de::{Deserializer, Visitor}; use std::fmt; + #[cfg(test)] + use serde::ser::Serializer; + /// Deserialize a number from a string or number. pub fn deserialize<'de, N: From, D>(deserializer: D) -> Result where @@ -786,6 +810,14 @@ pub(crate) mod unsigned_number_as_string { deserializer.deserialize_any(NumberVisitor(std::marker::PhantomData)) } + /// Serialize a number as string + #[cfg(test)] + pub fn serialize(item: &u64, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&item.to_string()) + } struct NumberVisitor(std::marker::PhantomData); impl<'de, N: From> Visitor<'de> for NumberVisitor { @@ -828,6 +860,9 @@ pub(crate) mod hashmap_as_tuple_list { use std::hash::{BuildHasher, Hash}; use std::marker::PhantomData; + #[cfg(test)] + use serde::ser::{Serialize, SerializeSeq, Serializer}; + /// Deserialize a [`HashMap`] from a list of tuples or object pub fn deserialize<'de, K, V, BH, D>(deserializer: D) -> Result, D::Error> where @@ -839,6 +874,22 @@ pub(crate) mod hashmap_as_tuple_list { deserializer.deserialize_any(HashMapVisitor(PhantomData)) } + /// Serialize a number as string + #[cfg(test)] + pub fn serialize( + item: &HashMap, + serializer: S, + ) -> Result + where + S: Serializer, + { + let mut seq = serializer.serialize_seq(None)?; + for i in item { + seq.serialize_element(&i)?; + } + seq.end() + } + #[allow(clippy::type_complexity)] struct HashMapVisitor(PhantomData HashMap>); From 42af8fd0f3b76e6f7b4301d0dc12c1a19a35f58c Mon Sep 17 00:00:00 2001 From: Pavlo Khrystenko Date: Mon, 9 Sep 2024 12:05:57 +0200 Subject: [PATCH 2/2] clippy --- subxt/src/backend/mod.rs | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index 4e8b9158ac..471ee2f1d5 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -490,10 +490,9 @@ mod test { meth: &str, params: Option>, ) -> super::rpc::RawRpcFuture<'a, Box> { - let method = self.method_handlers.get(meth).expect(&format!( - "no method named {} registered. Params: {:?}", - meth, params - )); + let method = self.method_handlers.get(meth).unwrap_or_else(|| { + panic!("no method named {} registered. Params: {:?}", meth, params) + }); (*method)(&mut self.data_table, &mut self.subscription_channel, params) } @@ -503,10 +502,12 @@ mod test { sub: &str, params: Option>, ) -> super::rpc::RawRpcFuture<'a, super::rpc::RawRpcSubscription> { - let sub = self.subscription_handlers.get(sub).expect(&format!( - "no subscription named {} registered. Params: {:?}", - sub, params - )); + let sub = self.subscription_handlers.get(sub).unwrap_or_else(|| { + panic!( + "no subscription named {} registered. Params: {:?}", + sub, params + ) + }); (*sub)(&mut self.data_table, &mut self.subscription_channel, params) } @@ -931,7 +932,7 @@ mod test { } let follow_event = FollowEvent::Initialized(Initialized::<::Hash> { - finalized_block_hashes: vec![hash.clone()], + finalized_block_hashes: vec![hash], finalized_block_runtime: Some(rpc_methods::RuntimeEvent::Valid( RuntimeVersionEvent { spec: runtime_spec(), @@ -1032,7 +1033,7 @@ mod test { let (backend, mut driver): (UnstableBackend, _) = UnstableBackend::builder().build(rpc_client); - let _ = tokio::spawn(async move { + tokio::spawn(async move { while let Some(val) = driver.next().await { if let Err(e) = val { eprintln!("Error driving unstable backend: {e}; terminating client"); @@ -1057,7 +1058,7 @@ mod test { assert!(matches!( response.as_slice(), - [Err(Error::Other(s) )] if *s == "errro".to_owned() + [Err(Error::Other(s) )] if s == "errro" )); } @@ -1111,7 +1112,7 @@ mod test { let (backend, mut driver): (UnstableBackend, _) = UnstableBackend::builder().build(rpc_client); - let _ = tokio::spawn(async move { + tokio::spawn(async move { while let Some(val) = driver.next().await { if let Err(e) = val { eprintln!("Error driving unstable backend: {e}; terminating client"); @@ -1259,7 +1260,7 @@ mod test { let (backend, mut driver): (UnstableBackend, _) = UnstableBackend::builder().build(rpc_client); - let _ = tokio::spawn(async move { + tokio::spawn(async move { while let Some(val) = driver.next().await { if let Err(e) = val { eprintln!("Error driving unstable backend: {e}; terminating client"); @@ -1286,7 +1287,7 @@ mod test { Ok(resp1 @ StorageResponse { .. }), Ok(resp2 @ StorageResponse { .. }), Err(Error::Other(s)) - ] if *s == "errro".to_owned() + ] if s == "errro" && compare_storage_responses(&storage_response("ID1", "Data1"), resp1) && compare_storage_responses(&storage_response("ID2", "Data2"), resp2) )); @@ -1330,7 +1331,7 @@ mod test { "chainSpec_v1_genesisHash", Message::Single(Err(RpcError::DisconnectedWillReconnect("Error".to_owned()))), ), - ("chainSpec_v1_genesisHash", Message::Single(Ok(hash.into()))), + ("chainSpec_v1_genesisHash", Message::Single(Ok(hash))), ]; let rpc_client = setup_mock_rpc_client(false) .add_method( @@ -1348,7 +1349,7 @@ mod test { let (backend, mut driver): (UnstableBackend, _) = UnstableBackend::builder().build(rpc_client); - let _ = tokio::spawn(async move { + tokio::spawn(async move { while let Some(val) = driver.next().await { if let Err(e) = val { eprintln!("Error driving unstable backend: {e}; terminating client"); @@ -1415,7 +1416,7 @@ mod test { let rpc_params = jsonrpsee::types::Params::new(params.as_deref()); let key: String = rpc_params.sequence().next().unwrap(); - if key == "ID1".to_owned() { + if key == *"ID1" { return Err(RpcError::RequestRejected("stale id".into())); } else { subscription_expired