From 7e79824b24e9cd6e3ed50ae58c2b57be01b5fcef Mon Sep 17 00:00:00 2001 From: George Brewster <127057440+gbrew@users.noreply.github.com> Date: Sun, 5 Nov 2023 19:39:29 -0800 Subject: [PATCH] fix: change ws SubscriptionStream::unsubscribe to send the correct server-side ID (#2669) Also add a test which calls unsubscribe. --- ethers-core/src/utils/genesis.rs | 2 +- ethers-providers/src/rpc/pubsub.rs | 10 ++++-- .../src/rpc/transports/ws/types.rs | 6 ---- ethers-providers/tests/it/provider.rs | 32 +++++++++++++++++++ 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/ethers-core/src/utils/genesis.rs b/ethers-core/src/utils/genesis.rs index c16745124..72ae2bb0b 100644 --- a/ethers-core/src/utils/genesis.rs +++ b/ethers-core/src/utils/genesis.rs @@ -358,7 +358,7 @@ where return Numeric::from_str(num) .map(U256::from) .map(Some) - .map_err(serde::de::Error::custom); + .map_err(serde::de::Error::custom) } if let serde_json::Value::Number(num) = val { diff --git a/ethers-providers/src/rpc/pubsub.rs b/ethers-providers/src/rpc/pubsub.rs index a11579501..b0e92b203 100644 --- a/ethers-providers/src/rpc/pubsub.rs +++ b/ethers-providers/src/rpc/pubsub.rs @@ -1,4 +1,4 @@ -use crate::{JsonRpcClient, Middleware, Provider}; +use crate::{JsonRpcClient, Provider}; use ethers_core::types::U256; @@ -30,7 +30,8 @@ pub trait PubsubClient: JsonRpcClient { #[pin_project(PinnedDrop)] /// Streams data from an installed filter via `eth_subscribe` pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> { - /// The subscription's installed id on the ethereum node + /// A client-side ID for the subscription. This may not be the same + /// as the server-side ID for this subscription on the ethereum node. pub id: U256, loaded_elements: VecDeque, @@ -63,7 +64,10 @@ where /// Unsubscribes from the subscription. pub async fn unsubscribe(&self) -> Result { - self.provider.unsubscribe(self.id).await + // Make sure to use PubSubClient unsubscribe() rather than Provider unsubscribe() + // Only the former handles mappings between client- and server-side subscription IDs + P::unsubscribe((*self.provider).as_ref(), self.id).map_err(Into::into)?; + Ok(true) } /// Set the loaded elements buffer. This buffer contains logs waiting for diff --git a/ethers-providers/src/rpc/transports/ws/types.rs b/ethers-providers/src/rpc/transports/ws/types.rs index 883380b4d..00119040f 100644 --- a/ethers-providers/src/rpc/transports/ws/types.rs +++ b/ethers-providers/src/rpc/transports/ws/types.rs @@ -11,12 +11,6 @@ pub type Response = Result, JsonRpcError>; #[derive(serde::Deserialize, serde::Serialize)] pub struct SubId(pub U256); -impl SubId { - pub(super) fn serialize_raw(&self) -> Result, serde_json::Error> { - to_raw_value(&self) - } -} - #[derive(Deserialize, Debug, Clone)] pub struct Notification { pub subscription: U256, diff --git a/ethers-providers/tests/it/provider.rs b/ethers-providers/tests/it/provider.rs index 68a6c0b28..e1155cf12 100644 --- a/ethers-providers/tests/it/provider.rs +++ b/ethers-providers/tests/it/provider.rs @@ -95,6 +95,38 @@ mod eth_tests { assert_eq!(&block, blocks.last().unwrap()); } + #[tokio::test] + #[cfg(feature = "ws")] + async fn unsubscribe_blocks_ws() { + let (provider, _anvil) = crate::spawn_anvil_ws().await; + generic_unsubscribe_blocks_test(provider).await; + } + + #[tokio::test] + #[cfg(feature = "ipc")] + async fn unsubscribe_blocks_ipc() { + let (provider, _anvil, _ipc) = crate::spawn_anvil_ipc().await; + generic_unsubscribe_blocks_test(provider).await; + } + + #[cfg(any(feature = "ws", feature = "ipc"))] + async fn generic_unsubscribe_blocks_test(provider: M) + where + M: Middleware, + M::Provider: ethers_providers::PubsubClient, + { + { + let stream = provider.subscribe_blocks().await.unwrap(); + stream.unsubscribe().await.unwrap(); + } + { + let _stream = provider.subscribe_blocks().await.unwrap(); + // stream will be unsubscribed automatically here on drop + } + // Sleep to give the unsubscription messages time to propagate + tokio::time::sleep(crate::Duration::from_millis(200)).await; + } + #[tokio::test] async fn send_tx_http() { let (provider, anvil) = spawn_anvil();