Skip to content

Commit

Permalink
fix: change ws SubscriptionStream::unsubscribe to send the correct se…
Browse files Browse the repository at this point in the history
…rver-side ID (gakonst#2669)

Also add a test which calls unsubscribe.
  • Loading branch information
gbrew committed Nov 8, 2023
1 parent 2ffa4ac commit 7e79824
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
2 changes: 1 addition & 1 deletion ethers-core/src/utils/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ where
return Numeric::from_str(num)
.map(U256::from)
.map(Some)
.map_err(serde::de::Error::custom);
.map_err(serde::de::Error::custom)
}

if let serde_json::Value::Number(num) = val {
Expand Down
10 changes: 7 additions & 3 deletions ethers-providers/src/rpc/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{JsonRpcClient, Middleware, Provider};
use crate::{JsonRpcClient, Provider};

use ethers_core::types::U256;

Expand Down Expand Up @@ -30,7 +30,8 @@ pub trait PubsubClient: JsonRpcClient {
#[pin_project(PinnedDrop)]
/// Streams data from an installed filter via `eth_subscribe`
pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
/// The subscription's installed id on the ethereum node
/// A client-side ID for the subscription. This may not be the same
/// as the server-side ID for this subscription on the ethereum node.
pub id: U256,

loaded_elements: VecDeque<R>,
Expand Down Expand Up @@ -63,7 +64,10 @@ where

/// Unsubscribes from the subscription.
pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
self.provider.unsubscribe(self.id).await
// Make sure to use PubSubClient unsubscribe() rather than Provider unsubscribe()
// Only the former handles mappings between client- and server-side subscription IDs
P::unsubscribe((*self.provider).as_ref(), self.id).map_err(Into::into)?;
Ok(true)
}

/// Set the loaded elements buffer. This buffer contains logs waiting for
Expand Down
6 changes: 0 additions & 6 deletions ethers-providers/src/rpc/transports/ws/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ pub type Response = Result<Box<RawValue>, JsonRpcError>;
#[derive(serde::Deserialize, serde::Serialize)]
pub struct SubId(pub U256);

impl SubId {
pub(super) fn serialize_raw(&self) -> Result<Box<RawValue>, serde_json::Error> {
to_raw_value(&self)
}
}

#[derive(Deserialize, Debug, Clone)]
pub struct Notification {
pub subscription: U256,
Expand Down
32 changes: 32 additions & 0 deletions ethers-providers/tests/it/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,38 @@ mod eth_tests {
assert_eq!(&block, blocks.last().unwrap());
}

#[tokio::test]
#[cfg(feature = "ws")]
async fn unsubscribe_blocks_ws() {
let (provider, _anvil) = crate::spawn_anvil_ws().await;
generic_unsubscribe_blocks_test(provider).await;
}

#[tokio::test]
#[cfg(feature = "ipc")]
async fn unsubscribe_blocks_ipc() {
let (provider, _anvil, _ipc) = crate::spawn_anvil_ipc().await;
generic_unsubscribe_blocks_test(provider).await;
}

#[cfg(any(feature = "ws", feature = "ipc"))]
async fn generic_unsubscribe_blocks_test<M>(provider: M)
where
M: Middleware,
M::Provider: ethers_providers::PubsubClient,
{
{
let stream = provider.subscribe_blocks().await.unwrap();
stream.unsubscribe().await.unwrap();
}
{
let _stream = provider.subscribe_blocks().await.unwrap();
// stream will be unsubscribed automatically here on drop
}
// Sleep to give the unsubscription messages time to propagate
tokio::time::sleep(crate::Duration::from_millis(200)).await;
}

#[tokio::test]
async fn send_tx_http() {
let (provider, anvil) = spawn_anvil();
Expand Down

0 comments on commit 7e79824

Please sign in to comment.