Skip to content
This repository has been archived by the owner on Feb 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2175 from holochain/future-wake-on-state-change
Browse files Browse the repository at this point in the history
Redux synced Future Wakers
  • Loading branch information
zippy authored Apr 8, 2020
2 parents 64b3a2e + 100f051 commit 420fe08
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 79 deletions.
12 changes: 7 additions & 5 deletions crates/core/src/agent/actions/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
use futures::{future::Future, task::Poll};
use holochain_core_types::{entry::Entry, error::HolochainError};
use holochain_persistence_api::cas::content::Address;
use snowflake::ProcessUniqueId;
use std::{pin::Pin, sync::Arc};

/// Commit Action Creator
Expand All @@ -26,9 +27,11 @@ pub async fn commit_entry(
vec![],
)));
dispatch_action(context.action_channel(), action_wrapper.clone());
let id = ProcessUniqueId::new();
CommitFuture {
context: context.clone(),
action: action_wrapper,
id,
}
.await
}
Expand All @@ -38,6 +41,7 @@ pub async fn commit_entry(
pub struct CommitFuture {
context: Arc<Context>,
action: ActionWrapper,
id: ProcessUniqueId,
}

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
Expand All @@ -48,11 +52,8 @@ impl Future for CommitFuture {
if let Some(err) = self.context.action_channel_error("CommitFuture") {
return Poll::Ready(Err(err));
}
//
// TODO: connect the waker to state updates for performance reasons
// See: https://github.com/holochain/holochain-rust/issues/314
//
cx.waker().clone().wake();
self.context
.register_waker(self.id.clone(), cx.waker().clone());
if let Some(state) = self.context.try_state() {
match state.agent().actions().get(&self.action) {
Some(r) => match r.response() {
Expand All @@ -63,6 +64,7 @@ impl Future for CommitFuture {
self.action.id().to_string(),
)),
);
self.context.unregister_waker(self.id.clone());
Poll::Ready(result.clone())
}
_ => unreachable!(),
Expand Down
18 changes: 18 additions & 0 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ use std::{
time::Duration,
};

use crate::instance::WakerRequest;
use futures::task::Waker;
use snowflake::ProcessUniqueId;
#[cfg(test)]
use test_utils::mock_signing::mock_conductor_api;

Expand Down Expand Up @@ -94,6 +97,7 @@ pub struct Context {
state: Option<Arc<RwLock<StateWrapper>>>,
pub action_channel: Option<ActionSender>,
pub observer_channel: Option<Sender<Observer>>,
pub waker_channel: Option<Sender<WakerRequest>>,
pub chain_storage: Arc<RwLock<dyn ContentAddressableStorage>>,
pub dht_storage: Arc<RwLock<dyn ContentAddressableStorage>>,
pub eav_storage: Arc<RwLock<dyn EntityAttributeValueStorage<Attribute>>>,
Expand Down Expand Up @@ -159,6 +163,7 @@ impl Context {
action_channel: None,
signal_tx,
observer_channel: None,
waker_channel: None,
chain_storage,
dht_storage,
eav_storage: eav,
Expand Down Expand Up @@ -199,6 +204,7 @@ impl Context {
action_channel,
signal_tx,
observer_channel,
waker_channel: None,
chain_storage: cas.clone(),
dht_storage: cas,
eav_storage: eav,
Expand Down Expand Up @@ -350,6 +356,18 @@ impl Context {
tick_rx
}

pub fn register_waker(&self, future_id: ProcessUniqueId, waker: Waker) {
self.waker_channel
.as_ref()
.map(|c| c.send(WakerRequest::Add(future_id, waker)));
}

pub fn unregister_waker(&self, future_id: ProcessUniqueId) {
self.waker_channel
.as_ref()
.map(|c| c.send(WakerRequest::Remove(future_id)));
}

/// Custom future executor that enables nested futures and nested calls of `block_on`.
/// This makes use of the redux action loop and the observers.
/// The given future gets polled everytime the instance's state got changed.
Expand Down
18 changes: 12 additions & 6 deletions crates/core/src/dht/actions/hold_aspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,25 @@ use crate::{
};
use futures::{future::Future, task::Poll};
use holochain_core_types::{error::HolochainError, network::entry_aspect::EntryAspect};
use snowflake::ProcessUniqueId;
use std::{pin::Pin, sync::Arc};

pub async fn hold_aspect(aspect: EntryAspect, context: Arc<Context>) -> Result<(), HolochainError> {
let action_wrapper = ActionWrapper::new(Action::HoldAspect(aspect.clone()));
dispatch_action(context.action_channel(), action_wrapper.clone());
HoldAspectFuture { context, aspect }.await
let id = ProcessUniqueId::new();
HoldAspectFuture {
context,
aspect,
id,
}
.await
}

pub struct HoldAspectFuture {
context: Arc<Context>,
aspect: EntryAspect,
id: ProcessUniqueId,
}

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
Expand All @@ -26,15 +34,13 @@ impl Future for HoldAspectFuture {
if let Some(err) = self.context.action_channel_error("HoldAspectFuture") {
return Poll::Ready(Err(err));
}
//
// TODO: connect the waker to state updates for performance reasons
// See: https://github.com/holochain/holochain-rust/issues/314
//
cx.waker().clone().wake();
self.context
.register_waker(self.id.clone(), cx.waker().clone());
if let Some(state) = self.context.try_state() {
// TODO: wait for it to show up in the holding list
// i.e. once we write the reducer we'll know
if state.dht().get_holding_map().contains(&self.aspect) {
self.context.unregister_waker(self.id.clone());
Poll::Ready(Ok(()))
} else {
Poll::Pending
Expand Down
18 changes: 12 additions & 6 deletions crates/core/src/dht/actions/queue_holding_workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
instance::dispatch_action,
};
use futures::{future::Future, task::Poll};
use snowflake::ProcessUniqueId;
use std::{
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -37,7 +38,13 @@ pub async fn queue_holding_workflow(
{
log_trace!(context, "Queueing holding workflow: {:?}", pending);
dispatch_queue_holding_workflow(pending.clone(), delay, context.clone());
QueueHoldingWorkflowFuture { context, pending }.await
let id = ProcessUniqueId::new();
QueueHoldingWorkflowFuture {
context,
pending,
id,
}
.await
} else {
log_trace!(
context,
Expand All @@ -50,21 +57,20 @@ pub async fn queue_holding_workflow(
pub struct QueueHoldingWorkflowFuture {
context: Arc<Context>,
pending: PendingValidation,
id: ProcessUniqueId,
}

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
impl Future for QueueHoldingWorkflowFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
//
// TODO: connect the waker to state updates for performance reasons
// See: https://github.com/holochain/holochain-rust/issues/314
//
cx.waker().clone().wake();
self.context
.register_waker(self.id.clone(), cx.waker().clone());

if let Some(state) = self.context.try_state() {
if state.dht().has_exact_queued_holding_workflow(&self.pending) {
self.context.unregister_waker(self.id.clone());
Poll::Ready(())
} else {
Poll::Pending
Expand Down
18 changes: 12 additions & 6 deletions crates/core/src/dht/actions/remove_queued_holding_workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,41 @@ use crate::{
instance::dispatch_action,
};
use futures::{future::Future, task::Poll};
use snowflake::ProcessUniqueId;
use std::{pin::Pin, sync::Arc};

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub async fn remove_queued_holding_workflow(pending: PendingValidation, context: Arc<Context>) {
let action_wrapper = ActionWrapper::new(Action::RemoveQueuedHoldingWorkflow(pending.clone()));
dispatch_action(context.action_channel(), action_wrapper.clone());
RemoveQueuedHoldingWorkflowFuture { context, pending }.await
let id = ProcessUniqueId::new();
RemoveQueuedHoldingWorkflowFuture {
context,
pending,
id,
}
.await
}

pub struct RemoveQueuedHoldingWorkflowFuture {
context: Arc<Context>,
pending: PendingValidation,
id: ProcessUniqueId,
}

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
impl Future for RemoveQueuedHoldingWorkflowFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
//
// TODO: connect the waker to state updates for performance reasons
// See: https://github.com/holochain/holochain-rust/issues/314
//
cx.waker().clone().wake();
self.context
.register_waker(self.id.clone(), cx.waker().clone());

if let Some(state) = self.context.try_state() {
if state.dht().has_exact_queued_holding_workflow(&self.pending) {
Poll::Pending
} else {
self.context.unregister_waker(self.id.clone());
Poll::Ready(())
}
} else {
Expand Down
38 changes: 33 additions & 5 deletions crates/core/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
};
use clokwerk::{ScheduleHandle, Scheduler, TimeUnits};
use crossbeam_channel::{unbounded, Receiver, Sender};
use futures::task::Waker;
use holochain_core_types::{
dna::Dna,
error::{HcResult, HolochainError},
Expand All @@ -30,6 +31,7 @@ use holochain_persistence_api::cas::content::Address;
use holochain_tracing::{self as ht, channel::lax_send_wrapped};
use snowflake::ProcessUniqueId;
use std::{
collections::HashMap,
sync::{
atomic::Ordering::{self, Relaxed},
Arc,
Expand All @@ -43,6 +45,11 @@ pub const RECV_DEFAULT_TIMEOUT_MS: Duration = Duration::from_millis(10000);
pub const RETRY_VALIDATION_DURATION_MIN: Duration = Duration::from_millis(500);
pub const RETRY_VALIDATION_DURATION_MAX: Duration = Duration::from_secs(60 * 60);

pub enum WakerRequest {
Add(ProcessUniqueId, Waker),
Remove(ProcessUniqueId),
}

/// Object representing a Holochain instance, i.e. a running holochain (DNA + DHT + source-chain)
/// Holds the Event loop and processes it with the redux pattern.
#[derive(Clone)]
Expand All @@ -51,6 +58,7 @@ pub struct Instance {
state: Arc<RwLock<StateWrapper>>,
action_channel: Option<ActionSender>,
observer_channel: Option<Sender<Observer>>,
waker_channel: Option<Sender<WakerRequest>>,
scheduler_handle: Option<Arc<ScheduleHandle>>,
persister: Option<Arc<RwLock<dyn Persister>>>,
consistency_model: ConsistencyModel,
Expand All @@ -71,7 +79,7 @@ impl Instance {
/// This is initializing and starting the redux action loop and adding channels to dispatch
/// actions and observers to the context
pub(in crate::instance) fn inner_setup(&mut self, context: Arc<Context>) -> Arc<Context> {
let (rx_action, rx_observer) = self.initialize_channels();
let (rx_action, rx_observer, rx_waker) = self.initialize_channels();
let context = self.initialize_context(context);
let mut scheduler = Scheduler::new();
scheduler
Expand All @@ -89,7 +97,7 @@ impl Instance {

self.persister = Some(context.persister.clone());

self.start_action_loop(context.clone(), rx_action, rx_observer);
self.start_action_loop(context.clone(), rx_action, rx_observer, rx_waker);
self.start_holding_loop(context.clone());

context
Expand Down Expand Up @@ -160,20 +168,25 @@ impl Instance {
}

/// Returns recievers for actions and observers that get added to this instance
fn initialize_channels(&mut self) -> (ActionReceiver, Receiver<Observer>) {
fn initialize_channels(
&mut self,
) -> (ActionReceiver, Receiver<Observer>, Receiver<WakerRequest>) {
let (tx_action, rx_action) = unbounded::<ht::SpanWrap<ActionWrapper>>();
let (tx_observer, rx_observer) = unbounded::<Observer>();
let (tx_waker, rx_waker) = unbounded::<WakerRequest>();
self.action_channel = Some(tx_action.into());
self.observer_channel = Some(tx_observer);
self.waker_channel = Some(tx_waker);

(rx_action, rx_observer)
(rx_action, rx_observer, rx_waker)
}

pub fn initialize_context(&self, context: Arc<Context>) -> Arc<Context> {
let mut sub_context = (*context).clone();
sub_context.set_state(self.state.clone());
sub_context.action_channel = self.action_channel.clone();
sub_context.observer_channel = self.observer_channel.clone();
sub_context.waker_channel = self.waker_channel.clone();
Arc::new(sub_context)
}

Expand All @@ -183,6 +196,7 @@ impl Instance {
context: Arc<Context>,
rx_action: ActionReceiver,
rx_observer: Receiver<Observer>,
rx_waker: Receiver<WakerRequest>,
) {
self.stop_action_loop();

Expand All @@ -202,10 +216,18 @@ impl Instance {
.spawn(move || {
let mut state_observers: Vec<Observer> = Vec::new();
let mut unprocessed_action: Option<ht::SpanWrap<ActionWrapper>> = None;
let mut wakers: HashMap::<ProcessUniqueId, Waker> = HashMap::new();
while kill_receiver.try_recv().is_err() {
if let Some(action_wrapper) = unprocessed_action.take().or_else(|| rx_action.recv_timeout(Duration::from_secs(1)).ok()) {
// Add new observers
state_observers.extend(rx_observer.try_iter());
// Process waker requests
for waker_request in rx_waker.try_iter() {
match waker_request {
WakerRequest::Add(id, waker) => wakers.insert(id, waker),
WakerRequest::Remove(id) => wakers.remove(&id),
};
}
let action = action_wrapper.action();
// Ping can happen often, and should be as lightweight as possible
let should_process = *action != Action::Ping;
Expand All @@ -223,6 +245,10 @@ impl Instance {
.into_iter()
.filter(|observer| observer.ticker.send(()).is_ok())
.collect();
// Tick all wakers
for waker in wakers.values() {
waker.clone().wake();
}
},
Err(HolochainError::Timeout(s)) => {
warn!("Instance::process_action() couldn't get lock on state. Trying again next loop. Timeout string: {}", s);
Expand Down Expand Up @@ -424,6 +450,7 @@ impl Instance {
state: Arc::new(RwLock::new(StateWrapper::new(context.clone()))),
action_channel: None,
observer_channel: None,
waker_channel: None,
scheduler_handle: None,
persister: None,
consistency_model: ConsistencyModel::new(context),
Expand All @@ -437,6 +464,7 @@ impl Instance {
state: Arc::new(RwLock::new(StateWrapper::from(state))),
action_channel: None,
observer_channel: None,
waker_channel: None,
scheduler_handle: None,
persister: None,
consistency_model: ConsistencyModel::new(context),
Expand Down Expand Up @@ -782,7 +810,7 @@ pub mod tests {
let netname = Some("can_process_action");
let mut instance = Instance::new(test_context("jason", netname));
let context = instance.initialize_context(test_context("jane", netname));
let (rx_action, rx_observer) = instance.initialize_channels();
let (rx_action, rx_observer, _) = instance.initialize_channels();

let action_wrapper = test_action_wrapper_commit();
instance
Expand Down
Loading

0 comments on commit 420fe08

Please sign in to comment.