From b91820cf2004f8e0301352510f6655c0772bdacf Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 8 Apr 2020 17:13:46 +0200 Subject: [PATCH 1/9] Add waker map and channel to redux loop / instance --- crates/core/src/instance.rs | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/crates/core/src/instance.rs b/crates/core/src/instance.rs index e141a6ef55..b2e9fb650b 100644 --- a/crates/core/src/instance.rs +++ b/crates/core/src/instance.rs @@ -37,12 +37,19 @@ use std::{ thread, time::{Duration, Instant}, }; +use futures::task::Waker; +use std::collections::HashMap; pub const RECV_DEFAULT_TIMEOUT_MS: Duration = Duration::from_millis(10000); pub const RETRY_VALIDATION_DURATION_MIN: Duration = Duration::from_millis(500); pub const RETRY_VALIDATION_DURATION_MAX: Duration = Duration::from_secs(60 * 60); +pub enum WakerRequest { + Add(ProcessUniqueId, Waker), + Remove(ProcessUniqueId), +} + /// Object representing a Holochain instance, i.e. a running holochain (DNA + DHT + source-chain) /// Holds the Event loop and processes it with the redux pattern. #[derive(Clone)] @@ -51,6 +58,7 @@ pub struct Instance { state: Arc>, action_channel: Option, observer_channel: Option>, + waker_channel: Option>, scheduler_handle: Option>, persister: Option>>, consistency_model: ConsistencyModel, @@ -71,7 +79,7 @@ impl Instance { /// This is initializing and starting the redux action loop and adding channels to dispatch /// actions and observers to the context pub(in crate::instance) fn inner_setup(&mut self, context: Arc) -> Arc { - let (rx_action, rx_observer) = self.initialize_channels(); + let (rx_action, rx_observer, rx_waker) = self.initialize_channels(); let context = self.initialize_context(context); let mut scheduler = Scheduler::new(); scheduler @@ -89,7 +97,7 @@ impl Instance { self.persister = Some(context.persister.clone()); - self.start_action_loop(context.clone(), rx_action, rx_observer); + self.start_action_loop(context.clone(), rx_action, rx_observer, rx_waker); self.start_holding_loop(context.clone()); context @@ -160,13 +168,15 @@ impl Instance { } /// Returns recievers for actions and observers that get added to this instance - fn initialize_channels(&mut self) -> (ActionReceiver, Receiver) { + fn initialize_channels(&mut self) -> (ActionReceiver, Receiver, Receiver) { let (tx_action, rx_action) = unbounded::>(); let (tx_observer, rx_observer) = unbounded::(); + let (tx_waker, rx_waker) = unbounded::(); self.action_channel = Some(tx_action.into()); self.observer_channel = Some(tx_observer); + self.waker_channel = Some(tx_waker); - (rx_action, rx_observer) + (rx_action, rx_observer, rx_waker) } pub fn initialize_context(&self, context: Arc) -> Arc { @@ -174,6 +184,7 @@ impl Instance { sub_context.set_state(self.state.clone()); sub_context.action_channel = self.action_channel.clone(); sub_context.observer_channel = self.observer_channel.clone(); + sub_context.waker_channel = self.waker_channel.clone(); Arc::new(sub_context) } @@ -183,6 +194,7 @@ impl Instance { context: Arc, rx_action: ActionReceiver, rx_observer: Receiver, + rx_waker: Receiver, ) { self.stop_action_loop(); @@ -202,10 +214,18 @@ impl Instance { .spawn(move || { let mut state_observers: Vec = Vec::new(); let mut unprocessed_action: Option> = None; + let mut wakers: HashMap:: = HashMap::new(); while kill_receiver.try_recv().is_err() { if let Some(action_wrapper) = unprocessed_action.take().or_else(|| rx_action.recv_timeout(Duration::from_secs(1)).ok()) { // Add new observers state_observers.extend(rx_observer.try_iter()); + // Process waker requests + for waker_request in rx_waker.iter() { + match waker_request { + WakerRequest::Add(id, waker) => wakers.insert(id, waker), + WakerRequest::Remove(id) => wakers.remove(&id), + }; + } let action = action_wrapper.action(); // Ping can happen often, and should be as lightweight as possible let should_process = *action != Action::Ping; @@ -223,6 +243,10 @@ impl Instance { .into_iter() .filter(|observer| observer.ticker.send(()).is_ok()) .collect(); + // Tick all wakers + for waker in wakers.values() { + waker.clone().wake(); + } }, Err(HolochainError::Timeout(s)) => { warn!("Instance::process_action() couldn't get lock on state. Trying again next loop. Timeout string: {}", s); @@ -424,6 +448,7 @@ impl Instance { state: Arc::new(RwLock::new(StateWrapper::new(context.clone()))), action_channel: None, observer_channel: None, + waker_channel: None, scheduler_handle: None, persister: None, consistency_model: ConsistencyModel::new(context), @@ -437,6 +462,7 @@ impl Instance { state: Arc::new(RwLock::new(StateWrapper::from(state))), action_channel: None, observer_channel: None, + waker_channel: None, scheduler_handle: None, persister: None, consistency_model: ConsistencyModel::new(context), @@ -782,7 +808,7 @@ pub mod tests { let netname = Some("can_process_action"); let mut instance = Instance::new(test_context("jason", netname)); let context = instance.initialize_context(test_context("jane", netname)); - let (rx_action, rx_observer) = instance.initialize_channels(); + let (rx_action, rx_observer, _) = instance.initialize_channels(); let action_wrapper = test_action_wrapper_commit(); instance From 07aff927c2c32f0803cfd37529927e023f6788b7 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 8 Apr 2020 17:15:47 +0200 Subject: [PATCH 2/9] Add waker (un-)register functions to Context --- crates/core/src/context.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 81d00c4f28..661b859533 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -49,6 +49,9 @@ use std::{ #[cfg(test)] use test_utils::mock_signing::mock_conductor_api; +use crate::instance::WakerRequest; +use futures::task::Waker; +use snowflake::ProcessUniqueId; pub type ActionSender = ht::channel::SpanSender; pub type ActionReceiver = ht::channel::SpanReceiver; @@ -94,6 +97,7 @@ pub struct Context { state: Option>>, pub action_channel: Option, pub observer_channel: Option>, + pub waker_channel: Option>, pub chain_storage: Arc>, pub dht_storage: Arc>, pub eav_storage: Arc>>, @@ -159,6 +163,7 @@ impl Context { action_channel: None, signal_tx, observer_channel: None, + waker_channel: None, chain_storage, dht_storage, eav_storage: eav, @@ -199,6 +204,7 @@ impl Context { action_channel, signal_tx, observer_channel, + waker_channel: None, chain_storage: cas.clone(), dht_storage: cas, eav_storage: eav, @@ -350,6 +356,14 @@ impl Context { tick_rx } + pub fn register_waker(&self, future_id: ProcessUniqueId, waker: Waker) { + self.waker_channel.as_ref().map(|c| c.send(WakerRequest::Add(future_id, waker))); + } + + pub fn unregister_waker(&self, future_id: ProcessUniqueId) { + self.waker_channel.as_ref().map(|c| c.send(WakerRequest::Remove(future_id))); + } + /// Custom future executor that enables nested futures and nested calls of `block_on`. /// This makes use of the redux action loop and the observers. /// The given future gets polled everytime the instance's state got changed. From 6bd3e3790d8a089b94b87e71b9a8fc40917606c9 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 8 Apr 2020 17:25:24 +0200 Subject: [PATCH 3/9] Use redux-synced waker in 5 futures --- crates/core/src/agent/actions/commit.rs | 12 +++++++----- crates/core/src/dht/actions/hold_aspect.rs | 18 ++++++++++++------ .../src/dht/actions/queue_holding_workflow.rs | 18 ++++++++++++------ .../actions/remove_queued_holding_workflow.rs | 18 ++++++++++++------ .../network/actions/get_validation_package.rs | 13 +++++++------ 5 files changed, 50 insertions(+), 29 deletions(-) diff --git a/crates/core/src/agent/actions/commit.rs b/crates/core/src/agent/actions/commit.rs index 5560a9e636..21e1aa27f6 100644 --- a/crates/core/src/agent/actions/commit.rs +++ b/crates/core/src/agent/actions/commit.rs @@ -7,6 +7,7 @@ use crate::{ use futures::{future::Future, task::Poll}; use holochain_core_types::{entry::Entry, error::HolochainError}; use holochain_persistence_api::cas::content::Address; +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc}; /// Commit Action Creator @@ -26,9 +27,11 @@ pub async fn commit_entry( vec![], ))); dispatch_action(context.action_channel(), action_wrapper.clone()); + let id = ProcessUniqueId::new(); CommitFuture { context: context.clone(), action: action_wrapper, + id, } .await } @@ -38,6 +41,7 @@ pub async fn commit_entry( pub struct CommitFuture { context: Arc, action: ActionWrapper, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -48,11 +52,8 @@ impl Future for CommitFuture { if let Some(err) = self.context.action_channel_error("CommitFuture") { return Poll::Ready(Err(err)); } - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { match state.agent().actions().get(&self.action) { Some(r) => match r.response() { @@ -63,6 +64,7 @@ impl Future for CommitFuture { self.action.id().to_string(), )), ); + self.context.unregister_waker(self.id.clone()); Poll::Ready(result.clone()) } _ => unreachable!(), diff --git a/crates/core/src/dht/actions/hold_aspect.rs b/crates/core/src/dht/actions/hold_aspect.rs index d1f99d3fd6..6f39039992 100644 --- a/crates/core/src/dht/actions/hold_aspect.rs +++ b/crates/core/src/dht/actions/hold_aspect.rs @@ -5,17 +5,25 @@ use crate::{ }; use futures::{future::Future, task::Poll}; use holochain_core_types::{error::HolochainError, network::entry_aspect::EntryAspect}; +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc}; pub async fn hold_aspect(aspect: EntryAspect, context: Arc) -> Result<(), HolochainError> { let action_wrapper = ActionWrapper::new(Action::HoldAspect(aspect.clone())); dispatch_action(context.action_channel(), action_wrapper.clone()); - HoldAspectFuture { context, aspect }.await + let id = ProcessUniqueId::new(); + HoldAspectFuture { + context, + aspect, + id, + } + .await } pub struct HoldAspectFuture { context: Arc, aspect: EntryAspect, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -26,15 +34,13 @@ impl Future for HoldAspectFuture { if let Some(err) = self.context.action_channel_error("HoldAspectFuture") { return Poll::Ready(Err(err)); } - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { // TODO: wait for it to show up in the holding list // i.e. once we write the reducer we'll know if state.dht().get_holding_map().contains(&self.aspect) { + self.context.unregister_waker(self.id.clone()); Poll::Ready(Ok(())) } else { Poll::Pending diff --git a/crates/core/src/dht/actions/queue_holding_workflow.rs b/crates/core/src/dht/actions/queue_holding_workflow.rs index fee4b24cb2..cdf7e1fdc5 100644 --- a/crates/core/src/dht/actions/queue_holding_workflow.rs +++ b/crates/core/src/dht/actions/queue_holding_workflow.rs @@ -5,6 +5,7 @@ use crate::{ instance::dispatch_action, }; use futures::{future::Future, task::Poll}; +use snowflake::ProcessUniqueId; use std::{ pin::Pin, sync::Arc, @@ -37,7 +38,13 @@ pub async fn queue_holding_workflow( { log_trace!(context, "Queueing holding workflow: {:?}", pending); dispatch_queue_holding_workflow(pending.clone(), delay, context.clone()); - QueueHoldingWorkflowFuture { context, pending }.await + let id = ProcessUniqueId::new(); + QueueHoldingWorkflowFuture { + context, + pending, + id, + } + .await } else { log_trace!( context, @@ -50,6 +57,7 @@ pub async fn queue_holding_workflow( pub struct QueueHoldingWorkflowFuture { context: Arc, pending: PendingValidation, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -57,14 +65,12 @@ impl Future for QueueHoldingWorkflowFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone().into(), cx.waker().clone()); if let Some(state) = self.context.try_state() { if state.dht().has_exact_queued_holding_workflow(&self.pending) { + self.context.unregister_waker(self.id.clone().into()); Poll::Ready(()) } else { Poll::Pending diff --git a/crates/core/src/dht/actions/remove_queued_holding_workflow.rs b/crates/core/src/dht/actions/remove_queued_holding_workflow.rs index ec12774ce9..49096e0e3e 100644 --- a/crates/core/src/dht/actions/remove_queued_holding_workflow.rs +++ b/crates/core/src/dht/actions/remove_queued_holding_workflow.rs @@ -5,18 +5,26 @@ use crate::{ instance::dispatch_action, }; use futures::{future::Future, task::Poll}; +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc}; #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] pub async fn remove_queued_holding_workflow(pending: PendingValidation, context: Arc) { let action_wrapper = ActionWrapper::new(Action::RemoveQueuedHoldingWorkflow(pending.clone())); dispatch_action(context.action_channel(), action_wrapper.clone()); - RemoveQueuedHoldingWorkflowFuture { context, pending }.await + let id = ProcessUniqueId::new(); + RemoveQueuedHoldingWorkflowFuture { + context, + pending, + id, + } + .await } pub struct RemoveQueuedHoldingWorkflowFuture { context: Arc, pending: PendingValidation, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -24,16 +32,14 @@ impl Future for RemoveQueuedHoldingWorkflowFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { if state.dht().has_exact_queued_holding_workflow(&self.pending) { Poll::Pending } else { + self.context.unregister_waker(self.id.clone()); Poll::Ready(()) } } else { diff --git a/crates/core/src/network/actions/get_validation_package.rs b/crates/core/src/network/actions/get_validation_package.rs index 76f064c7c3..ad5a140ba6 100644 --- a/crates/core/src/network/actions/get_validation_package.rs +++ b/crates/core/src/network/actions/get_validation_package.rs @@ -8,7 +8,7 @@ use futures::{future::Future, task::Poll}; use holochain_core_types::{ chain_header::ChainHeader, error::HcResult, validation::ValidationPackage, }; - +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc}; /// GetValidationPackage Action Creator @@ -29,9 +29,11 @@ pub async fn get_validation_package( }; let action_wrapper = ActionWrapper::new(Action::GetValidationPackage((key.clone(), header))); dispatch_action(context.action_channel(), action_wrapper.clone()); + let id = ProcessUniqueId::new(); GetValidationPackageFuture { context: context.clone(), key, + id, } .await } @@ -42,6 +44,7 @@ pub async fn get_validation_package( pub struct GetValidationPackageFuture { context: Arc, key: ValidationKey, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -56,11 +59,8 @@ impl Future for GetValidationPackageFuture { return Poll::Ready(Err(err)); } - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { let state = state.network(); @@ -74,6 +74,7 @@ impl Future for GetValidationPackageFuture { self.context.action_channel(), ActionWrapper::new(Action::ClearValidationPackageResult(self.key.clone())), ); + self.context.unregister_waker(self.id.clone()); Poll::Ready(result.clone()) } _ => Poll::Pending, From d12a719946bdd895a524773d80892246c81579a9 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 8 Apr 2020 17:25:42 +0200 Subject: [PATCH 4/9] rustfmt --- crates/core/src/context.rs | 12 ++++++++---- crates/core/src/instance.rs | 8 +++++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 661b859533..ed5b170823 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -47,11 +47,11 @@ use std::{ time::Duration, }; -#[cfg(test)] -use test_utils::mock_signing::mock_conductor_api; use crate::instance::WakerRequest; use futures::task::Waker; use snowflake::ProcessUniqueId; +#[cfg(test)] +use test_utils::mock_signing::mock_conductor_api; pub type ActionSender = ht::channel::SpanSender; pub type ActionReceiver = ht::channel::SpanReceiver; @@ -357,11 +357,15 @@ impl Context { } pub fn register_waker(&self, future_id: ProcessUniqueId, waker: Waker) { - self.waker_channel.as_ref().map(|c| c.send(WakerRequest::Add(future_id, waker))); + self.waker_channel + .as_ref() + .map(|c| c.send(WakerRequest::Add(future_id, waker))); } pub fn unregister_waker(&self, future_id: ProcessUniqueId) { - self.waker_channel.as_ref().map(|c| c.send(WakerRequest::Remove(future_id))); + self.waker_channel + .as_ref() + .map(|c| c.send(WakerRequest::Remove(future_id))); } /// Custom future executor that enables nested futures and nested calls of `block_on`. diff --git a/crates/core/src/instance.rs b/crates/core/src/instance.rs index b2e9fb650b..ead0914747 100644 --- a/crates/core/src/instance.rs +++ b/crates/core/src/instance.rs @@ -20,6 +20,7 @@ use crate::{ }; use clokwerk::{ScheduleHandle, Scheduler, TimeUnits}; use crossbeam_channel::{unbounded, Receiver, Sender}; +use futures::task::Waker; use holochain_core_types::{ dna::Dna, error::{HcResult, HolochainError}, @@ -30,6 +31,7 @@ use holochain_persistence_api::cas::content::Address; use holochain_tracing::{self as ht, channel::lax_send_wrapped}; use snowflake::ProcessUniqueId; use std::{ + collections::HashMap, sync::{ atomic::Ordering::{self, Relaxed}, Arc, @@ -37,8 +39,6 @@ use std::{ thread, time::{Duration, Instant}, }; -use futures::task::Waker; -use std::collections::HashMap; pub const RECV_DEFAULT_TIMEOUT_MS: Duration = Duration::from_millis(10000); @@ -168,7 +168,9 @@ impl Instance { } /// Returns recievers for actions and observers that get added to this instance - fn initialize_channels(&mut self) -> (ActionReceiver, Receiver, Receiver) { + fn initialize_channels( + &mut self, + ) -> (ActionReceiver, Receiver, Receiver) { let (tx_action, rx_action) = unbounded::>(); let (tx_observer, rx_observer) = unbounded::(); let (tx_waker, rx_waker) = unbounded::(); From 3772d8d6d10714b254a4ee6412432ed7251e8a1b Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 8 Apr 2020 17:27:20 +0200 Subject: [PATCH 5/9] Remove unnecessary .into() --- crates/core/src/dht/actions/queue_holding_workflow.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/dht/actions/queue_holding_workflow.rs b/crates/core/src/dht/actions/queue_holding_workflow.rs index cdf7e1fdc5..ae358c1b08 100644 --- a/crates/core/src/dht/actions/queue_holding_workflow.rs +++ b/crates/core/src/dht/actions/queue_holding_workflow.rs @@ -66,11 +66,11 @@ impl Future for QueueHoldingWorkflowFuture { fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { self.context - .register_waker(self.id.clone().into(), cx.waker().clone()); + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { if state.dht().has_exact_queued_holding_workflow(&self.pending) { - self.context.unregister_waker(self.id.clone().into()); + self.context.unregister_waker(self.id.clone()); Poll::Ready(()) } else { Poll::Pending From deacc5a52da8875b68f76826eeb38681685f1f62 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 8 Apr 2020 20:01:21 +0200 Subject: [PATCH 6/9] Switch to non-blocking Receiver::try_iter() --- crates/core/src/instance.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/instance.rs b/crates/core/src/instance.rs index ead0914747..fd9b9b4c37 100644 --- a/crates/core/src/instance.rs +++ b/crates/core/src/instance.rs @@ -222,7 +222,7 @@ impl Instance { // Add new observers state_observers.extend(rx_observer.try_iter()); // Process waker requests - for waker_request in rx_waker.iter() { + for waker_request in rx_waker.try_iter() { match waker_request { WakerRequest::Add(id, waker) => wakers.insert(id, waker), WakerRequest::Remove(id) => wakers.remove(&id), From 5fa92fc01b10b84cc1fe215f55075fc148304a8f Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 8 Apr 2020 20:41:23 +0200 Subject: [PATCH 7/9] Sync waker of all remaining futures --- .../core/src/network/actions/custom_send.rs | 12 ++++---- .../src/network/actions/initialize_network.rs | 12 +++++--- crates/core/src/network/actions/publish.rs | 12 ++++---- .../network/actions/publish_header_entry.rs | 12 ++++---- crates/core/src/network/actions/query.rs | 12 ++++---- .../src/nucleus/actions/call_zome_function.rs | 12 ++++---- crates/core/src/nucleus/actions/initialize.rs | 30 ++++++++++++------- 7 files changed, 62 insertions(+), 40 deletions(-) diff --git a/crates/core/src/network/actions/custom_send.rs b/crates/core/src/network/actions/custom_send.rs index 8e8733e584..969c96c0a1 100644 --- a/crates/core/src/network/actions/custom_send.rs +++ b/crates/core/src/network/actions/custom_send.rs @@ -7,6 +7,7 @@ use crate::{ use futures::{future::Future, task::Poll}; use holochain_core_types::{error::HolochainError, time::Timeout}; use holochain_persistence_api::cas::content::Address; +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc, time::SystemTime}; /// SendDirectMessage Action Creator for custom (=app) messages @@ -34,9 +35,11 @@ pub async fn custom_send( ))); dispatch_action(context.action_channel(), action_wrapper); + let future_id = ProcessUniqueId::new(); SendResponseFuture { context: context.clone(), id, + future_id, } .await } @@ -45,6 +48,7 @@ pub async fn custom_send( pub struct SendResponseFuture { context: Arc, id: String, + future_id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -56,11 +60,8 @@ impl Future for SendResponseFuture { return Poll::Ready(Err(err)); } - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.future_id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { let state = state.network(); @@ -73,6 +74,7 @@ impl Future for SendResponseFuture { self.context.action_channel(), ActionWrapper::new(Action::ClearCustomSendResponse(self.id.clone())), ); + self.context.unregister_waker(self.future_id.clone()); Poll::Ready(result.clone()) } _ => Poll::Pending, diff --git a/crates/core/src/network/actions/initialize_network.rs b/crates/core/src/network/actions/initialize_network.rs index 48a012ec59..28d21873c4 100644 --- a/crates/core/src/network/actions/initialize_network.rs +++ b/crates/core/src/network/actions/initialize_network.rs @@ -8,6 +8,7 @@ use futures::{task::Poll, Future}; use holochain_core_types::error::HcResult; #[cfg(test)] use holochain_persistence_api::cas::content::Address; +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc}; /// Creates a network proxy object and stores DNA and agent hash in the network state. @@ -26,8 +27,10 @@ pub async fn initialize_network(context: &Arc) -> HcResult<()> { dispatch_action(context.action_channel(), action_wrapper.clone()); log_debug!(context, "waiting for network"); + let id = ProcessUniqueId::new(); InitNetworkFuture { context: context.clone(), + id, } .await?; @@ -58,6 +61,7 @@ pub async fn initialize_network_with_spoofed_dna( pub struct InitNetworkFuture { context: Arc, + id: ProcessUniqueId, } impl Future for InitNetworkFuture { @@ -69,15 +73,15 @@ impl Future for InitNetworkFuture { } // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); + if let Some(state) = self.context.try_state() { if state.network().network.is_some() && state.network().dna_address.is_some() && state.network().agent_id.is_some() { + self.context.unregister_waker(self.id.clone()); Poll::Ready(Ok(())) } else { Poll::Pending diff --git a/crates/core/src/network/actions/publish.rs b/crates/core/src/network/actions/publish.rs index 4f7236c1fa..152111445c 100644 --- a/crates/core/src/network/actions/publish.rs +++ b/crates/core/src/network/actions/publish.rs @@ -7,6 +7,7 @@ use crate::{ use futures::{future::Future, task::Poll}; use holochain_core_types::error::HcResult; use holochain_persistence_api::cas::content::Address; +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc}; /// Publish Action Creator @@ -18,9 +19,11 @@ use std::{pin::Pin, sync::Arc}; pub async fn publish(address: Address, context: &Arc) -> HcResult
{ let action_wrapper = ActionWrapper::new(Action::Publish(address)); dispatch_action(context.action_channel(), action_wrapper.clone()); + let id = ProcessUniqueId::new(); PublishFuture { context: context.clone(), action: action_wrapper, + id, } .await } @@ -30,6 +33,7 @@ pub async fn publish(address: Address, context: &Arc) -> HcResult, action: ActionWrapper, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -41,11 +45,8 @@ impl Future for PublishFuture { return Poll::Ready(Err(err)); } - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { let state = state.network(); @@ -62,6 +63,7 @@ impl Future for PublishFuture { self.action.id().to_string(), )), ); + self.context.unregister_waker(self.id.clone()); Poll::Ready(result.clone()) } _ => unreachable!(), diff --git a/crates/core/src/network/actions/publish_header_entry.rs b/crates/core/src/network/actions/publish_header_entry.rs index 2493d760fb..073fcfbf9e 100644 --- a/crates/core/src/network/actions/publish_header_entry.rs +++ b/crates/core/src/network/actions/publish_header_entry.rs @@ -7,6 +7,7 @@ use crate::{ use futures::{future::Future, task::Poll}; use holochain_core_types::error::HcResult; use holochain_persistence_api::cas::content::Address; +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc}; /// Publish Header Entry Action Creator @@ -15,9 +16,11 @@ use std::{pin::Pin, sync::Arc}; pub async fn publish_header_entry(address: Address, context: &Arc) -> HcResult
{ let action_wrapper = ActionWrapper::new(Action::PublishHeaderEntry(address)); dispatch_action(context.action_channel(), action_wrapper.clone()); + let id = ProcessUniqueId::new(); PublishHeaderEntryFuture { context: context.clone(), action: action_wrapper, + id, } .await } @@ -27,6 +30,7 @@ pub async fn publish_header_entry(address: Address, context: &Arc) -> H pub struct PublishHeaderEntryFuture { context: Arc, action: ActionWrapper, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -41,11 +45,8 @@ impl Future for PublishHeaderEntryFuture { return Poll::Ready(Err(err)); } - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { let state = state.network(); @@ -61,6 +62,7 @@ impl Future for PublishHeaderEntryFuture { self.action.id().to_string(), )), ); + self.context.unregister_waker(self.id.clone()); Poll::Ready(result.clone()) } _ => unreachable!(), diff --git a/crates/core/src/network/actions/query.rs b/crates/core/src/network/actions/query.rs index e7ecdce264..86834f992b 100644 --- a/crates/core/src/network/actions/query.rs +++ b/crates/core/src/network/actions/query.rs @@ -13,6 +13,7 @@ use holochain_core_types::{crud_status::CrudStatus, error::HcResult, time::Timeo use std::{pin::Pin, sync::Arc}; use holochain_wasm_utils::api_serialization::get_links::{GetLinksArgs, LinksStatusRequestKind}; +use snowflake::ProcessUniqueId; use std::time::SystemTime; /// FetchEntry Action Creator @@ -67,9 +68,11 @@ pub async fn query( )); let action_wrapper = ActionWrapper::new(entry); dispatch_action(context.action_channel(), action_wrapper.clone()); + let id = ProcessUniqueId::new(); QueryFuture { context: context.clone(), key: key.clone(), + id, } .await } @@ -79,6 +82,7 @@ pub async fn query( pub struct QueryFuture { context: Arc, key: QueryKey, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -90,11 +94,8 @@ impl Future for QueryFuture { return Poll::Ready(Err(err)); } - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { if let Err(error) = state.network().initialized() { @@ -106,6 +107,7 @@ impl Future for QueryFuture { self.context.action_channel(), ActionWrapper::new(Action::ClearQueryResult(self.key.clone())), ); + self.context.unregister_waker(self.id.clone()); Poll::Ready(result.clone()) } _ => Poll::Pending, diff --git a/crates/core/src/nucleus/actions/call_zome_function.rs b/crates/core/src/nucleus/actions/call_zome_function.rs index 187b4e5a72..c18a661b00 100644 --- a/crates/core/src/nucleus/actions/call_zome_function.rs +++ b/crates/core/src/nucleus/actions/call_zome_function.rs @@ -24,6 +24,7 @@ use crate::instance::dispatch_action; use base64; use futures::{future::Future, task::Poll}; use holochain_wasm_utils::api_serialization::crypto::CryptoMethod; +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc}; #[derive(Clone, Debug, PartialEq, Hash, Serialize)] @@ -98,10 +99,12 @@ pub async fn call_zome_function( zome_call ); + let id = ProcessUniqueId::new(); CallResultFuture { context: context.clone(), zome_call, call_spawned: false, + id, } .await } @@ -324,6 +327,7 @@ pub struct CallResultFuture { context: Arc, zome_call: ZomeFnCall, call_spawned: bool, + id: ProcessUniqueId, } impl Unpin for CallResultFuture {} @@ -335,11 +339,8 @@ impl Future for CallResultFuture { if let Some(err) = self.context.action_channel_error("CallResultFuture") { return Poll::Ready(Err(err)); } - // With our own executor implementation in Context::block_on we actually - // wouldn't need the waker since this executor is attached to the redux loop - // and re-polls after every State mutation. - // Leaving this in to be safe against running this future in another executor. - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.clone().try_state() { if self.call_spawned { @@ -351,6 +352,7 @@ impl Future for CallResultFuture { self.zome_call.clone(), )), ); + self.context.unregister_waker(self.id.clone()); Poll::Ready(result) } None => Poll::Pending, diff --git a/crates/core/src/nucleus/actions/initialize.rs b/crates/core/src/nucleus/actions/initialize.rs index 3b9a0c50af..763bdebf83 100644 --- a/crates/core/src/nucleus/actions/initialize.rs +++ b/crates/core/src/nucleus/actions/initialize.rs @@ -16,6 +16,7 @@ use holochain_core_types::{ use holochain_persistence_api::cas::content::Address; use crate::instance::dispatch_action; +use snowflake::ProcessUniqueId; use std::{pin::Pin, sync::Arc, time::*}; /// Initialization is the value returned by successful initialization of a DNA instance @@ -64,8 +65,10 @@ pub async fn initialize_chain( let action_wrapper = ActionWrapper::new(Action::InitializeChain(dna.clone())); dispatch_action(context.action_channel(), action_wrapper.clone()); + let id = ProcessUniqueId::new(); let _ = InitializingFuture { context: context.clone(), + id, } .await; @@ -179,9 +182,11 @@ pub async fn initialize_chain( ))) .expect("Action channel not usable in initialize_chain()"); + let id = ProcessUniqueId::new(); InitializationFuture { context: context.clone(), created_at: Instant::now(), + id, } .await } @@ -189,6 +194,7 @@ pub async fn initialize_chain( /// Tracks if the initialization has started and the DNA is set in the nucleus. pub struct InitializingFuture { context: Arc, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -199,20 +205,22 @@ impl Future for InitializingFuture { if let Some(err) = self.context.action_channel_error("InitializingFuture") { return Poll::Ready(Err(err)); } - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if let Some(state) = self.context.try_state() { match state.nucleus().status { NucleusStatus::New => Poll::Pending, - NucleusStatus::Initializing => Poll::Ready(Ok(NucleusStatus::Initializing)), + NucleusStatus::Initializing => { + self.context.unregister_waker(self.id.clone()); + Poll::Ready(Ok(NucleusStatus::Initializing)) + } NucleusStatus::Initialized(ref init) => { + self.context.unregister_waker(self.id.clone()); Poll::Ready(Ok(NucleusStatus::Initialized(init.clone()))) } NucleusStatus::InitializationFailed(ref error) => { + self.context.unregister_waker(self.id.clone()); Poll::Ready(Err(HolochainError::ErrorGeneric(error.clone()))) } } @@ -227,6 +235,7 @@ impl Future for InitializingFuture { pub struct InitializationFuture { context: Arc, created_at: Instant, + id: ProcessUniqueId, } #[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)] @@ -237,11 +246,8 @@ impl Future for InitializationFuture { if let Some(err) = self.context.action_channel_error("InitializationFuture") { return Poll::Ready(Err(err)); } - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // - cx.waker().clone().wake(); + self.context + .register_waker(self.id.clone(), cx.waker().clone()); if Instant::now().duration_since(self.created_at) > Duration::from_secs(INITIALIZATION_TIMEOUT) @@ -255,9 +261,11 @@ impl Future for InitializationFuture { NucleusStatus::New => Poll::Pending, NucleusStatus::Initializing => Poll::Pending, NucleusStatus::Initialized(ref init) => { + self.context.unregister_waker(self.id.clone()); Poll::Ready(Ok(NucleusStatus::Initialized(init.clone()))) } NucleusStatus::InitializationFailed(ref error) => { + self.context.unregister_waker(self.id.clone()); Poll::Ready(Err(HolochainError::ErrorGeneric(error.clone()))) } } From 2fbfb94e0bfcc1345a4c3a788fa7612322045f11 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 8 Apr 2020 20:42:07 +0200 Subject: [PATCH 8/9] Remove TODO comment from network::shutdown future --- crates/core/src/network/actions/shutdown.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/core/src/network/actions/shutdown.rs b/crates/core/src/network/actions/shutdown.rs index b09459955d..7bb54c26b8 100644 --- a/crates/core/src/network/actions/shutdown.rs +++ b/crates/core/src/network/actions/shutdown.rs @@ -38,10 +38,6 @@ impl Future for ShutdownFuture { type Output = HcResult<()>; fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { - // - // TODO: connect the waker to state updates for performance reasons - // See: https://github.com/holochain/holochain-rust/issues/314 - // cx.waker().clone().wake(); self.state .try_read() From 100f051135520ed33c79398874731224c298253c Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 8 Apr 2020 20:43:55 +0200 Subject: [PATCH 9/9] Fix missing id --- crates/core/src/network/actions/initialize_network.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/network/actions/initialize_network.rs b/crates/core/src/network/actions/initialize_network.rs index 28d21873c4..24b72b8116 100644 --- a/crates/core/src/network/actions/initialize_network.rs +++ b/crates/core/src/network/actions/initialize_network.rs @@ -53,8 +53,10 @@ pub async fn initialize_network_with_spoofed_dna( let action_wrapper = ActionWrapper::new(Action::InitNetwork(network_settings)); dispatch_action(context.action_channel(), action_wrapper.clone()); + let id = ProcessUniqueId::new(); InitNetworkFuture { context: context.clone(), + id, } .await } @@ -71,7 +73,6 @@ impl Future for InitNetworkFuture { if let Some(err) = self.context.action_channel_error("InitializeNetworkFuture") { return Poll::Ready(Err(err)); } - // self.context .register_waker(self.id.clone(), cx.waker().clone());