Skip to content

Commit

Permalink
Merge pull request #471 from Michael-J-Ward/configurable-bind-mounts
Browse files Browse the repository at this point in the history
Configurable bind addrs
  • Loading branch information
phil-opp authored Apr 18, 2024
2 parents 1c2dc46 + 431d72c commit bbabdc3
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 56 deletions.
26 changes: 20 additions & 6 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{net::Ipv4Addr, path::PathBuf};
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
};

use attach::attach_dataflow;
use clap::Parser;
Expand Down Expand Up @@ -103,6 +106,11 @@ enum Command {
Daemon {
#[clap(long)]
machine_id: Option<String>,
/// The IP address and port this daemon will bind to.
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)
)]
addr: SocketAddr,
#[clap(long)]
coordinator_addr: Option<SocketAddr>,

Expand All @@ -112,7 +120,12 @@ enum Command {
/// Run runtime
Runtime,
/// Run coordinator
Coordinator { port: Option<u16> },
Coordinator {
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT)
)]
addr: SocketAddr,
},
}

#[derive(Debug, clap::Args)]
Expand Down Expand Up @@ -266,20 +279,21 @@ fn run() -> eyre::Result<()> {
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
Command::Coordinator { port } => {
Command::Coordinator { addr } => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let (_port, task) =
dora_coordinator::start(port, futures::stream::empty::<Event>()).await?;
dora_coordinator::start(addr, futures::stream::empty::<Event>()).await?;
task.await
})
.context("failed to run dora-coordinator")?
}
Command::Daemon {
coordinator_addr,
addr,
machine_id,
run_dataflow,
} => {
Expand All @@ -301,12 +315,12 @@ fn run() -> eyre::Result<()> {
Daemon::run_dataflow(&dataflow_path).await
}
None => {
let addr = coordinator_addr.unwrap_or_else(|| {
let coordination_addr = coordinator_addr.unwrap_or_else(|| {
tracing::info!("Starting in local mode");
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
});
Daemon::run(addr, machine_id.unwrap_or_default()).await
Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await
}
}
})
Expand Down
55 changes: 33 additions & 22 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
},
topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
Expand All @@ -39,11 +36,10 @@ mod run;
mod tcp_utils;

pub async fn start(
port: Option<u16>,
bind: SocketAddr,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT);
let listener = listener::create_listener(port).await?;
let listener = listener::create_listener(bind).await?;
let port = listener
.local_addr()
.wrap_err("failed to get local addr of listener")?
Expand Down Expand Up @@ -175,29 +171,38 @@ async fn start_inner(
machine_id,
mut connection,
dora_version: daemon_version,
listen_socket,
listen_port,
} => {
let coordinator_version = &env!("CARGO_PKG_VERSION");
let reply = if &daemon_version == coordinator_version {
RegisterResult::Ok
let coordinator_version: &&str = &env!("CARGO_PKG_VERSION");
let version_check = if &daemon_version == coordinator_version {
Ok(())
} else {
RegisterResult::Err(format!(
Err(format!(
"version mismatch: daemon v{daemon_version} is \
not compatible with coordinator v{coordinator_version}"
not compatible with coordinator v{coordinator_version}"
))
};
let reply = Timestamped {
inner: reply,
let peer_ip = connection
.peer_addr()
.map(|addr| addr.ip())
.map_err(|err| format!("failed to get peer addr of connection: {err}"));
let register_result = version_check.and(peer_ip);

let reply: Timestamped<RegisterResult> = Timestamped {
inner: match &register_result {
Ok(_) => RegisterResult::Ok,
Err(err) => RegisterResult::Err(err.clone()),
},
timestamp: clock.new_timestamp(),
};
let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await;
match (reply.inner, send_result) {
(RegisterResult::Ok, Ok(())) => {
match (register_result, send_result) {
(Ok(ip), Ok(())) => {
let previous = daemon_connections.insert(
machine_id.clone(),
DaemonConnection {
stream: connection,
listen_socket,
listen_socket: (ip, listen_port).into(),
last_heartbeat: Instant::now(),
},
);
Expand All @@ -207,10 +212,10 @@ async fn start_inner(
);
}
}
(RegisterResult::Err(err), _) => {
(Err(err), _) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
}
(RegisterResult::Ok, Err(err)) => {
(Ok(_), Err(err)) => {
tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}");
}
}
Expand Down Expand Up @@ -481,6 +486,12 @@ async fn start_inner(
let mut disconnected = BTreeSet::new();
for (machine_id, connection) in &mut daemon_connections {
if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
tracing::warn!(
"no heartbeat message from machine `{machine_id}` since {:?}",
connection.last_heartbeat.elapsed()
)
}
if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
disconnected.insert(machine_id.clone());
continue;
}
Expand All @@ -500,7 +511,7 @@ async fn start_inner(
}
}
if !disconnected.is_empty() {
tracing::info!("Disconnecting daemons that failed watchdog: {disconnected:?}");
tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
for machine_id in disconnected {
daemon_connections.remove(&machine_id);
}
Expand Down Expand Up @@ -905,7 +916,7 @@ pub enum DaemonEvent {
dora_version: String,
machine_id: String,
connection: TcpStream,
listen_socket: SocketAddr,
listen_port: u16,
},
}

Expand Down
11 changes: 5 additions & 6 deletions binaries/coordinator/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc};
use std::{io::ErrorKind, net::SocketAddr, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
};

pub async fn create_listener(port: u16) -> eyre::Result<TcpListener> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, port)).await {
pub async fn create_listener(bind: SocketAddr) -> eyre::Result<TcpListener> {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
Expand Down Expand Up @@ -53,13 +52,13 @@ pub async fn handle_connection(
coordinator_messages::CoordinatorRequest::Register {
machine_id,
dora_version,
listen_socket,
listen_port,
} => {
let event = DaemonEvent::Register {
dora_version,
machine_id,
connection,
listen_socket,
listen_port,
};
let _ = events_tx.send(Event::Daemon(event)).await;
break;
Expand Down
4 changes: 2 additions & 2 deletions binaries/daemon/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct CoordinatorEvent {
pub async fn register(
addr: SocketAddr,
machine_id: String,
listen_socket: SocketAddr,
listen_port: u16,
clock: &HLC,
) -> eyre::Result<impl Stream<Item = Timestamped<CoordinatorEvent>>> {
let mut stream = TcpStream::connect(addr)
Expand All @@ -37,7 +37,7 @@ pub async fn register(
inner: CoordinatorRequest::Register {
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_socket,
listen_port,
},
timestamp: clock.new_timestamp(),
})?;
Expand Down
19 changes: 8 additions & 11 deletions binaries/daemon/src/inter_daemon.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use crate::tcp_utils::{tcp_receive, tcp_send};
use dora_core::daemon_messages::{InterDaemonEvent, Timestamped};
use eyre::{Context, ContextCompat};
use std::{
collections::BTreeMap,
io::ErrorKind,
net::{Ipv4Addr, SocketAddr},
};
use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr};
use tokio::net::{TcpListener, TcpStream};

pub struct InterDaemonConnection {
Expand Down Expand Up @@ -65,26 +61,27 @@ pub async fn send_inter_daemon_event(
}

pub async fn spawn_listener_loop(
bind: SocketAddr,
machine_id: String,
events_tx: flume::Sender<Timestamped<InterDaemonEvent>>,
) -> eyre::Result<SocketAddr> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, 0)).await {
) -> eyre::Result<u16> {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
}
};
let socket_addr = socket
let listen_port = socket
.local_addr()
.wrap_err("failed to get local addr of socket")?;
.wrap_err("failed to get local addr of socket")?
.port();

tokio::spawn(async move {
listener_loop(socket, events_tx).await;
tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`");
});

Ok(socket_addr)
Ok(listen_port)
}

async fn listener_loop(
Expand Down
12 changes: 8 additions & 4 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,27 @@ pub struct Daemon {
}

impl Daemon {
pub async fn run(coordinator_addr: SocketAddr, machine_id: String) -> eyre::Result<()> {
pub async fn run(
coordinator_addr: SocketAddr,
machine_id: String,
bind_addr: SocketAddr,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

let ctrlc_events = set_up_ctrlc_handler(clock.clone())?;

// spawn listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_socket =
inter_daemon::spawn_listener_loop(machine_id.clone(), events_tx).await?;
let listen_port =
inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?;
let daemon_events = events_rx.into_stream().map(|e| Timestamped {
inner: Event::Daemon(e.inner),
timestamp: e.timestamp,
});

// connect to the coordinator
let coordinator_events =
coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock)
coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock)
.await
.wrap_err("failed to connect to dora-coordinator")?
.map(
Expand Down
11 changes: 8 additions & 3 deletions examples/multiple-daemons/run.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::Descriptor,
topics::{ControlRequest, ControlRequestReply, DataflowId},
topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT},
};
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};

use std::{
collections::BTreeSet,
net::{Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::Path,
time::Duration,
};
Expand All @@ -34,8 +34,13 @@ async fn main() -> eyre::Result<()> {
build_dataflow(dataflow).await?;

let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
let coordinator_bind = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
DORA_COORDINATOR_PORT_DEFAULT,
);
let (coordinator_port, coordinator) =
dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?;
dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx))
.await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into());
let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into());
Expand Down
3 changes: 1 addition & 2 deletions libraries/core/src/coordinator_messages.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::daemon_messages::DataflowId;
use eyre::eyre;
use std::net::SocketAddr;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
Register {
dora_version: String,
machine_id: String,
listen_socket: SocketAddr,
listen_port: u16,
},
Event {
machine_id: String,
Expand Down

0 comments on commit bbabdc3

Please sign in to comment.