Skip to content
This repository has been archived by the owner on Feb 3, 2023. It is now read-only.

Bounded #2164

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open

Bounded #2164

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions crates/conductor_lib/src/conductor/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
port_utils::{try_with_port, INTERFACE_CONNECT_ATTEMPTS_MAX},
Holochain,
};
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{bounded, Receiver, Sender};
use holochain_common::paths::DNA_EXTENSION;
use holochain_core::{logger::Logger, signal::Signal};
use holochain_core_types::{
Expand Down Expand Up @@ -61,6 +61,8 @@ use holochain_net::p2p_config::{BackendConfig, P2pBackendKind, P2pConfig};

pub const MAX_DYNAMIC_PORT: u16 = std::u16::MAX;

use crate::CHANNEL_SIZE;

/// Special string to be printed on stdout, which clients must parse
/// in order to discover which port the interface bound to.
/// DO NOT CHANGE!
Expand Down Expand Up @@ -168,8 +170,6 @@ pub fn notify(msg: String) {
println!("{}", msg);
}

#[autotrace]
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CONDUCTOR_LIB)]
impl Conductor {
pub fn from_config(config: Configuration) -> Self {
lib3h_sodium::check_init();
Expand Down Expand Up @@ -243,8 +243,8 @@ impl Conductor {
pub fn spawn_stats_thread(&mut self) {
self.stop_stats_thread();
let instances = self.instances.clone();
let (kill_switch_tx, kill_switch_rx) = unbounded();
let (stats_tx, stats_rx) = unbounded();
let (kill_switch_tx, kill_switch_rx) = bounded(CHANNEL_SIZE);
let (stats_tx, stats_rx) = bounded(CHANNEL_SIZE);
self.stats_thread_kill_switch = Some(kill_switch_tx);
self.stats_signal_receiver = Some(stats_rx);
thread::Builder::new()
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Conductor {
let instance_signal_receivers = self.instance_signal_receivers.clone();
let signal_tx = self.signal_tx.clone();
let config = self.config.clone();
let (kill_switch_tx, kill_switch_rx) = unbounded();
let (kill_switch_tx, kill_switch_rx) = bounded(CHANNEL_SIZE);
self.signal_multiplexer_kill_switch = Some(kill_switch_tx);
self.spawn_stats_thread();
let stats_signal_receiver = self.stats_signal_receiver.clone().expect(
Expand Down Expand Up @@ -784,7 +784,7 @@ impl Conductor {
)> {
match self.config.tracing.clone().unwrap_or_default() {
TracingConfiguration::Jaeger(jaeger_config) => {
let (span_tx, span_rx) = crossbeam_channel::unbounded();
let (span_tx, span_rx) = crossbeam_channel::bounded(CHANNEL_SIZE);
let service_name = format!("{}-{}", jaeger_config.service_name, id);
let mut reporter = ht::reporter::JaegerCompactReporter::new(&service_name).unwrap();
if let Some(s) = jaeger_config.socket_address {
Expand Down Expand Up @@ -834,7 +834,7 @@ impl Conductor {
context_builder = context_builder.with_p2p_config(self.get_p2p_config());

// Signal config:
let (sender, receiver) = unbounded();
let (sender, receiver) = bounded(CHANNEL_SIZE);
self.instance_signal_receivers
.write()
.unwrap()
Expand Down Expand Up @@ -1341,7 +1341,7 @@ impl Conductor {
fn spawn_interface_thread(&self, interface_config: InterfaceConfiguration) -> Sender<()> {
let dispatcher = self.make_interface_handler(&interface_config);
// The "kill switch" is the channel which allows the interface to be stopped from outside its thread
let (kill_switch_tx, kill_switch_rx) = unbounded();
let (kill_switch_tx, kill_switch_rx) = bounded(CHANNEL_SIZE);

let (broadcaster, _handle) = run_interface(&interface_config, dispatcher, kill_switch_rx)
.map_err(|error| {
Expand Down
5 changes: 3 additions & 2 deletions crates/conductor_lib/src/conductor/passphrase_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crossbeam_channel::{unbounded, Sender};
use crossbeam_channel::{bounded, Sender};
use holochain_core_types::error::HolochainError;
use holochain_locksmith::Mutex;
use lib3h_sodium::secbuf::SecBuf;
Expand All @@ -11,6 +11,7 @@ use std::{
time::{Duration, Instant},
};

use crate::CHANNEL_SIZE;
#[cfg(unix)]
use std::io::{BufRead, BufReader};
#[cfg(unix)]
Expand All @@ -34,7 +35,7 @@ pub struct PassphraseManager {
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CONDUCTOR_LIB)]
impl PassphraseManager {
pub fn new(passphrase_service: Arc<Mutex<dyn PassphraseService + Send>>) -> Self {
let (kill_switch_tx, kill_switch_rx) = unbounded::<()>();
let (kill_switch_tx, kill_switch_rx) = bounded::<()>(CHANNEL_SIZE);
let pm = PassphraseManager {
passphrase_cache: Arc::new(Mutex::new(None)),
passphrase_service,
Expand Down
2 changes: 2 additions & 0 deletions crates/conductor_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,6 @@ pub mod static_server_impls;

pub use crate::holochain::Holochain;

const CHANNEL_SIZE: usize = 1000;

new_relic_setup!("NEW_RELIC_LICENSE_KEY");
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use nickel::{
Response, StaticFilesHandler,
};

use crate::CHANNEL_SIZE;

pub struct NickelStaticServer {
shutdown_signal: Option<Sender<()>>,
config: UiInterfaceConfiguration,
Expand All @@ -38,7 +40,7 @@ impl ConductorStaticFileServer for NickelStaticServer {
}

fn start(&mut self) -> HolochainResult<()> {
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = crossbeam_channel::bounded(CHANNEL_SIZE);

self.shutdown_signal = Some(tx);
self.running = true;
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/agent/actions/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ impl Future for CommitFuture {
// See: https://github.com/holochain/holochain-rust/issues/314
//
cx.waker().clone().wake();
if self.context.action_channel().is_full() {
return Poll::Pending;
}
if let Some(state) = self.context.try_state() {
match state.agent().actions().get(&self.action) {
Some(r) => match r.response() {
AgentActionResponse::Commit(result) => {
if self.context.action_channel().is_full() {
return Poll::Pending;
}
dispatch_action(
self.context.action_channel(),
ActionWrapper::new(Action::ClearActionResponse(
Expand Down
13 changes: 7 additions & 6 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::{
persister::Persister,
signal::{Signal, SignalSender},
state::StateWrapper,
CHANNEL_SIZE,
};
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{bounded, Receiver, Sender};
use futures::{
executor::ThreadPool,
task::{noop_waker_ref, Poll},
Expand Down Expand Up @@ -50,8 +51,8 @@ use std::{
#[cfg(test)]
use test_utils::mock_signing::mock_conductor_api;

pub type ActionSender = ht::channel::SpanSender<ActionWrapper>;
pub type ActionReceiver = ht::channel::SpanReceiver<ActionWrapper>;
pub type ActionSender = crossbeam_channel::Sender<ActionWrapper>;
pub type ActionReceiver = crossbeam_channel::Receiver<ActionWrapper>;

pub struct P2pNetworkWrapper(Arc<Mutex<Option<P2pNetwork>>>);

Expand Down Expand Up @@ -306,13 +307,13 @@ impl Context {
pub fn is_action_channel_open(&self) -> bool {
self.action_channel
.clone()
.map(|tx| tx.send_wrapped(ActionWrapper::new(Action::Ping)).is_ok())
.map(|tx| tx.send(ActionWrapper::new(Action::Ping)).is_ok())
.unwrap_or(false)
}

pub fn action_channel_error(&self, msg: &str) -> Option<HolochainError> {
match &self.action_channel {
Some(tx) => match tx.send_wrapped(ActionWrapper::new(Action::Ping)) {
Some(tx) => match tx.send(ActionWrapper::new(Action::Ping)) {
Ok(()) => None,
Err(_) => Some(HolochainError::LifecycleError(msg.into())),
},
Expand Down Expand Up @@ -343,7 +344,7 @@ impl Context {
/// got mutated.
/// This enables blocking/parking the calling thread until the application state got changed.
pub fn create_observer(&self) -> Receiver<()> {
let (tick_tx, tick_rx) = unbounded();
let (tick_tx, tick_rx) = bounded(CHANNEL_SIZE);
self.observer_channel()
.send(Observer { ticker: tick_tx })
.expect("Observer channel not initialized");
Expand Down
39 changes: 11 additions & 28 deletions crates/core/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ use crate::{
signal::Signal,
state::{State, StateWrapper},
workflows::{application, run_holding_workflow},
CHANNEL_SIZE,
};
#[cfg(test)]
use crate::{
network::actions::initialize_network::initialize_network_with_spoofed_dna,
nucleus::actions::initialize::initialize_chain,
};
use clokwerk::{ScheduleHandle, Scheduler, TimeUnits};
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{bounded, Receiver, Sender};
use holochain_core_types::{
dna::Dna,
error::{HcResult, HolochainError},
};
use holochain_locksmith::RwLock;
#[cfg(test)]
use holochain_persistence_api::cas::content::Address;
use holochain_tracing::{self as ht, channel::lax_send_wrapped};
use snowflake::ProcessUniqueId;
use std::{
sync::{
Expand Down Expand Up @@ -67,7 +67,6 @@ pub struct Observer {
pub static DISPATCH_WITHOUT_CHANNELS: &str = "dispatch called without channels open";

#[autotrace]
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
impl Instance {
/// This is initializing and starting the redux action loop and adding channels to dispatch
/// actions and observers to the context
Expand Down Expand Up @@ -162,9 +161,9 @@ impl Instance {

/// Returns recievers for actions and observers that get added to this instance
fn initialize_channels(&mut self) -> (ActionReceiver, Receiver<Observer>) {
let (tx_action, rx_action) = unbounded::<ht::SpanWrap<ActionWrapper>>();
let (tx_observer, rx_observer) = unbounded::<Observer>();
self.action_channel = Some(tx_action.into());
let (tx_action, rx_action) = bounded::<ActionWrapper>(CHANNEL_SIZE);
let (tx_observer, rx_observer) = bounded::<Observer>(CHANNEL_SIZE);
self.action_channel = Some(tx_action);
self.observer_channel = Some(tx_observer);

(rx_action, rx_observer)
Expand All @@ -190,7 +189,7 @@ impl Instance {
let mut sync_self = self.clone();
let sub_context = self.initialize_context(context);

let (kill_sender, kill_receiver) = crossbeam_channel::unbounded();
let (kill_sender, kill_receiver) = crossbeam_channel::bounded(CHANNEL_SIZE);
self.kill_switch = Some(kill_sender);
let instance_is_alive = sub_context.instance_is_alive.clone();
instance_is_alive.store(true, Ordering::Relaxed);
Expand All @@ -202,7 +201,7 @@ impl Instance {
))
.spawn(move || {
let mut state_observers: Vec<Observer> = Vec::new();
let mut unprocessed_action: Option<ht::SpanWrap<ActionWrapper>> = None;
let mut unprocessed_action: Option<ActionWrapper> = None;
while kill_receiver.try_recv().is_err() {
if let Some(action_wrapper) = unprocessed_action.take().or_else(|| rx_action.recv_timeout(Duration::from_secs(1)).ok()) {
// Add new observers
Expand All @@ -213,11 +212,6 @@ impl Instance {
if should_process {
match sync_self.process_action(&action_wrapper, &sub_context) {
Ok(()) => {
let tag = ht::Tag::new("action", format!("{:?}", action));
let _guard = action_wrapper.follower_(&sub_context.tracer, "action_loop thread", |s| s.tag(tag).start()).map(|span| {

ht::push_span(span)
});
sync_self.emit_signals(&sub_context, &action_wrapper);
// Tick all observers and remove those that have lost their receiving part
state_observers= state_observers
Expand Down Expand Up @@ -255,20 +249,9 @@ impl Instance {
/// returns the new vector of observers
pub(crate) fn process_action(
&self,
action_wrapper: &ht::SpanWrap<ActionWrapper>,
action_wrapper: &ActionWrapper,
context: &Arc<Context>,
) -> Result<(), HolochainError> {
let span = action_wrapper
.follower(&context.tracer, "begin process_action")
.unwrap_or_else(|| {
context
.tracer
.span("ROOT: process_action")
.tag(ht::debug_tag("action_wrapper", action_wrapper))
.start()
.into()
});
let _trace_guard = ht::push_span(span);
context.redux_wants_write.store(true, Relaxed);
// Mutate state
{
Expand All @@ -282,7 +265,7 @@ impl Instance {
HolochainError::Timeout(format!("timeout src: {}:{}", file!(), line!()))
})?;

new_state = state.reduce(action_wrapper.data.clone());
new_state = state.reduce(action_wrapper.clone());

// Change the state
*state = new_state;
Expand All @@ -308,7 +291,7 @@ impl Instance {
}

fn start_holding_loop(&mut self, context: Arc<Context>) {
let (kill_sender, kill_receiver) = crossbeam_channel::unbounded();
let (kill_sender, kill_receiver) = crossbeam_channel::bounded(CHANNEL_SIZE);
self.kill_switch_holding = Some(kill_sender);
thread::Builder::new()
.name(format!(
Expand Down Expand Up @@ -500,7 +483,7 @@ impl Drop for Instance {
/// Panics if the channels passed are disconnected.
#[autotrace]
pub fn dispatch_action(action_channel: &ActionSender, action_wrapper: ActionWrapper) {
lax_send_wrapped(action_channel.clone(), action_wrapper, "dispatch_action");
action_channel.send(action_wrapper).ok();
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ pub mod state_dump;
pub mod wasm_engine;
pub mod workflows;

const CHANNEL_SIZE: usize = 1000;

new_relic_setup!("NEW_RELIC_LICENSE_KEY");
3 changes: 2 additions & 1 deletion crates/core/src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This logger is the logger that's attached to each Holochain application
//! which is separate from standard logging via the log crate warn! info! debug! logging that
//! gets emitted globaly from the conductor.
use crate::CHANNEL_SIZE;
use chrono::Local;
use crossbeam_channel;
use holochain_locksmith::Mutex;
Expand Down Expand Up @@ -56,7 +57,7 @@ impl ChannelLogger {
ChannelLogger { id, sender }
}
pub fn setup() -> (Sender, Receiver) {
crossbeam_channel::unbounded()
crossbeam_channel::bounded(CHANNEL_SIZE)
}
}
pub fn default_handler(msg: String) {
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/network/actions/custom_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,19 @@ impl Future for SendResponseFuture {
//
cx.waker().clone().wake();

if self.context.action_channel().is_full() {
return Poll::Pending;
}
if let Some(state) = self.context.try_state() {
let state = state.network();
if let Err(error) = state.initialized() {
return Poll::Ready(Err(HolochainError::ErrorGeneric(error.to_string())));
}
match state.custom_direct_message_replys.get(&self.id) {
Some(result) => {
if self.context.action_channel().is_full() {
return Poll::Pending;
}
dispatch_action(
self.context.action_channel(),
ActionWrapper::new(Action::ClearCustomSendResponse(self.id.clone())),
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/network/actions/get_validation_package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ impl Future for GetValidationPackageFuture {
//
cx.waker().clone().wake();

if self.context.action_channel().is_full() {
return Poll::Pending;
}
if let Some(state) = self.context.try_state() {
let state = state.network();
if let Err(error) = state.initialized() {
Expand All @@ -70,6 +73,9 @@ impl Future for GetValidationPackageFuture {

match state.get_validation_package_results.get(&self.key) {
Some(Some(result)) => {
if self.context.action_channel().is_full() {
return Poll::Pending;
}
dispatch_action(
self.context.action_channel(),
ActionWrapper::new(Action::ClearValidationPackageResult(self.key.clone())),
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/network/actions/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ impl Future for PublishFuture {
//
cx.waker().clone().wake();

if self.context.action_channel().is_full() {
return Poll::Pending;
}
if let Some(state) = self.context.try_state() {
let state = state.network();
if let Err(error) = state.initialized() {
Expand All @@ -56,6 +59,9 @@ impl Future for PublishFuture {
match state.actions().get(&self.action) {
Some(r) => match r.response() {
NetworkActionResponse::Publish(result) => {
if self.context.action_channel().is_full() {
return Poll::Pending;
}
dispatch_action(
self.context.action_channel(),
ActionWrapper::new(Action::ClearActionResponse(
Expand Down
Loading