diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index b035d34b49..40779e593d 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -336,177 +336,311 @@ pub struct StorageResponse { #[cfg(test)] mod test { use super::*; + pub use crate::backend::rpc::{RawRpcFuture, RawRpcSubscription}; + pub use crate::{backend::StorageResponse, error::RpcError}; + pub use futures::StreamExt; + pub use rpc::RpcClientT; + pub use serde::Serialize; + pub use serde_json::value::RawValue; + pub use sp_core::H256; + pub use std::collections::{HashMap, VecDeque}; + pub use subxt_core::{config::DefaultExtrinsicParams, Config}; + pub use tokio::sync::{mpsc, Mutex}; + + pub type RpcResult = Result; + pub type Item = RpcResult; + + fn random_hash() -> H256 { + H256::random() + } - mod legacy { - use super::rpc::{RpcClient, RpcClientT}; - use crate::backend::rpc::RawRpcSubscription; - use crate::backend::BackendExt; - use crate::{ - backend::{ - legacy::rpc_methods::Bytes, legacy::rpc_methods::RuntimeVersion, - legacy::LegacyBackend, StorageResponse, - }, - error::RpcError, - }; - use futures::StreamExt; - use serde::Serialize; - use serde_json::value::RawValue; - use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, - }; - use subxt_core::{config::DefaultExtrinsicParams, Config}; - use tokio::sync::{mpsc, Mutex}; - - type RpcResult = Result; - type Item = RpcResult; - - struct MockDataTable { - items: HashMap, VecDeque>, + fn storage_response>, V: Into>>(key: K, value: V) -> StorageResponse + where + Vec: From, + { + StorageResponse { + key: key.into(), + value: value.into(), + } + } + pub mod rpc_client { + use super::*; + use std::time::Duration; + + pub type SubscriptionHandler = Box< + dyn for<'a> Fn( + &'a mut MockDataTable, + &'a mut Option, + Option>, + ) -> RawRpcFuture<'a, RawRpcSubscription> + + Send, + >; + + pub type MethodHandler = Box< + dyn for<'a> Fn( + &'a mut MockDataTable, + &'a mut Option, + Option>, + ) -> RawRpcFuture<'a, Box> + + Send, + >; + + pub enum Message { + Many(RpcResult>), + Single(T), } - impl MockDataTable { - fn new() -> Self { - MockDataTable { - items: HashMap::new(), + impl Message { + pub fn unwrap_single(self) -> T { + match self { + Self::Single(s) => s, + _ => panic!("cannot unwrap_single on Message::Many"), } } - - fn from_iter<'a, T: Serialize, I: IntoIterator)>>( - item: I, - ) -> Self { - let mut data = Self::new(); - for (key, item) in item.into_iter() { - data.push(key.into(), item); + pub fn unwrap_many(self) -> RpcResult> { + match self { + Self::Many(s) => s, + _ => panic!("cannot unwrap_many on Message::Single"), } - data } + } + + #[derive(Default)] + pub struct MockDataTable { + items: HashMap, VecDeque>>, + } - fn push(&mut self, key: Vec, item: RpcResult) { - let item = item.map(|x| serde_json::to_string(&x).unwrap()); - match self.items.entry(key) { - std::collections::hash_map::Entry::Occupied(v) => v.into_mut().push_back(item), - std::collections::hash_map::Entry::Vacant(e) => { - e.insert(VecDeque::from([item])); + impl MockDataTable { + pub fn push(&mut self, key: Vec, item: Message>) { + let item = match item { + Message::Many(items) => Message::Many(items.map(|items| { + items + .into_iter() + .map(|item| item.map(|x| serde_json::to_string(&x).unwrap())) + .collect() + })), + Message::Single(item) => { + Message::Single(item.map(|x| serde_json::to_string(&x).unwrap())) } - } + }; + self.items.entry(key).or_default().push_back(item); } - fn pop(&mut self, key: Vec) -> Item { + pub fn pop(&mut self, key: Vec) -> Message { self.items.get_mut(&key).unwrap().pop_front().unwrap() } } - struct Subscription { - sender: mpsc::Sender>>, - receiver: mpsc::Receiver>>, + pub struct Subscription { + sender: mpsc::Sender, } impl Subscription { - fn new() -> Self { + pub fn new() -> (Self, mpsc::Receiver) { let (sender, receiver) = mpsc::channel(32); - Self { sender, receiver } + (Self { sender }, receiver) } - async fn from_iter< - T: Serialize, - S: IntoIterator>>>, - >( - items: S, - ) -> Self { - let sub = Self::new(); - for i in items { - let i: RpcResult> = i.map(|items| { - items - .into_iter() - .map(|item| item.map(|i| serde_json::to_string(&i).unwrap())) - .collect() - }); - sub.write(i).await - } - sub + pub async fn write(&self, items: Message) { + match items { + Message::Many(items) => { + for i in items.unwrap() { + self.sender.send(i).await.unwrap() + } + } + Message::Single(item) => self.sender.send(item).await.unwrap(), + }; } - async fn read(&mut self) -> RpcResult> { - self.receiver.recv().await.unwrap() + pub async fn write_delayed(&self, items: Message) { + let sender = self.sender.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(500)).await; + + match items { + Message::Many(items) => { + for i in items.unwrap() { + let _ = sender.send(i).await; + } + } + Message::Single(item) => sender.send(item).await.unwrap(), + }; + }); } + } + + #[derive(Default)] + struct InnerMockedRpcClient { + data_table: MockDataTable, + subscription_channel: Option, + subscription_handlers: HashMap, + method_handlers: HashMap, + } - async fn write(&self, items: RpcResult>) { - self.sender.send(items).await.unwrap() + impl InnerMockedRpcClient { + fn call<'a>( + &'a mut self, + method_handler: &str, + params: Option>, + ) -> RawRpcFuture<'a, Box> { + let method = self.method_handlers.get(method_handler).unwrap_or_else(|| { + panic!( + "no method named {} registered. Params: {:?}", + method_handler, params + ) + }); + + (*method)(&mut self.data_table, &mut self.subscription_channel, params) + } + + fn subscribe<'a>( + &'a mut self, + sub: &str, + params: Option>, + ) -> RawRpcFuture<'a, RawRpcSubscription> { + let sub = self.subscription_handlers.get(sub).unwrap_or_else(|| { + panic!( + "no subscription named {} registered. Params: {:?}", + sub, params + ) + }); + + (*sub)(&mut self.data_table, &mut self.subscription_channel, params) } } - struct Data { - request: MockDataTable, - subscription: Subscription, + #[derive(Default)] + pub struct MockRpcBuilder { + data: InnerMockedRpcClient, } - struct MockRpcClientStorage { - data: Arc>, + impl MockRpcBuilder { + pub fn add_method(mut self, method_name: &str, method_handler: F) -> Self + where + F: Send + + for<'a> Fn( + &'a mut MockDataTable, + &'a mut Option, + Option>, + ) + -> RawRpcFuture<'a, Box> + + 'static, + { + self.data + .method_handlers + .insert(method_name.into(), Box::new(method_handler)); + self + } + + pub fn add_subscription( + mut self, + subscription_name: &str, + subscription_handler: F, + ) -> Self + where + F: Send + + for<'a> Fn( + &'a mut MockDataTable, + &'a mut Option, + Option>, + ) -> RawRpcFuture<'a, RawRpcSubscription> + + 'static, + { + self.data + .subscription_handlers + .insert(subscription_name.into(), Box::new(subscription_handler)); + self + } + + pub fn add_mock_data< + 'a, + T: Serialize, + I: IntoIterator>)>, + >( + mut self, + item: I, + ) -> Self { + let data = &mut self.data.data_table; + for (key, item) in item.into_iter() { + data.push(key.into(), item); + } + self + } + + pub fn build(self) -> MockRpcClient { + MockRpcClient { + data: Arc::new(Mutex::new(self.data)), + } + } } - impl RpcClientT for MockRpcClientStorage { + pub struct MockRpcClient { + data: Arc>, + } + + impl RpcClientT for MockRpcClient { fn request_raw<'a>( &'a self, method: &'a str, params: Option>, - ) -> super::rpc::RawRpcFuture<'a, Box> { - Box::pin(async move { - match method { - "state_getStorage" => { - let mut data = self.data.lock().await; - let params = params.map(|p| p.get().to_string()); - let rpc_params = jsonrpsee::types::Params::new(params.as_deref()); - let key: sp_core::Bytes = rpc_params.sequence().next().unwrap(); - let value = data.request.pop(key.0); - value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) - } - "chain_getBlockHash" => { - let mut data = self.data.lock().await; - let value = data.request.pop("chain_getBlockHash".into()); - value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) - } - _ => todo!(), - } + ) -> RawRpcFuture<'a, Box> { + Box::pin(async { + let mut data = self.data.lock().await; + data.call(method, params).await }) } fn subscribe_raw<'a>( &'a self, - _sub: &'a str, - _params: Option>, + sub: &'a str, + params: Option>, _unsub: &'a str, - ) -> super::rpc::RawRpcFuture<'a, super::rpc::RawRpcSubscription> { + ) -> RawRpcFuture<'a, RawRpcSubscription> { Box::pin(async { let mut data = self.data.lock().await; - let values: RpcResult>>> = - data.subscription.read().await.map(|v| { - v.into_iter() - .map(|v| { - v.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) - }) - .collect::>>>() - }); - values.map(|v| RawRpcSubscription { - stream: futures::stream::iter(v).boxed(), - id: Some("ID".to_string()), - }) + data.subscribe(sub, params).await }) } } + } - // Define dummy config - enum Conf {} - impl Config for Conf { - type Hash = crate::utils::H256; - type AccountId = crate::utils::AccountId32; - type Address = crate::utils::MultiAddress; - type Signature = crate::utils::MultiSignature; - type Hasher = crate::config::substrate::BlakeTwo256; - type Header = crate::config::substrate::SubstrateHeader; - type ExtrinsicParams = DefaultExtrinsicParams; + // Define dummy config + enum Conf {} + impl Config for Conf { + type Hash = H256; + type AccountId = crate::utils::AccountId32; + type Address = crate::utils::MultiAddress; + type Signature = crate::utils::MultiSignature; + type Hasher = crate::config::substrate::BlakeTwo256; + type Header = crate::config::substrate::SubstrateHeader; + type ExtrinsicParams = DefaultExtrinsicParams; + type AssetId = u32; + } - type AssetId = u32; + mod legacy { + use super::*; + use crate::backend::{ + legacy::rpc_methods::Bytes, legacy::rpc_methods::RuntimeVersion, legacy::LegacyBackend, + }; + use rpc_client::*; + + pub fn setup_mock_rpc() -> MockRpcBuilder { + MockRpcBuilder::default() + .add_method("state_getStorage", |data, _sub, params| { + Box::pin(async move { + let params = params.map(|p| p.get().to_string()); + let rpc_params = jsonrpsee::types::Params::new(params.as_deref()); + let key: sp_core::Bytes = rpc_params.sequence().next().unwrap(); + let value = data.pop(key.0).unwrap_single(); + value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) + }) + }) + .add_method("chain_getBlockHash", |data, _, _| { + Box::pin(async move { + let value = data.pop("chain_getBlockHash".into()).unwrap_single(); + value.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) + }) + }) } use crate::backend::Backend; @@ -530,61 +664,33 @@ mod test { Ok(Some(Bytes(str.into()))) } - fn storage_response>, V: Into>>(key: K, value: V) -> StorageResponse - where - Vec: From, - { - StorageResponse { - key: key.into(), - value: value.into(), - } - } - - async fn build_mock_client< - 'a, - T: Serialize, - D: IntoIterator)>, - S: IntoIterator>>>, - >( - table_data: D, - subscription_data: S, - ) -> RpcClient { - let data = Data { - request: MockDataTable::from_iter(table_data), - subscription: Subscription::from_iter(subscription_data).await, - }; - RpcClient::new(MockRpcClientStorage { - data: Arc::new(Mutex::new(data)), - }) - } - #[tokio::test] async fn storage_fetch_values() { let mock_data = vec![ - ("ID1", bytes("Data1")), + ("ID1", Message::Single(bytes("Data1"))), ( "ID2", - Err(RpcError::DisconnectedWillReconnect( + Message::Single(Err(RpcError::DisconnectedWillReconnect( "Reconnecting".to_string(), - )), + ))), ), - ("ID2", bytes("Data2")), + ("ID2", Message::Single(bytes("Data2"))), ( "ID3", - Err(RpcError::DisconnectedWillReconnect( + Message::Single(Err(RpcError::DisconnectedWillReconnect( "Reconnecting".to_string(), - )), + ))), ), - ("ID3", bytes("Data3")), + ("ID3", Message::Single(bytes("Data3"))), ]; - let rpc_client = build_mock_client(mock_data, vec![]).await; + let rpc_client = setup_mock_rpc().add_mock_data(mock_data).build(); let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); // Test let response = backend .storage_fetch_values( ["ID1".into(), "ID2".into(), "ID3".into()].into(), - crate::utils::H256::random(), + random_hash(), ) .await .unwrap(); @@ -609,18 +715,18 @@ mod test { let mock_data = [ ( "ID1", - Err(RpcError::DisconnectedWillReconnect( + Message::Single(Err(RpcError::DisconnectedWillReconnect( "Reconnecting".to_string(), - )), + ))), ), - ("ID1", bytes("Data1")), + ("ID1", Message::Single(bytes("Data1"))), ]; - let rpc_client = build_mock_client(mock_data, vec![]).await; + let rpc_client = setup_mock_rpc().add_mock_data(mock_data).build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); let response = backend - .storage_fetch_value("ID1".into(), crate::utils::H256::random()) + .storage_fetch_value("ID1".into(), random_hash()) .await .unwrap(); @@ -644,17 +750,17 @@ mod test { /// } /// ``` async fn simple_fetch() { - let hash = crate::utils::H256::random(); + let hash = random_hash(); let mock_data = vec![ ( "chain_getBlockHash", - Err(RpcError::DisconnectedWillReconnect( + Message::Single(Err(RpcError::DisconnectedWillReconnect( "Reconnecting".to_string(), - )), + ))), ), - ("chain_getBlockHash", Ok(Some(hash))), + ("chain_getBlockHash", Message::Single(Ok(Some(hash)))), ]; - let rpc_client = build_mock_client(mock_data, vec![]).await; + let rpc_client = setup_mock_rpc().add_mock_data(mock_data).build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); @@ -688,27 +794,56 @@ mod test { /// ``` async fn stream_simple() { let mock_subscription_data = vec![ - Ok(vec![ - Ok(runtime_version(0)), - Err(RpcError::DisconnectedWillReconnect( - "Reconnecting".to_string(), - )), - Ok(runtime_version(1)), - ]), - Ok(vec![ - Err(RpcError::DisconnectedWillReconnect( - "Reconnecting".to_string(), - )), - Ok(runtime_version(2)), - Ok(runtime_version(3)), - ]), - Ok(vec![ - Ok(runtime_version(4)), - Ok(runtime_version(5)), - Err(RpcError::RequestRejected("Reconnecting".to_string())), - ]), + ( + "state_subscribeRuntimeVersion", + Message::Many(Ok(vec![ + Ok(runtime_version(0)), + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + Ok(runtime_version(1)), + ])), + ), + ( + "state_subscribeRuntimeVersion", + Message::Many(Ok(vec![ + Err(RpcError::DisconnectedWillReconnect( + "Reconnecting".to_string(), + )), + Ok(runtime_version(2)), + Ok(runtime_version(3)), + ])), + ), + ( + "state_subscribeRuntimeVersion", + Message::Many(Ok(vec![ + Ok(runtime_version(4)), + Ok(runtime_version(5)), + Err(RpcError::RequestRejected("Reconnecting".to_string())), + ])), + ), ]; - let rpc_client = build_mock_client(vec![], mock_subscription_data).await; + let rpc_client = setup_mock_rpc() + .add_subscription("state_subscribeRuntimeVersion", |data, _, _| { + Box::pin(async move { + let values = data + .pop("state_subscribeRuntimeVersion".into()) + .unwrap_many(); + let values: RpcResult>>> = values.map(|v| { + v.into_iter() + .map(|v| { + v.map(|v| serde_json::value::RawValue::from_string(v).unwrap()) + }) + .collect::>>>() + }); + values.map(|v| RawRpcSubscription { + stream: futures::stream::iter(v).boxed(), + id: Some("ID".to_string()), + }) + }) + }) + .add_mock_data(mock_subscription_data) + .build(); // Test let backend: LegacyBackend = LegacyBackend::builder().build(rpc_client); @@ -734,4 +869,575 @@ mod test { assert!(results.next().await.is_none()) } } + + mod unstable_backend { + + use std::sync::atomic::AtomicBool; + + use futures::task::Poll; + use rpc_client::{Message, MockRpcBuilder, Subscription}; + use rpc_methods::{ + Bytes, Initialized, MethodResponse, MethodResponseStarted, OperationError, OperationId, + OperationStorageItems, RuntimeSpec, RuntimeVersionEvent, + }; + + use super::unstable::*; + use super::*; + + fn build_backend( + rpc_client: impl RpcClientT, + ) -> (UnstableBackend, UnstableBackendDriver) { + let (backend, driver): (UnstableBackend, _) = + UnstableBackend::builder().build(rpc_client); + (backend, driver) + } + + fn build_backend_spawn_background(rpc_client: impl RpcClientT) -> UnstableBackend { + let (backend, mut driver) = build_backend(rpc_client); + tokio::spawn(async move { + while let Some(val) = driver.next().await { + if let Err(e) = val { + eprintln!("Error driving unstable backend: {e}; terminating client"); + } + } + }); + backend + } + + fn runtime_spec() -> RuntimeSpec { + let spec = serde_json::json!({ + "specName": "westend", + "implName": "parity-westend", + "specVersion": 9122, + "implVersion": 0, + "transactionVersion": 7, + "apis": { + "0xdf6acb689907609b": 3, + "0x37e397fc7c91f5e4": 1, + "0x40fe3ad401f8959a": 5, + "0xd2bc9897eed08f15": 3, + "0xf78b278be53f454c": 2, + "0xaf2c0297a23e6d3d": 1, + "0x49eaaf1b548a0cb0": 1, + "0x91d5df18b0d2cf58": 1, + "0xed99c5acb25eedf5": 3, + "0xcbca25e39f142387": 2, + "0x687ad44ad37f03c2": 1, + "0xab3c0572291feb8b": 1, + "0xbc9d89904f5b923f": 1, + "0x37c8bb1350a9a2a8": 1 + } + }); + serde_json::from_value(spec).unwrap() + } + + type FollowEvent = unstable::rpc_methods::FollowEvent<::Hash>; + + fn setup_mock_rpc_client(cycle_ids: bool) -> MockRpcBuilder { + let hash = random_hash(); + let mut id = 0; + rpc_client::MockRpcBuilder::default().add_subscription( + "chainHead_v1_follow", + move |_, sub, _| { + Box::pin(async move { + if cycle_ids { + id += 1; + } + let follow_event = + FollowEvent::Initialized(Initialized::<::Hash> { + finalized_block_hashes: vec![hash], + finalized_block_runtime: Some(rpc_methods::RuntimeEvent::Valid( + RuntimeVersionEvent { + spec: runtime_spec(), + }, + )), + }); + let (subscription, mut receiver) = Subscription::new(); + subscription + .write(Message::Single(Ok( + serde_json::to_string(&follow_event).unwrap() + ))) + .await; + sub.replace(subscription); + let read_stream = + futures::stream::poll_fn(move |cx| -> Poll> { + receiver.poll_recv(cx) + }) + .map(|item| item.map(|x| RawValue::from_string(x).unwrap())); + let stream = RawRpcSubscription { + stream: read_stream.boxed(), + id: Some(format!("ID{}", id)), + }; + Ok(stream) + }) + }, + ) + } + + fn response_started(id: &str) -> MethodResponse { + MethodResponse::Started(MethodResponseStarted { + operation_id: id.to_owned(), + discarded_items: None, + }) + } + + fn operation_error(id: &str) -> FollowEvent { + FollowEvent::OperationError(OperationError { + operation_id: id.to_owned(), + error: "error".to_owned(), + }) + } + + fn storage_done(id: &str) -> FollowEvent { + FollowEvent::OperationStorageDone(OperationId { + operation_id: id.to_owned(), + }) + } + fn storage_result(key: &str, value: &str) -> unstable::rpc_methods::StorageResult { + unstable::rpc_methods::StorageResult { + key: Bytes(key.to_owned().into()), + result: rpc_methods::StorageResultType::Value(Bytes(value.to_owned().into())), + } + } + fn storage_items(id: &str, items: &[unstable::rpc_methods::StorageResult]) -> FollowEvent { + FollowEvent::OperationStorageItems(OperationStorageItems { + operation_id: id.to_owned(), + items: VecDeque::from(items.to_owned()), + }) + } + + fn operation_continue(id: &str) -> FollowEvent { + FollowEvent::OperationWaitingForContinue(OperationId { + operation_id: id.to_owned(), + }) + } + + #[tokio::test] + async fn storage_fetch_values_returns_stream_with_single_error() { + let response_data = vec![( + "method_response", + Message::Single(Ok(response_started("Id1"))), + )]; + let mock_subscription_data = vec![( + "chainHead_v1_storage", + Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), + )]; + let rpc_client = setup_mock_rpc_client(false) + .add_method("chainHead_v1_storage", |data, sub, _| { + Box::pin(async move { + let response = data.pop("method_response".into()).unwrap_single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }) + .add_mock_data(mock_subscription_data) + .add_mock_data(response_data) + .build(); + + let backend = build_backend_spawn_background(rpc_client); + + // Test + // This request should encounter an error on `request` and do a retry. + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + random_hash(), + ) + .await + .unwrap(); + + // operation returned FollowEvent::OperationError + let response = response + .collect::>>() + .await; + + assert!(matches!( + response.as_slice(), + [Err(Error::Other(s) )] if s == "error" + )); + } + + #[tokio::test] + /// Tests that the method will retry on failed query + async fn storage_fetch_values_retry_query() { + let response_data = vec![ + ( + "method_response", + Message::Single(Err(RpcError::DisconnectedWillReconnect("Error".into()))), + ), + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ]; + let mock_data = vec![( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items( + "Id1", + &[ + storage_result("ID1", "Data1"), + storage_result("ID2", "Data2"), + storage_result("ID3", "Data3"), + ], + )), + Ok(storage_done("Id1")), + ])), + )]; + let rpc_client = setup_mock_rpc_client(false) + .add_method("chainHead_v1_storage", |data, sub, _| { + Box::pin(async move { + let response = data.pop("method_response".into()).unwrap_single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }) + .add_mock_data(mock_data) + .add_mock_data(response_data) + .build(); + let backend = build_backend_spawn_background(rpc_client); + + // We try again and should succeed + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + random_hash(), + ) + .await + .unwrap(); + + let response = response + .map(|x| x.unwrap()) + .collect::>() + .await; + + assert_eq!( + vec![ + storage_response("ID1", "Data1"), + storage_response("ID2", "Data2"), + storage_response("ID3", "Data3"), + ], + response + ) + } + #[tokio::test] + async fn storage_fetch_values_retry_chainhead_continue() { + fn compare_storage_responses( + expected_response: &StorageResponse, + received_response: &StorageResponse, + ) -> bool { + expected_response == received_response + } + + let response_data = vec![ + ( + "method_response", + Message::Single(Err::( + RpcError::DisconnectedWillReconnect("Error".into()), + )), + ), + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ]; + let continue_data = vec![ + ("continue_response", Message::Single(Ok(()))), + ("continue_response", Message::Single(Ok(()))), + ( + "continue_response", + Message::Single(Err(RpcError::DisconnectedWillReconnect("Error".into()))), + ), + ("continue_response", Message::Single(Ok(()))), + ("continue_response", Message::Single(Ok(()))), + ]; + let mock_data = vec![ + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID1", "Data1")])), + Ok(operation_continue("Id1")), + ])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID2", "Data2")])), + Ok(operation_continue("Id1")), + ])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID1", "Data1")])), + Ok(operation_continue("Id1")), + ])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID2", "Data2")])), + Ok(operation_continue("Id1")), + ])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items("Id1", &[storage_result("ID3", "Data3")])), + Ok(storage_done("Id1")), + ])), + ), + ]; + let rpc_client = setup_mock_rpc_client(false) + .add_method("chainHead_v1_storage", |data, sub, _| { + Box::pin(async move { + let response = data.pop("method_response".into()).unwrap_single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }) + .add_method("chainHead_v1_continue", |data, sub, _| { + Box::pin(async move { + let response = data.pop("continue_response".into()).unwrap_single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }) + .add_mock_data(mock_data) + .add_mock_data(response_data) + .add_mock_data(continue_data) + .build(); + let backend = build_backend_spawn_background(rpc_client); + + // We try again and should fail mid way + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + random_hash(), + ) + .await + .unwrap(); + // operation returned FollowEvent::OperationError + let response = response + .collect::>>() + .await; + + assert!(matches!( + response.as_slice(), + [ + Ok(resp1 @ StorageResponse { .. }), + Ok(resp2 @ StorageResponse { .. }), + Err(Error::Other(s)) + ] if s == "error" + && compare_storage_responses(&storage_response("ID1", "Data1"), resp1) + && compare_storage_responses(&storage_response("ID2", "Data2"), resp2) + )); + + // We try again and should succeed + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + random_hash(), + ) + .await + .unwrap(); + + let response = response + .map(|x| x.unwrap()) + .collect::>() + .await; + + assert_eq!( + vec![ + storage_response("ID1", "Data1"), + storage_response("ID2", "Data2"), + storage_response("ID3", "Data3"), + ], + response + ) + } + + #[tokio::test] + async fn simple_fetch() { + let hash = random_hash(); + + let mock_data = vec![ + ( + "chainSpec_v1_genesisHash", + Message::Single(Err::(RpcError::RequestRejected( + "Error".to_owned(), + ))), + ), + ( + "chainSpec_v1_genesisHash", + Message::Single(Err(RpcError::DisconnectedWillReconnect("Error".to_owned()))), + ), + ("chainSpec_v1_genesisHash", Message::Single(Ok(hash))), + ]; + let rpc_client = setup_mock_rpc_client(false) + .add_method("chainSpec_v1_genesisHash", |data, _, _| { + Box::pin(async move { + let response = data.pop("chainSpec_v1_genesisHash".into()).unwrap_single(); + response.map(|x| RawValue::from_string(x).unwrap()) + }) + }) + .add_mock_data(mock_data) + .build(); + + let backend = build_backend_spawn_background(rpc_client); + + // Test + // This request should encounter an error on `request` and do a retry. + let response_hash = backend.genesis_hash().await.unwrap(); + + assert_eq!(hash, response_hash) + } + + #[tokio::test] + // Failure as we do not wait for subscription id to be updated. + // see https://github.com/paritytech/subxt/issues/1567 + async fn stale_subscription_id_failure() { + let response_data = vec![ + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ( + "method_response", + Message::Single(Err(RpcError::RequestRejected("stale id".into()))), + ), + ( + "method_response", + Message::Single(Ok(response_started("Id1"))), + ), + ]; + + let mock_data = vec![ + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), + ), + ( + "chainHead_v1_storage", + Message::Many(Ok(vec![ + Ok(storage_items( + "Id1", + &[ + storage_result("ID1", "Data1"), + storage_result("ID2", "Data2"), + storage_result("ID3", "Data3"), + ], + )), + Ok(storage_done("Id1")), + ])), + ), + ]; + let rpc_client = setup_mock_rpc_client(true) + .add_method("chainHead_v1_storage", { + let subscription_expired = Arc::new(AtomicBool::new(false)); + move |data, sub, params| { + let subscription_expired = subscription_expired.clone(); + Box::pin(async move { + let subscription_expired = subscription_expired.clone(); + if subscription_expired.load(std::sync::atomic::Ordering::SeqCst) { + let params = params.map(|p| p.get().to_string()); + let rpc_params = jsonrpsee::types::Params::new(params.as_deref()); + let key: String = rpc_params.sequence().next().unwrap(); + if key == *"ID1" { + return Err(RpcError::RequestRejected("stale id".into())); + } else { + subscription_expired + .swap(false, std::sync::atomic::Ordering::SeqCst); + } + } + let response = data.pop("method_response".into()).unwrap_single(); + if response.is_ok() { + let item = data.pop("chainHead_v1_storage".into()); + if let Some(sub) = sub { + let item = item; + sub.write_delayed(item).await + } + } else { + subscription_expired + .swap(true, std::sync::atomic::Ordering::SeqCst); + } + response.map(|x| RawValue::from_string(x).unwrap()) + }) + } + }) + .add_mock_data(mock_data) + .add_mock_data(response_data) + .build(); + let (backend, mut driver): (UnstableBackend, _) = build_backend(rpc_client); + + let _ = driver.next().await.unwrap(); + let _ = driver.next().await.unwrap(); + + // not getting new subscription id and hitting request rejected > 10 times + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + random_hash(), + ) + .await; + + let _ = driver.next().await.unwrap(); + + let binding = response + .unwrap() + .collect::>>() + .await; + let response = binding.last().unwrap(); + + assert!(matches!( + response, + Err(Error::Other(reason)) if reason == "error" + )); + + // not getting new subscription id and hitting request rejected > 10 times + let response = backend + .storage_fetch_values( + ["ID1".into(), "ID2".into(), "ID3".into()].into(), + random_hash(), + ) + .await; + + assert!(matches!( + response, + Err(Error::Rpc(RpcError::RequestRejected(reason))) if reason == "stale id" + )) + } + } } diff --git a/subxt/src/backend/unstable/rpc_methods.rs b/subxt/src/backend/unstable/rpc_methods.rs index e72f2cadef..0e730c4fb1 100644 --- a/subxt/src/backend/unstable/rpc_methods.rs +++ b/subxt/src/backend/unstable/rpc_methods.rs @@ -321,6 +321,7 @@ impl UnstableRpcMethods { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "event")] +#[cfg_attr(test, derive(Serialize))] pub enum FollowEvent { /// The latest finalized block. /// @@ -363,6 +364,8 @@ pub enum FollowEvent { /// This is the first event generated by the `follow` subscription /// and is submitted only once. #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(test, derive(Serialize))] +#[cfg_attr(test, serde(rename_all = "camelCase"))] pub struct Initialized { /// The hashes of the last finalized blocks. pub finalized_block_hashes: Vec, @@ -404,6 +407,7 @@ impl<'de, Hash: Deserialize<'de>> Deserialize<'de> for Initialized { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "type")] +#[cfg_attr(test, derive(Serialize))] pub enum RuntimeEvent { /// The runtime version of this block. Valid(RuntimeVersionEvent), @@ -418,6 +422,7 @@ pub enum RuntimeEvent { /// - blocks that suffered a change in runtime compared with their parents #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct RuntimeVersionEvent { /// Details about this runtime. pub spec: RuntimeSpec, @@ -427,6 +432,7 @@ pub struct RuntimeVersionEvent { /// the "initialized" event of `chainHead_follow` if the `withRuntime` flag is set. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct RuntimeSpec { /// Opaque string indicating the name of the chain. pub spec_name: String, @@ -453,13 +459,18 @@ pub struct RuntimeSpec { /// /// **Note:** In Substrate, the keys in the apis field consists of the hexadecimal-encoded 8-bytes blake2 /// hash of the name of the API. For example, the `TaggedTransactionQueue` API is 0xd2bc9897eed08f15. - #[serde(with = "hashmap_as_tuple_list")] + #[serde(deserialize_with = "hashmap_as_tuple_list::deserialize")] + #[cfg_attr( + test, + serde(serialize_with = "hashmap_as_tuple_list::for_test::serialize") + )] pub apis: HashMap, } /// The operation could not be processed due to an error. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct ErrorEvent { /// Reason of the error. pub error: String, @@ -468,6 +479,7 @@ pub struct ErrorEvent { /// Indicate a new non-finalized block. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct NewBlock { /// The hash of the new block. pub block_hash: Hash, @@ -485,6 +497,7 @@ pub struct NewBlock { /// Indicate the block hash of the new best block. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct BestBlockChanged { /// The block hash of the new best block. pub best_block_hash: Hash, @@ -493,6 +506,7 @@ pub struct BestBlockChanged { /// Indicate the finalized and pruned block hashes. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct Finalized { /// Block hashes that are finalized. pub finalized_block_hashes: Vec, @@ -503,6 +517,7 @@ pub struct Finalized { /// Indicate the operation id of the event. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationId { /// The operation id of the event. pub operation_id: String, @@ -511,6 +526,7 @@ pub struct OperationId { /// The response of the `chainHead_body` method. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationBodyDone { /// The operation id of the event. pub operation_id: String, @@ -521,6 +537,7 @@ pub struct OperationBodyDone { /// The response of the `chainHead_call` method. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationCallDone { /// The operation id of the event. pub operation_id: String, @@ -531,6 +548,7 @@ pub struct OperationCallDone { /// The response of the `chainHead_call` method. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationStorageItems { /// The operation id of the event. pub operation_id: String, @@ -541,6 +559,7 @@ pub struct OperationStorageItems { /// Indicate a problem during the operation. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct OperationError { /// The operation id of the event. pub operation_id: String, @@ -551,6 +570,7 @@ pub struct OperationError { /// The storage result. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct StorageResult { /// The hex-encoded key of the result. pub key: Bytes, @@ -562,6 +582,7 @@ pub struct StorageResult { /// The type of the storage query. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub enum StorageResultType { /// Fetch the value of the provided key. Value(Bytes), @@ -575,6 +596,7 @@ pub enum StorageResultType { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "result")] +#[cfg_attr(test, derive(Serialize))] pub enum MethodResponse { /// The method has started. Started(MethodResponseStarted), @@ -585,6 +607,7 @@ pub enum MethodResponse { /// The `started` result of a method. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] +#[cfg_attr(test, derive(Serialize))] pub struct MethodResponseStarted { /// The operation id of the response. pub operation_id: String, @@ -704,6 +727,7 @@ impl Stream for TransactionSubscription { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "event")] +#[cfg_attr(test, derive(Serialize))] pub enum TransactionStatus { /// Transaction is part of the future queue. Validated, @@ -745,11 +769,16 @@ pub enum TransactionStatus { /// Details of a block that a transaction is seen in. #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[cfg_attr(test, derive(Serialize))] pub struct TransactionBlockDetails { /// The block hash. pub hash: Hash, /// The index of the transaction in the block. - #[serde(with = "unsigned_number_as_string")] + #[serde(deserialize_with = "unsigned_number_as_string::deserialize")] + #[cfg_attr( + test, + serde(serialize_with = "unsigned_number_as_string::for_test::serialize") + )] pub index: u64, } @@ -804,6 +833,18 @@ pub(crate) mod unsigned_number_as_string { Ok(v.into()) } } + #[cfg(test)] + pub mod for_test { + use serde::ser::Serializer; + + /// Serialize a number as string + pub fn serialize(item: &u64, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&item.to_string()) + } + } } /// A temporary shim to decode "spec.apis" if it comes back as an array like: @@ -880,6 +921,28 @@ pub(crate) mod hashmap_as_tuple_list { Ok(map) } } + #[cfg(test)] + pub mod for_test { + use std::collections::HashMap; + use std::hash::Hash; + + use serde::ser::{Serialize, SerializeSeq, Serializer}; + + /// Serialize hashmap as list of tuples + pub fn serialize( + item: &HashMap, + serializer: S, + ) -> Result + where + S: Serializer, + { + let mut seq = serializer.serialize_seq(None)?; + for i in item { + seq.serialize_element(&i)?; + } + seq.end() + } + } } #[cfg(test)]