diff --git a/Cargo.lock b/Cargo.lock index d6df5a5689a0..f6718a03a9f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1338,6 +1338,20 @@ dependencies = [ "tokio-tungstenite", ] +[[package]] +name = "hydro_cli_maelstrom" +version = "0.1.0" +dependencies = [ + "bincode", + "bytes", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tokio-util", +] + [[package]] name = "hydroflow" version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index 96a771c0c18c..f6fedee72a30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "benches", "hydro_cli", "hydro_cli_examples", + "hydro_cli_maelstrom", "hydroflow", "hydroflow_cli_integration", "hydroflow_datalog", diff --git a/hydro_cli/src/core/custom_service.rs b/hydro_cli/src/core/custom_service.rs index 98cf030ca25e..c03d43404143 100644 --- a/hydro_cli/src/core/custom_service.rs +++ b/hydro_cli/src/core/custom_service.rs @@ -1,4 +1,5 @@ use std::any::Any; +use std::collections::HashMap; use std::ops::Deref; use std::sync::{Arc, Weak}; @@ -65,11 +66,19 @@ impl Service for CustomService { Ok(()) } - async fn start(&mut self) {} + async fn start(&mut self, names: &HashMap) {} async fn stop(&mut self) -> Result<()> { Ok(()) } + + fn name(&self) -> String { + self._id.to_string() + } + + fn id(&self) -> usize { + self._id + } } pub struct CustomClientPort { diff --git a/hydro_cli/src/core/deployment.rs b/hydro_cli/src/core/deployment.rs index 331470c70317..f4b0044d32b4 100644 --- a/hydro_cli/src/core/deployment.rs +++ b/hydro_cli/src/core/deployment.rs @@ -1,6 +1,8 @@ +use std::collections::HashMap; use std::sync::{Arc, Weak}; use anyhow::Result; +use futures::future::join_all; use tokio::sync::RwLock; use super::{progress, Host, ResourcePool, ResourceResult, Service}; @@ -106,14 +108,30 @@ impl Deployment { .collect::>(); self.services = active_services; + let node_names: HashMap = + join_all(self.services.iter().map(|service| async { + let service = service.upgrade().unwrap(); + let service = 
service.read().await; + (service.id(), service.name()) + })) + .await + .into_iter() + .collect(); + let all_services_start = self.services .iter() .map(|service: &Weak>| async { - service.upgrade().unwrap().write().await.start().await; + service + .upgrade() + .unwrap() + .write() + .await + .start(&node_names) + .await; }); - futures::future::join_all(all_services_start).await; + join_all(all_services_start).await; } pub fn add_host T>( diff --git a/hydro_cli/src/core/hydroflow_crate/mod.rs b/hydro_cli/src/core/hydroflow_crate/mod.rs index 7b379c4ffaf4..f3b3bdee73b9 100644 --- a/hydro_cli/src/core/hydroflow_crate/mod.rs +++ b/hydro_cli/src/core/hydroflow_crate/mod.rs @@ -286,7 +286,7 @@ impl Service for HydroflowCrate { .await } - async fn start(&mut self) { + async fn start(&mut self, names: &HashMap) { if self.started { return; } @@ -296,7 +296,9 @@ impl Service for HydroflowCrate { sink_ports.insert(port_name.clone(), outgoing.load_instantiated(&|p| p).await); } - let formatted_defns = serde_json::to_string(&sink_ports).unwrap(); + let payload = (&sink_ports, self.id, names); + + let formatted_start = serde_json::to_string(&payload).unwrap(); self.launched_binary .as_mut() @@ -305,7 +307,7 @@ impl Service for HydroflowCrate { .await .stdin() .await - .send(format!("start: {formatted_defns}\n")) + .send(format!("start: {formatted_start}\n")) .await .unwrap(); @@ -333,4 +335,14 @@ impl Service for HydroflowCrate { Ok(()) } + + fn name(&self) -> String { + self.display_id + .clone() + .unwrap_or_else(|| format!("service/{}", self.id)) + } + + fn id(&self) -> usize { + self.id + } } diff --git a/hydro_cli/src/core/mod.rs b/hydro_cli/src/core/mod.rs index c28d1f0fe437..6e8eee337c02 100644 --- a/hydro_cli/src/core/mod.rs +++ b/hydro_cli/src/core/mod.rs @@ -186,8 +186,14 @@ pub trait Service: Send + Sync { async fn ready(&mut self) -> Result<()>; /// Starts the service by having it connect to other services and start computations. 
- async fn start(&mut self); + /// Takes in a map from service id to service name for all services. + async fn start(&mut self, names: &HashMap); /// Stops the service by having it disconnect from other services and stop computations. async fn stop(&mut self) -> Result<()>; + + /// Returns the id of the service + fn id(&self) -> usize; + /// Returns the display name of the service + fn name(&self) -> String; } diff --git a/hydro_cli_examples/Cargo.toml b/hydro_cli_examples/Cargo.toml index 296db8e3c63f..d795091ee7d0 100644 --- a/hydro_cli_examples/Cargo.toml +++ b/hydro_cli_examples/Cargo.toml @@ -46,6 +46,12 @@ name = "pn_counter_delta" [[example]] name = "ws_chat_server" +[[example]] +name = "maelstrom_unique_id" + +[[example]] +name = "maelstrom_broadcast" + [dev-dependencies] hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] } hydroflow_datalog = { path = "../hydroflow_datalog" } diff --git a/hydro_cli_examples/examples/echo/main.rs b/hydro_cli_examples/examples/echo/main.rs new file mode 100644 index 000000000000..5c1a18adf77b --- /dev/null +++ b/hydro_cli_examples/examples/echo/main.rs @@ -0,0 +1,71 @@ +use hydroflow::hydroflow_syntax; +use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource}; +use hydroflow::util::serialize_to_bytes; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EchoMsg { + pub msg_id: Value, + pub echo: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EchoOkMsg { + pub echo: String, + pub in_reply_to: Value, +} + +impl EchoMsg { + /// Generate EchoOkMsg response to this EchoMsg + fn response( + EchoMsg { + echo, + msg_id: source_msg_id, + }: Self, + ) -> EchoOkMsg { + EchoOkMsg { + echo, + in_reply_to: source_msg_id, + } + } +} + +#[hydroflow::main] +async fn main() { + let mut ports = hydroflow::util::cli::init().await; + + // TODO: use ConnectedDemux? 
+ let echo_in = ports + .port("echo_in") + .connect::() + .await + .into_source(); + let echo_out = ports + .port("echo_out") + .connect::() + .await + .into_sink(); + + let df = hydroflow_syntax! { + input = source_stream(echo_in) + -> map(Result::unwrap) + -> map(|x| x.to_vec()) + -> map(String::from_utf8) + -> map(Result::unwrap); + + output = map(|x| serde_json::to_string(&x)) + -> map(Result::unwrap) + -> map(serialize_to_bytes) + -> dest_sink(echo_out); + + + input + -> map(|x| serde_json::from_str::(&x).unwrap()) + //-> map(|x| EchoMsg {msg_id: x.msg_id, echo: x.echo + "hi"}) + -> map(EchoMsg::response) + -> output; + }; + + hydroflow::util::cli::launch_flow(df).await; +} diff --git a/hydro_cli_examples/examples/maelstrom_broadcast/main.rs b/hydro_cli_examples/examples/maelstrom_broadcast/main.rs new file mode 100644 index 000000000000..93db738ee136 --- /dev/null +++ b/hydro_cli_examples/examples/maelstrom_broadcast/main.rs @@ -0,0 +1,211 @@ +use std::collections::HashMap; +use std::io::Result; + +use hydroflow::bytes::{Bytes, BytesMut}; +use hydroflow::hydroflow_syntax; +use hydroflow::util::cli::{ + ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, +}; +use hydroflow::util::serialize_to_bytes; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Broadcast { + pub msg_id: Value, + pub message: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct BroadcastOk { + pub in_reply_to: Value, +} + +impl Broadcast { + pub fn respond(self) -> BroadcastOk { + BroadcastOk { + in_reply_to: self.msg_id, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Topology { + pub msg_id: Value, + pub topology: HashMap>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct TopologyOk { + pub in_reply_to: Value, +} + +impl Topology { 
+ pub fn respond(self) -> TopologyOk { + TopologyOk { + in_reply_to: self.msg_id, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Read { + pub msg_id: Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ReadOk { + pub in_reply_to: Value, + pub messages: Vec, +} + +impl Read { + pub fn respond(self, messages: Vec) -> ReadOk { + ReadOk { + in_reply_to: self.msg_id, + messages, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Gossip { + pub messages: Vec, +} + +fn parse_input(bytes: Result) -> T { + let bytes = bytes.unwrap().to_vec(); + + let string = String::from_utf8(bytes).unwrap(); + + serde_json::from_str::(&string).unwrap() +} + +fn prep_output(output: T) -> Bytes { + let string = serde_json::to_string(&output).unwrap(); + serialize_to_bytes(string) +} + +fn parse_tagged_input(bytes: Result<(u32, BytesMut)>) -> (u32, T) { + let bytes = bytes.unwrap(); + let string = String::from_utf8(bytes.1.to_vec()).unwrap(); + (bytes.0, serde_json::from_str::(&string).unwrap()) +} + +fn prep_demux_output((peer, state): (usize, T)) -> (u32, Bytes) { + let output = serde_json::to_string(&state).unwrap(); + (peer as u32, serialize_to_bytes(output)) +} + +#[hydroflow::main] +async fn main() { + let mut ports = hydroflow::util::cli::init().await; + let node_id = ports.node_id; + let node_name = ports.node_names.get(&node_id).unwrap().clone(); + // map from node name to id + let node_ids: HashMap = ports + .node_names + .iter() + .map(|(key, value)| (value.clone(), *key)) + .collect(); + + let broadcast_in = ports + .port("broadcast_in") + .connect::() + .await + .into_source(); + let broadcastok_out = ports + .port("broadcastok_out") + .connect::() + .await + .into_sink(); + + let topology_in = ports + .port("topology_in") + .connect::() + .await + .into_source(); + let topologyok_out = ports + .port("topologyok_out") + .connect::() + .await + .into_sink(); + + let 
read_in = ports + .port("read_in") + .connect::() + .await + .into_source(); + let readok_out = ports + .port("readok_out") + .connect::() + .await + .into_sink(); + + // This port is does not transfer maelstrom payloads, but rather passes custom payloads between nodes + let gossip_out = ports + .port("gossip_out") + .connect::>() + .await + .into_sink(); + let gossip_in = ports + .port("gossip_in") + .connect::>() + .await + .into_source(); + + let df = hydroflow_syntax! { + broadcast = source_stream(broadcast_in) -> map(parse_input) -> tee(); + topology = source_stream(topology_in)-> map(parse_input) -> tee(); + read = source_stream(read_in) -> map(parse_input::); + + broadcastok = map(prep_output) -> dest_sink(broadcastok_out); + topologyok = map(prep_output) -> dest_sink(topologyok_out); + readok = map(prep_output) -> dest_sink(readok_out); + + gossip = source_stream(gossip_in) -> map(parse_tagged_input::); + gossipout = map(prep_demux_output) -> dest_sink(gossip_out); + + // Ack topology message from maelstrom + topology -> map(|top: Topology| top.respond()) -> topologyok; + // Ack all broadcasts + broadcast -> map(|b: Broadcast| b.respond()) -> broadcastok; + + // Identifies all neighbors of the current node + topology -> flat_map(|top| top.topology.get(&node_name).unwrap().clone()) -> map(|node_name| *node_ids.get(&node_name).unwrap()) -> [0]forwards; + + + // Set of all gossip messages + gossip_message = gossip -> map(|(_, message)| message) -> tee(); + + // Gossip messages should be gossiped further + gossip_message -> forward_message; + + // Gossip messages should be added to the message list + gossip_message -> messages; + + + // Join the stream of adjacent nodes with the new messages to gossip and output + forwards = cross_join::<'static, 'tick>() -> gossipout; + + // Set of messages to gossip to neighbors + forward_message = union() -> unique::<'static>() -> [1]forwards; + + // All new messages from maelstrom should be gossipped + message -> 
forward_message; + + // Broadcast messages + message = broadcast -> map(|b: Broadcast| b.message) -> tee(); + + // Messages is singleton list of all unique messages recieved + messages = union() -> unique::<'static>() -> fold::<'static>(Vec::new, |accum: &mut Vec, elem| {accum.push(elem)}) -> [1]output; + message -> messages; + + // When we get a read, trigger a readok containing the messages singleton + read -> [0]output; + output = cross_join() -> map(|(read, messages)| read.respond(messages)) -> readok; + }; + + hydroflow::util::cli::launch_flow(df).await; +} diff --git a/hydro_cli_examples/examples/maelstrom_unique_id/main.rs b/hydro_cli_examples/examples/maelstrom_unique_id/main.rs new file mode 100644 index 000000000000..5d34dff55eca --- /dev/null +++ b/hydro_cli_examples/examples/maelstrom_unique_id/main.rs @@ -0,0 +1,68 @@ +use hydroflow::hydroflow_syntax; +use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource}; +use hydroflow::util::serialize_to_bytes; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Generate { + pub msg_id: Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GenerateOk { + pub id: Value, + pub in_reply_to: Value, +} + +impl Generate { + /// Generate GenerateOk response to this Generate message + pub fn respond(self, i: usize, node_id: usize) -> GenerateOk { + let id = json!([i, node_id]); + + GenerateOk { + id, + in_reply_to: self.msg_id, + } + } +} + +#[hydroflow::main] +async fn main() { + let mut ports = hydroflow::util::cli::init().await; + let node_id = ports.node_id.clone(); + + // TODO: use ConnectedDemux? + let gen_in = ports + .port("gen_in") + .connect::() + .await + .into_source(); + let ok_out = ports + .port("ok_out") + .connect::() + .await + .into_sink(); + + let df = hydroflow_syntax! 
{ + input = source_stream(gen_in) + -> map(Result::unwrap) + -> map(|x| x.to_vec()) + -> map(String::from_utf8) + -> map(Result::unwrap); + + output = map(|x| serde_json::to_string(&x)) + -> map(Result::unwrap) + -> map(serialize_to_bytes) + -> dest_sink(ok_out); + + + input + -> map(|x| serde_json::from_str::(&x).unwrap()) + -> enumerate::<'static>() //-> enumerate() will fail! + -> map(|(i, x)| x.respond(i, node_id)) + -> output; + }; + + hydroflow::util::cli::launch_flow(df).await; +} diff --git a/hydro_cli_maelstrom/Cargo.toml b/hydro_cli_maelstrom/Cargo.toml new file mode 100644 index 000000000000..09353d0857a7 --- /dev/null +++ b/hydro_cli_maelstrom/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "hydro_cli_maelstrom" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde_json = "1" +serde = { version = "1", features = [ "derive" ] } +futures = { version = "0.3" } +bytes = "1.1.0" +bincode = "1.3" +tokio-stream = { version = "0.1.10", features = [ "io-util", "sync" ] } + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio = { version = "1.16", features = [ "full" ] } +tokio-util = { version = "0.7.4", features = [ "net", "codec" ] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +tokio = { version = "1.16", features = [ "rt" , "sync", "macros", "io-util", "time" ] } +tokio-util = { version = "0.7.4", features = [ "codec" ] } \ No newline at end of file diff --git a/hydro_cli_maelstrom/src/cli_refs.rs b/hydro_cli_maelstrom/src/cli_refs.rs new file mode 100644 index 000000000000..d320fdb5aa87 --- /dev/null +++ b/hydro_cli_maelstrom/src/cli_refs.rs @@ -0,0 +1,19 @@ +use std::collections::HashMap; +use std::net::SocketAddr; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum ServerPort { + TcpPort(SocketAddr), + Demux(HashMap), + Merge(Vec), + Tagged(Box, u32), +} + +#[derive(Serialize, 
Deserialize, Clone, Debug)] +pub enum ServerBindConfig { + TcpPort(String), + Merge(Vec), + Tagged(Box, u32), +} diff --git a/hydro_cli_maelstrom/src/config.rs b/hydro_cli_maelstrom/src/config.rs new file mode 100644 index 000000000000..5ba821982625 --- /dev/null +++ b/hydro_cli_maelstrom/src/config.rs @@ -0,0 +1,74 @@ +use std::path::PathBuf; + +/// A configuration for hydro deploy maelstrom wrapper which identifies how to map ports and maelstrom payloads. +pub struct Config { + pub binary: PathBuf, + pub ports: Vec, +} + +/// A logical port +pub enum Port { + SourcePort(MaelstromPort), + SinkPort(MaelstromPort), + CustomPort(CustomPort), +} + +/// A port which communicates through wrapped maelstrom payloads. +pub struct MaelstromPort { + port_name: String, + maelstrom_type: String, +} + +/// A custom two-sided port with a demux out and tagged in. +/// Used for passing around non-maelstrom payloads between nodes. +pub struct CustomPort { + tagged_port_name: String, + demux_port_name: String, + maelstrom_type: String, +} + +impl MaelstromPort { + pub fn new(port_name: String, maelstrom_type: String) -> MaelstromPort { + MaelstromPort { + port_name, + maelstrom_type, + } + } + + pub fn port_name(&self) -> &str { + &self.port_name + } + + pub fn maelstrom_type(&self) -> &str { + &self.maelstrom_type + } +} + +const CUSTOM_TAG: &str = "_~*hydromael*~_"; + +impl CustomPort { + pub fn new(tagged_port_name: String, demux_port_name: String) -> CustomPort { + // Initialize maelstrom type for custom port to a unique name which doesn't conflict with maelstrom provided types + let maelstrom_type = format!( + "{}{}{}{}", + CUSTOM_TAG, demux_port_name, CUSTOM_TAG, tagged_port_name + ); + CustomPort { + tagged_port_name, + demux_port_name, + maelstrom_type, + } + } + + pub fn tagged_port_name(&self) -> &str { + &self.tagged_port_name + } + + pub fn demux_port_name(&self) -> &str { + &self.demux_port_name + } + + pub fn maelstrom_type(&self) -> &str { + &self.maelstrom_type + } 
+} diff --git a/hydro_cli_maelstrom/src/hydro_interact.rs b/hydro_cli_maelstrom/src/hydro_interact.rs new file mode 100644 index 000000000000..d6011f23b44b --- /dev/null +++ b/hydro_cli_maelstrom/src/hydro_interact.rs @@ -0,0 +1,384 @@ +use std::collections::HashMap; +use std::error::Error; +use std::net::SocketAddr; + +use bytes::Bytes; +use futures::SinkExt; +use serde_json::Value; +use tokio::io::{stdin, AsyncBufReadExt, BufReader}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::task::JoinHandle; +use tokio_stream::StreamExt; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; + +use crate::cli_refs::ServerPort; +use crate::config::{CustomPort, MaelstromPort, Port}; +use crate::maelstrom::{CustomBody, HydroBody, Message, UnknownBody}; +use crate::ports; + +pub enum SourceConnection { + Direct(FramedWrite), + Tagged(HashMap>), +} + +impl SourceConnection { + fn is_tagged(&self) -> bool { + match self { + Self::Tagged(_) => true, + _ => false, + } + } + + fn get_mut( + &mut self, + node_id: &usize, + ) -> Option<&mut FramedWrite> { + match self { + Self::Direct(writer) => Some(writer), + Self::Tagged(map) => map.get_mut(node_id), + } + } + + async fn try_from(port: ServerPort) -> Result> { + match port { + ServerPort::TcpPort(socket) => connect_to_socket(&socket).await.map(Self::Direct), + ServerPort::Merge(merge) => connect_to_merge(&merge).await.map(Self::Tagged), + _ => Err("Attempted to connect to invalid port type".into()), + } + } +} + +async fn connect_to_socket( + socket: &SocketAddr, +) -> Result, Box> { + let stream = TcpStream::connect(socket).await?; + let codec = LengthDelimitedCodec::new(); + Ok(FramedWrite::new(stream, codec)) +} + +async fn connect_to_merge( + merge: &Vec, +) -> Result>, Box> { + let mut map = HashMap::new(); + for tagged in merge { + let (tcpport, node_id) = match tagged { + ServerPort::Tagged(tcpport, node_id) => (tcpport, node_id), + _ => return Err("ServerPort in merge was not tagged".into()), + }; 
+ let socket = match **tcpport { + ServerPort::TcpPort(socket) => socket, + _ => return Err("ServerPort in tagged was not tcp port".into()), + }; + + let writer = connect_to_socket(&socket).await?; + + map.insert(*node_id as usize, writer); + } + + Ok(map) +} + +// Connects to all source ports, returning a mapping from maelstrom type to the corresponding connection +pub async fn connect_to_sources<'a, I: IntoIterator>( + ready_message: &str, + ports: I, +) -> Result, Box> { + let mut source_name_to_type = ports::source_name_to_type(ports); + + // Parse the ready message into the map from port name to ServerPort for all source ports + let ready_message = ready_message.trim_start_matches("ready: "); + let source_ports = serde_json::from_str::>(ready_message)?; + #[cfg(debug_assertions)] + println!("Parsed ready: {}", serde_json::to_string(&source_ports)?); + + // Connect to all source ports + let mut connections = HashMap::new(); + for (port_name, port) in source_ports { + let connection = SourceConnection::try_from(port).await?; + let maelstrom_type = source_name_to_type + .remove(&port_name) + .ok_or("Port name mapping missing from ready message")?; + connections.insert(maelstrom_type, connection); + } + + Ok(connections) +} + +/// Demuxes standard in to the corresponding connections +pub fn spawn_input_handler( + connections: HashMap, + node_names: &Vec, +) -> JoinHandle<()> { + let name_to_id = node_names + .iter() + .enumerate() + .map(|(id, name)| (name.to_string(), id)) + .collect::>(); + + tokio::task::spawn(input_handler(connections, name_to_id)) +} + +/// Demux stdin to the relevant source ports in the hydroflow program +async fn input_handler( + mut connections: HashMap, + name_to_id: HashMap, +) { + let mut lines = BufReader::new(stdin()).lines(); + while let Ok(Some(line)) = lines.next_line().await { + // Parse the initial message structure + let message = serde_json::from_str::>(&line).unwrap(); + let maelstrom_type = 
serde_json::from_value::(message.body.clone()) + .unwrap() + .maelstrom_type; + + // Find the connection corresponding to the maelstrom_type and source + let connection = connections.get_mut(&maelstrom_type).unwrap(); + let is_custom = connection.is_tagged(); + let node_id = name_to_id.get(&message.src).unwrap_or(&0); + let target_port = connection.get_mut(node_id).unwrap(); + + let body_string = if is_custom { + // If custom port, simply forward the inner text field + serde_json::from_value::(message.body) + .unwrap() + .text + } else { + // If maelstrom port, remove the type field and update the msg_id so responses can be directed correctly + let mut body_value = message.body; + let body = body_value.as_object_mut().unwrap(); + body.remove("type").unwrap(); + + // Update the msg_id to be the pair [src, msg_id] + body.entry("msg_id").and_modify(|msg_id| { + *msg_id = Value::Array(vec![message.src.into(), msg_id.clone()]) + }); + + serde_json::to_string(body).unwrap() + }; + + #[cfg(debug_assertions)] + println!( + "Sending line {} to {}.{}", + body_string, &maelstrom_type, node_id + ); + target_port.send(Bytes::from(body_string)).await.unwrap(); + } +} + +pub enum SinkConnection { + Direct(DirectSink), + Demux(DemuxSink), +} + +pub struct DirectSink { + port_name: String, + binding: ServerPort, + listener: TcpListener, + maelstrom_type: String, +} + +pub struct DemuxSink { + port_name: String, + binding: ServerPort, + listeners: Vec, + maelstrom_type: String, +} + +impl SinkConnection { + fn spawn_handlers( + self, + node_id: usize, + node_names: &Vec, + output_id: usize, + output_count: usize, + ) { + let node_name = node_names[node_id].clone(); + match self { + Self::Direct(direct) => direct.spawn_handler(node_name, output_id, output_count), + Self::Demux(demux) => demux.spawn_handlers(node_name, node_names), + }; + } + + pub fn defn(&self) -> (&str, &ServerPort) { + match self { + Self::Direct(direct) => direct.defn(), + Self::Demux(demux) => demux.defn(), + } 
+ } +} + +impl DirectSink { + pub async fn bind(port: &MaelstromPort) -> Result> { + let port_name = port.port_name().to_string(); + let maelstrom_type = port.maelstrom_type().to_string(); + + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + let binding = ServerPort::TcpPort(addr); + + Ok(Self { + port_name, + binding, + listener, + maelstrom_type, + }) + } + + fn spawn_handler(self, node_name: String, output_id: usize, output_count: usize) { + tokio::task::spawn(output_handler( + self.listener, + self.maelstrom_type, + node_name, + output_id, + output_count, + )); + } + + fn defn(&self) -> (&str, &ServerPort) { + (&self.port_name, &self.binding) + } +} + +impl DemuxSink { + pub async fn bind(port: &CustomPort, node_count: usize) -> Result> { + let port_name = port.demux_port_name().to_string(); + let maelstrom_type = port.maelstrom_type().to_string(); + + let mut connections = HashMap::new(); + let mut listeners = Vec::new(); + for dest_id in 0..node_count { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + let tcpport = ServerPort::TcpPort(addr); + connections.insert(dest_id as u32, tcpport); + listeners.push(listener); + } + let binding = ServerPort::Demux(connections); + + Ok(Self { + port_name, + binding, + listeners, + maelstrom_type, + }) + } + + fn spawn_handlers(self, node_name: String, node_names: &Vec) { + self.listeners + .into_iter() + .enumerate() + .for_each(|(dest_id, listener)| { + let maelstrom_type = self.maelstrom_type.clone(); + let dest = node_names[dest_id].clone(); + let node_name = node_name.clone(); + tokio::task::spawn(custom_output_handler( + listener, + maelstrom_type, + node_name, + dest, + )); + }); + } + + fn defn(&self) -> (&str, &ServerPort) { + (&self.port_name, &self.binding) + } +} + +pub fn spawn_output_handlers( + sink_connections: Vec, + node_id: usize, + node_names: &Vec, +) -> Result<(), Box> { + let output_count = 
sink_connections.len(); + for (output_id, connection) in sink_connections.into_iter().enumerate() { + connection.spawn_handlers(node_id, node_names, output_id, output_count); + } + + Ok(()) +} + +/// Accept a connection on each sink port which wraps outputs in maelstrom payload of specified type +/// Generated "msg_id"s will be `= output_id (mod output_count)` to ensure no overlap +async fn output_handler( + listener: TcpListener, + maelstrom_type: String, + node_name: String, + output_id: usize, + output_count: usize, +) { + let in_stream = listener.accept().await.unwrap().0; + + let mut lines = FramedRead::new(in_stream, LengthDelimitedCodec::new()); + #[cfg(debug_assertions)] + println!("accepted connection for {}", maelstrom_type); + + // Initialize counter which tracks the next available msg_id + let mut msg_id_counter = output_id; + + while let Some(Ok(line)) = lines.next().await { + // Transforms output into maelstrom payload + // For example: + // {"echo":"hello world!","in_reply_to":["n1", 1]} + // -> + // {"src":"n1","dest":"c1","body":{"echo":"hello world!","msg_id":0,"in_reply_to":1,"type":"echo_ok"}} + + // Parse line to string + let raw_line: String = bincode::deserialize(&line).unwrap(); + + // Read maelstrom specific hydro content + let hydro_body = serde_json::from_str::(&raw_line).unwrap(); + let (dest, in_reply_to) = hydro_body.in_reply_to; + + // Parse body as a raw json object + let mut raw_body = serde_json::from_str::>(&raw_line).unwrap(); + + // Insert in the maelstrom specific fields + raw_body.insert("in_reply_to".into(), in_reply_to); + raw_body.insert("msg_id".into(), msg_id_counter.into()); + raw_body.insert("type".into(), maelstrom_type.clone().into()); + msg_id_counter += output_count; + + // Wrap in maelstrom payload + let message = Message { + src: node_name.clone(), + dest, + body: raw_body, + }; + + // Send the message to maelstrom + println!("{}", serde_json::to_string(&message).unwrap()); + } +} + +/// Handles forwarding 
messages from custom ports to maelstrom +async fn custom_output_handler( + listener: TcpListener, + maelstrom_type: String, + node_name: String, + dest: String, +) { + let in_stream = listener.accept().await.unwrap().0; + + let mut lines = FramedRead::new(in_stream, LengthDelimitedCodec::new()); + #[cfg(debug_assertions)] + println!("accepted custom connection for {}.{}", maelstrom_type, dest); + + while let Some(Ok(line)) = lines.next().await { + // Parse line to string + let line = bincode::deserialize(&line).unwrap(); + + // Wrap in maelstrom payload for custom packets + let message = Message { + src: node_name.clone(), + dest: dest.clone(), + body: CustomBody { + maelstrom_type: maelstrom_type.clone(), + text: line, + }, + }; + + // Send the message to maelstrom + println!("{}", serde_json::to_string(&message).unwrap()); + } +} diff --git a/hydro_cli_maelstrom/src/maelstrom.rs b/hydro_cli_maelstrom/src/maelstrom.rs new file mode 100644 index 000000000000..8f17a98e2799 --- /dev/null +++ b/hydro_cli_maelstrom/src/maelstrom.rs @@ -0,0 +1,117 @@ +use std::error::Error; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::utils; + +/// A message from maelstrom +#[derive(Serialize, Deserialize)] +pub struct Message { + pub src: String, + pub dest: String, + pub body: T, +} + +#[derive(Serialize, Deserialize)] +#[serde(tag = "type")] +#[serde(rename = "init")] +pub struct InitMsg { + pub msg_id: i32, + pub node_id: String, + pub node_ids: Vec, +} + +#[derive(Serialize, Deserialize)] +#[serde(tag = "type")] +#[serde(rename = "init_ok")] +pub struct InitOkMsg { + pub in_reply_to: i32, +} + +impl Message { + pub fn response(&self) -> Result, Box> { + let body = self.body.response(); + Ok(Message { + src: self.dest.clone(), + dest: self.src.clone(), + body, + }) + } +} + +impl InitMsg { + fn response(&self) -> InitOkMsg { + InitOkMsg { + in_reply_to: self.msg_id, + } + } +} + +/// Configuration information returned by maelstrom's init package. 
+pub struct MaelstromConfig { + /// The node id of this node. + pub node_id: usize, + /// Node ids are determined by the index of the corresponding name in the node_names vector. + pub node_names: Vec, + pub node_count: usize, +} + +impl TryFrom> for MaelstromConfig { + type Error = &'static str; + + fn try_from(msg: Message) -> Result { + let node_names = msg.body.node_ids; + // Find the node_id (index in node_names) which corresponds to the current node's name + let node_id = node_names + .iter() + .position(|x| *x == msg.body.node_id) + .ok_or("Could not find current node in node list")?; + + let node_count = node_names.len(); + + Ok(Self { + node_id, + node_names, + node_count, + }) + } +} + +/// Recieve & Ack the init payload from Maelstrom. +pub async fn maelstrom_init() -> Result> { + // Read the init message from Maelstrom + let line = utils::read_line().await?; + let init_msg: Message = serde_json::from_str(&line)?; + + // Send an init_ok back to Maelstrom + let response = init_msg.response()?; + let response = serde_json::to_string(&response)?; + println!("{}", response); + + // Extract the contents from the init message + let cfg = init_msg.try_into()?; + Ok(cfg) +} + +/// An unknown message body +#[derive(Serialize, Deserialize)] +pub struct UnknownBody { + #[serde(rename = "type")] + pub maelstrom_type: String, +} + +/// The body of a custom message +#[derive(Serialize, Deserialize)] +pub struct CustomBody { + #[serde(rename = "type")] + pub maelstrom_type: String, + pub text: String, +} + +/// The body of a maelstrom message sent from hydro +#[derive(Serialize, Deserialize)] +pub struct HydroBody { + /// in_reply_to should be a pair of (origin node name, original message id) because that is how msg_ids from maelstrom are wrapped + pub in_reply_to: (String, Value), +} diff --git a/hydro_cli_maelstrom/src/main.rs b/hydro_cli_maelstrom/src/main.rs new file mode 100644 index 000000000000..df44c4894699 --- /dev/null +++ b/hydro_cli_maelstrom/src/main.rs @@ 
-0,0 +1,120 @@ +use std::error::Error; +use std::path::PathBuf; + +use tokio::io::stderr; + +mod cli_refs; +mod config; +mod hydro_interact; +mod maelstrom; +mod ports; +mod utils; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // this should be serialized input to the program + let config = config::Config { + binary: PathBuf::from( + r"/mnt/c/Users/rhala/Code/hydroflow/target/debug/examples/maelstrom_broadcast", + ), + // binary: PathBuf::from(r"C://Users/rhala/Code/hydroflow/target/debug/examples/maelstrom_broadcast"), + ports: vec![ + config::Port::SourcePort(config::MaelstromPort::new( + "read_in".to_string(), + "read".to_string(), + )), + config::Port::SinkPort(config::MaelstromPort::new( + "readok_out".to_string(), + "read_ok".to_string(), + )), + config::Port::SourcePort(config::MaelstromPort::new( + "topology_in".to_string(), + "topology".to_string(), + )), + config::Port::SinkPort(config::MaelstromPort::new( + "topologyok_out".to_string(), + "topology_ok".to_string(), + )), + config::Port::SourcePort(config::MaelstromPort::new( + "broadcast_in".to_string(), + "broadcast".to_string(), + )), + config::Port::SinkPort(config::MaelstromPort::new( + "broadcastok_out".to_string(), + "broadcast_ok".to_string(), + )), + config::Port::CustomPort(config::CustomPort::new( + "gossip_in".to_string(), + "gossip_out".to_string(), + )), + ], + }; + + // Spawn the child process + let child = utils::spawn_child(&config.binary)?; + let (mut child, mut child_stdin, mut child_stdout, child_stderr) = child; + + // Perform initialization handshake with maelstrom + let mael_cfg = maelstrom::maelstrom_init().await?; + + // Send setup string which initializes source ports (source from the perspective of the wrapped crate) + let setup_string = ports::make_setup_string(&config.ports, mael_cfg.node_count)?; + utils::write_to_child(&mut child_stdin, &setup_string).await?; + + // Recieves the ready message from hydro cli with bound source ports + let ready_message = 
utils::read_from_child(&mut child_stdout).await?; + + // Connect to all source ports + let source_connections = + hydro_interact::connect_to_sources(&ready_message, &config.ports).await?; + + // Bind to all sink ports + let sink_connections = ports::bind_to_sinks(&config.ports, mael_cfg.node_count).await?; + + // Build start string with sink bindings + let start_string = + ports::make_start_string(&sink_connections, mael_cfg.node_id, &mael_cfg.node_names)?; + + // Spawn thread to demux stdin to the relevant ports + hydro_interact::spawn_input_handler(source_connections, &mael_cfg.node_names); + hydro_interact::spawn_output_handlers( + sink_connections, + mael_cfg.node_id, + &mael_cfg.node_names, + )?; + + // Send start string to finish hydro cli initialization + utils::write_to_child(&mut child_stdin, &start_string).await?; + + // Forward the child's stdout and stderr to our stderr for maelstrom logging + tokio::task::spawn(utils::debug_link( + child_stdout, + stderr(), + "child-stdout".into(), + )); + tokio::task::spawn(utils::debug_link( + child_stderr, + stderr(), + "child-stderr".into(), + )); + + child.wait().await?; + + Ok(()) +} + +// Example inputs: +// INIT PAYLOAD SHOULD ALWAYS BE FIRST: +// {"src": "c1", "dest": "n1","body": {"msg_id": 0,"type": "init", "node_id": "n1", "node_ids": ["n1", "n2", "n3"]}} +// +// echo examples: +// {"src": "c1", "dest": "n1","body": {"msg_id": 1,"type": "echo", "echo": "hello world!"}} +// +// unique id examples: +// {"src": "c1", "dest": "n1","body": {"msg_id": 1, "type": "generate"}} +// +// broadcast examples: +// {"src": "n3", "dest": "n1","body": {"msg_id": 1,"type": "topology", "topology": {"n1": ["n2"], "n2": ["n1", "n2"], "n3": ["n2"]}}} +// {"src": "n2", "dest": "n1","body": {"msg_id": 1,"type": "_~*hydromael*~_gossip_out_~*hydromael*~_gossip_in", "text": "10"}} +// {"src": "n2", "dest": "n1","body": {"msg_id": 1,"type": "broadcast", "message": 0}} +// {"src": "c1", "dest": "n1","body": {"msg_id": 1,"type": "read"}} diff 
--git a/hydro_cli_maelstrom/src/ports.rs b/hydro_cli_maelstrom/src/ports.rs new file mode 100644 index 000000000000..af2c05964701 --- /dev/null +++ b/hydro_cli_maelstrom/src/ports.rs @@ -0,0 +1,164 @@ +use std::collections::HashMap; +use std::error::Error; +use std::iter::repeat; + +use crate::cli_refs::ServerBindConfig; +use crate::config::{CustomPort, MaelstromPort, Port}; +use crate::hydro_interact::{DemuxSink, DirectSink, SinkConnection}; + +impl Port { + /// Returns the (port_name, serverbind config) for source and custom ports; None for sink ports + pub fn bind_config(&self, node_count: usize) -> Option<(&str, ServerBindConfig)> { + match self { + Self::SinkPort(_) => None, + Self::SourcePort(port) => Some(port.bind_config()), + Self::CustomPort(port) => Some(port.bind_config(node_count)), + } + } + + /// Binds to sink and custom ports; returns Ok(None) for source ports + pub async fn bind(&self, node_count: usize) -> Result, Box> { + match self { + Self::SourcePort(_) => Ok(None), + Self::SinkPort(port) => port.bind().await.map(Some), + Self::CustomPort(port) => port.bind(node_count).await.map(Some), + } + } + + pub fn maelstrom_type(&self) -> &str { + match self { + Self::SinkPort(port) => port.maelstrom_type(), + Self::SourcePort(port) => port.maelstrom_type(), + Self::CustomPort(port) => port.maelstrom_type(), + } + } + + pub fn source_port_name(&self) -> Option<&str> { + match self { + Self::SinkPort(_) => None, + Self::SourcePort(port) => Some(port.port_name()), + Self::CustomPort(port) => Some(port.tagged_port_name()), + } + } + + pub fn sink_port_name(&self) -> Option<&str> { + match self { + Self::SourcePort(_) => None, + Self::SinkPort(port) => Some(port.port_name()), + Self::CustomPort(port) => Some(port.demux_port_name()), + } + } +} + +impl MaelstromPort { + /// Returns (port name, tcp port) + fn bind_config(&self) -> (&str, ServerBindConfig) { + let port_name = self.port_name(); + let localhost = ServerBindConfig::TcpPort("127.0.0.1".to_string()); + (port_name, localhost) + } + + async fn bind(&self) -> Result> { + let 
connection = DirectSink::bind(self).await?; + Ok(SinkConnection::Direct(connection)) + } +} + +impl CustomPort { + /// Returns (sink port name, merge of all possible tagged inputs) + fn bind_config(&self, node_count: usize) -> (&str, ServerBindConfig) { + let port_name = self.tagged_port_name(); + + let localhost = ServerBindConfig::TcpPort("127.0.0.1".to_string()); + + let tagged_configs = repeat(localhost) + .enumerate() + .map(|(i, tcpport)| (i as u32, Box::new(tcpport))) + .map(|(i, tcpport)| ServerBindConfig::Tagged(tcpport, i)) + .take(node_count) + .collect(); + + (port_name, ServerBindConfig::Merge(tagged_configs)) + } + + async fn bind(&self, node_count: usize) -> Result> { + let connection = DemuxSink::bind(self, node_count).await?; + Ok(SinkConnection::Demux(connection)) + } +} + +/// Returns the setup string which sets up all source ports. +pub fn make_setup_string<'a, I: IntoIterator>( + ports: I, + node_count: usize, +) -> Result> { + // Pairs from port name to bind configuration for all source ports + let source_setup_pairs = ports + .into_iter() + .flat_map(|p| p.bind_config(node_count)) + .collect::>(); + + let source_setup_string = serde_json::to_string(&source_setup_pairs)?; + + Ok(source_setup_string) +} + +pub fn source_name_to_type<'a, I: IntoIterator>( + ports: I, +) -> HashMap { + ports + .into_iter() + .filter_map(|p| { + p.source_port_name() + .map(|port_name| (port_name, p.maelstrom_type())) + }) + .map(|(a, b)| (a.to_string(), b.to_string())) + .collect() +} + +pub fn sink_name_to_type<'a, I: IntoIterator>( + ports: I, +) -> HashMap<&'a str, &'a str> { + ports + .into_iter() + .filter_map(|p| { + p.sink_port_name() + .map(|port_name| (port_name, p.maelstrom_type())) + }) + .collect() +} + +pub async fn bind_to_sinks<'a, I: IntoIterator>( + ports: I, + node_count: usize, +) -> Result, Box> { + let mut connections = Vec::new(); + for port in ports { + let binding = port.bind(node_count).await?; + if let Some(binding) = binding { + 
connections.push(binding); + } + } + + Ok(connections) +} + +pub fn make_start_string( + sink_connections: &Vec, + node_id: usize, + node_names: &Vec, +) -> Result> { + let connection_defns = sink_connections + .iter() + .map(|connection| connection.defn()) + .collect::>(); + + let node_names: HashMap = + node_names.iter().map(|x| x.as_str()).enumerate().collect(); + let data = (connection_defns, node_id, node_names); + + let raw_string = serde_json::to_string(&data)?; + let formatted_string = format!("start: {raw_string}"); + + Ok(formatted_string) +} diff --git a/hydro_cli_maelstrom/src/utils.rs b/hydro_cli_maelstrom/src/utils.rs new file mode 100644 index 000000000000..47ae453be420 --- /dev/null +++ b/hydro_cli_maelstrom/src/utils.rs @@ -0,0 +1,73 @@ +use std::error::Error; +use std::path::PathBuf; +use std::process::Stdio; + +use tokio::io::{stdin, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; + +/// Spawns the specified child process with initialized pipes +pub fn spawn_child( + path: &PathBuf, +) -> Result<(Child, ChildStdin, ChildStdout, ChildStderr), Box> { + let mut child = Command::new(path) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + let child_stdin = child.stdin.take().ok_or("Could not access child's stdin")?; + let child_stdout = child + .stdout + .take() + .ok_or("Could not access child's stdin")?; + let child_stderr = child + .stderr + .take() + .ok_or("Could not access child's stdin")?; + + Ok((child, child_stdin, child_stdout, child_stderr)) +} + +/// Writes the specified message to the child's stdin +pub async fn write_to_child( + child_stdin: &mut ChildStdin, + message: &str, +) -> Result<(), Box> { + let message = format!("{message}\n"); + let message_bytes = message.as_bytes(); + child_stdin.write_all(message_bytes).await?; + #[cfg(debug_assertions)] + println!("sent {}", message); + + Ok(()) +} + 
+/// Reads a line from the child's stdout +pub async fn read_from_child(child_stdout: &mut ChildStdout) -> Result> { + let mut buffer = String::new(); + let mut reader = BufReader::new(child_stdout); + reader.read_line(&mut buffer).await?; + + Ok(buffer) +} + +/// Links two streams line-wise +pub async fn debug_link( + in_stream: R, + mut out_stream: W, + debug_message: String, +) { + let mut lines = BufReader::new(in_stream).lines(); + while let Ok(Some(line)) = lines.next_line().await { + let text = format!("{debug_message}: {line}\n"); + out_stream.write_all(text.as_bytes()).await.unwrap(); + } +} + +/// Reads a line from stdin +pub async fn read_line() -> Result> { + let mut buffer = String::new(); + BufReader::new(stdin()).read_line(&mut buffer).await?; + + Ok(buffer) +} diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index 90928acd8a00..7ce9883a96e7 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -23,6 +23,8 @@ pub async fn launch_flow(mut flow: Hydroflow) { pub struct HydroCLI { ports: HashMap, + pub node_names: HashMap, + pub node_id: usize, } impl HydroCLI { @@ -52,11 +54,12 @@ pub async fn init() -> HydroCLI { let mut start_buf = String::new(); std::io::stdin().read_line(&mut start_buf).unwrap(); - let connection_defns = if start_buf.starts_with("start: ") { - serde_json::from_str::>( - start_buf.trim_start_matches("start: ").trim(), - ) - .unwrap() + let (connection_defns, node_id, node_names): ( + HashMap, + usize, + HashMap, + ) = if start_buf.starts_with("start: ") { + serde_json::from_str(start_buf.trim_start_matches("start: ").trim()).unwrap() } else { panic!("expected start"); }; @@ -72,5 +75,7 @@ pub async fn init() -> HydroCLI { HydroCLI { ports: all_connected, + node_names, + node_id, } }