diff --git a/Cargo.lock b/Cargo.lock index 6e7390fb8b..50c6736496 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1291,6 +1291,7 @@ dependencies = [ "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "test_utils 0.0.49-alpha1", "tiny_http 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "ws 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/app_spec/test/files/test.js b/app_spec/test/files/test.js index 23cf1f7a46..733118e1f5 100644 --- a/app_spec/test/files/test.js +++ b/app_spec/test/files/test.js @@ -13,4 +13,20 @@ module.exports = scenario => { const result = await alice.call('app', 'blog', 'ping', params) t.deepEqual(result, { Ok: { msg_type: 'response', body: `got hello from ${alice.info('app').agentAddress}` } }) }) + + scenario('multiple zome calls', async (s, t) => { + const { alice, bob } = await s.players({ alice: one, bob: one }, true) + const params = { to_agent: bob.info('app').agentAddress, message: 'hello' } + + // shut down bob so ping to bob will timeout to complete + await bob.kill() + let results = [] + const f1 = alice.call('app', 'blog', 'ping', params).then(r => {results.push(2)}) + const f2 = alice.call('app',"blog", "get_test_properties", {}).then(r => {results.push(1)}) + await Promise.all([f1,f2]) + + // prove that show_env returned before ping + t.deepEqual(results,[1,2]) + + }) } diff --git a/app_spec/zomes/blog/code/src/blog.rs b/app_spec/zomes/blog/code/src/blog.rs index 7303532813..f1728ee610 100644 --- a/app_spec/zomes/blog/code/src/blog.rs +++ b/app_spec/zomes/blog/code/src/blog.rs @@ -97,7 +97,7 @@ pub fn handle_ping(to_agent: Address, message: String) -> ZomeApiResult, ) -> Result<(Broadcaster, thread::JoinHandle<()>), String> { let url = format!("0.0.0.0:{}", self.port); - + let runtime = Runtime::new().map_err(|e| e.to_string())?; let server = ServerBuilder::new(handler) + .event_loop_executor(runtime.executor()) .start_http(&url.parse().expect("Invalid URL!")) .map_err(|e| e.to_string())?; self.bound_address = Some(*server.address()); @@ -40,6 +42,7 @@ impl Interface for HttpInterface { .name(format!("http_interface/{}", url)) .spawn(move || { let _ = server; // move `server` into this thread + let _ = runtime; // move tokio runtime for RPC futures into this thread let _ = kill_switch.recv(); }) .expect("Could not spawn thread for HTTP interface"); diff --git a/crates/conductor_lib/src/interface_impls/websocket.rs b/crates/conductor_lib/src/interface_impls/websocket.rs index 75a544bcda..1a97976aac 100644 --- a/crates/conductor_lib/src/interface_impls/websocket.rs +++ b/crates/conductor_lib/src/interface_impls/websocket.rs @@ -3,6 +3,7 @@ use crossbeam_channel::Receiver; use jsonrpc_core::IoHandler; use jsonrpc_ws_server::ServerBuilder; use std::{net::SocketAddr, thread}; +use tokio::runtime::Runtime; pub struct WebsocketInterface { port: u16, @@ -30,7 +31,9 @@ impl Interface for WebsocketInterface { kill_switch: Receiver<()>, ) -> Result<(Broadcaster, thread::JoinHandle<()>), String> { let url = format!("0.0.0.0:{}", self.port); + let runtime = Runtime::new().map_err(|e| e.to_string())?; let server = ServerBuilder::new(handler) + .event_loop_executor(runtime.executor()) .start(&url.parse().expect("Invalid URL!")) .map_err(|e| e.to_string())?; self.bound_address = Some(*server.addr()); @@ -39,6 +42,7 @@ impl Interface for WebsocketInterface { .name(format!("websocket_interface/{}", url)) .spawn(move || { let _ = server; // move `server` into this thread + let _ = runtime; // move tokio runtime for RPC futures into this thread let _ = kill_switch.recv(); }) .expect("Could not spawn thread for websocket interface");