From 9471bc38e1c0370e4abe960f598685c7b06a7c08 Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Mon, 22 Jul 2024 08:31:05 +0200 Subject: [PATCH] fix(ethexe): fixing of ethexe tests hang on (#4071) --- ethexe/cli/src/tests.rs | 19 +++++++++++++------ ethexe/observer/src/observer.rs | 8 ++++++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index c6b4e598266..039a330be20 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -34,7 +34,10 @@ use gear_core::ids::prelude::*; use gprimitives::{ActorId, CodeId, H256}; use std::{sync::Arc, time::Duration}; use tokio::{ - sync::mpsc::{self, Receiver}, + sync::{ + mpsc::{self, Receiver}, + oneshot, + }, task::{self, JoinHandle}, }; @@ -44,17 +47,21 @@ struct Listener { } impl Listener { - pub fn new(mut observer: Observer) -> Self { + pub async fn new(mut observer: Observer) -> Self { let (sender, receiver) = mpsc::channel::(1024); + let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); let _handle = task::spawn(async move { let observer_events = observer.events(); futures::pin_mut!(observer_events); + send_subscription_created.send(()).unwrap(); + while let Some(event) = observer_events.next().await { sender.send(event).await.unwrap(); } }); + receive_subscription_created.await.unwrap(); Self { receiver, _handle } } @@ -198,8 +205,8 @@ impl TestEnv { Ok(env) } - pub fn new_listener(&self) -> Listener { - Listener::new(self.observer.clone()) + pub async fn new_listener(&self) -> Listener { + Listener::new(self.observer.clone()).await } pub async fn upload_code(&self, code: &[u8]) -> Result<(H256, CodeId)> { @@ -220,11 +227,11 @@ impl TestEnv { async fn ping() { gear_utils::init_default_logger(); - let mut anvil = Anvil::new().block_time(1).try_spawn().unwrap(); + let mut anvil = Anvil::new().try_spawn().unwrap(); drop(anvil.child_mut().stdout.take()); //temp fix for alloy#1078 let mut env = TestEnv::new(anvil.ws_endpoint()).await.unwrap(); - let mut listener = env.new_listener(); + let mut listener = env.new_listener().await; let service = env.service.take().unwrap(); let service_handle = task::spawn(service.run()); diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 6f19e671e4a..400fc5a3cdb 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -215,7 +215,7 @@ mod tests { use alloy::node_bindings::Anvil; use ethexe_ethereum::Ethereum; use ethexe_signer::Signer; - use tokio::task; + use tokio::{sync::oneshot, task}; fn wat2wasm_with_validate(s: &str, validate: bool) -> Vec { wabt::Wat2Wasm::new() @@ -234,7 +234,7 @@ mod tests { async fn test_deployment() -> Result<()> { gear_utils::init_default_logger(); - let mut anvil = Anvil::new().block_time(1).try_spawn()?; + let mut anvil = Anvil::new().try_spawn()?; drop(anvil.child_mut().stdout.take()); //temp fix for alloy#1078 let ethereum_rpc = anvil.ws_endpoint(); @@ -253,6 +253,7 @@ mod tests { let router_address = ethereum.router().address(); let cloned_blob_reader = blob_reader.clone(); + let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); let handle = task::spawn(async move { let mut observer = Observer::new(ðereum_rpc, router_address, cloned_blob_reader) .await @@ -261,6 +262,8 @@ mod tests { let observer_events = observer.events(); futures::pin_mut!(observer_events); + send_subscription_created.send(()).unwrap(); + while let Some(event) = observer_events.next().await { if matches!(event, Event::CodeLoaded { .. }) { return Some(event); @@ -269,6 +272,7 @@ mod tests { None }); + receive_subscription_created.await.unwrap(); let wat = r#" (module