Skip to content

Commit

Permalink
draft: anna v2
Browse files Browse the repository at this point in the history
  • Loading branch information
zzlk committed Jan 23, 2024
1 parent 5a03ed4 commit 26154b4
Show file tree
Hide file tree
Showing 8 changed files with 796 additions and 1 deletion.
35 changes: 35 additions & 0 deletions hydroflow/examples/kvs_replicated_v2/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use hydroflow::hydroflow_syntax;
use hydroflow::util::{UdpSink, UdpStream};

use crate::helpers::parse_command;
use crate::protocol::ServerResp;
use crate::Opts;

pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
println!("Client live!");

let server_addr = opts.server_addr.unwrap();
let mut hf = hydroflow_syntax! {
// set up channels
outbound_chan = dest_sink_serde(outbound);
inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap);

// read in commands from stdin and forward to server
source_stdin()
-> filter_map(|line| parse_command(line.unwrap()))
-> map(|msg| { (msg, server_addr) })
-> outbound_chan;

// print inbound msgs
inbound_chan -> for_each(|(response, addr): (ServerResp, _)| println!("Got a Response: {:?} from: {:?}", response, addr));
};

if let Some(graph) = opts.graph {
let serde_graph = hf
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, opts.write_config).unwrap();
}

hf.run_async().await.unwrap();
}
25 changes: 25 additions & 0 deletions hydroflow/examples/kvs_replicated_v2/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use regex::Regex;

use crate::protocol::ServerReq;

pub fn parse_command(line: String) -> Option<ServerReq> {
let re = Regex::new(r"([A-z]+)\s+(.+)").unwrap();
let caps = re.captures(line.as_str())?;

let binding = caps.get(1).unwrap().as_str().to_uppercase();
let cmdstr = binding.as_str();
let args = caps.get(2).unwrap().as_str();
match cmdstr {
"PUT" => {
let kv = args.split_once(',')?;
Some(ServerReq::ClientPut {
key: kv.0.trim().to_string(),
value: kv.1.trim().to_string(),
})
}
"GET" => Some(ServerReq::ClientGet {
key: args.trim().to_string(),
}),
_ => None,
}
}
16 changes: 16 additions & 0 deletions hydroflow/examples/kvs_replicated_v2/left_outer_join.hf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
lhs = mod[0] -> tee();
rhs = mod[1] -> tee();

lhs -> [0]joined;
rhs -> [1]joined;

joined = join_multiset() -> map(|(k, (lhs, rhs))| (k, (lhs, Some(rhs)))) -> combined;

lhs -> [pos]missed;
rhs -> map(|(k, _v)| k) -> [neg]missed;

missed = anti_join()
-> map(|(k, v)| (k, (v, None)))
-> combined;

combined = union() -> mod;
223 changes: 223 additions & 0 deletions hydroflow/examples/kvs_replicated_v2/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use std::net::SocketAddr;
use std::pin::Pin;

Check failure on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `std::pin::Pin`

Check warning on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `std::pin::Pin`

Check warning on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `std::pin::Pin`

use bytes::{Bytes, BytesMut};

Check failure on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `BytesMut`

Check warning on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `BytesMut`

Check warning on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `BytesMut`
use clap::{Parser, ValueEnum};
use client::run_client;
use futures::stream::{SplitSink, SplitStream};
use futures::task::noop_waker;

Check failure on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `futures::task::noop_waker`

Check warning on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `futures::task::noop_waker`

Check warning on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `futures::task::noop_waker`
use futures::SinkExt;
use hydroflow::lang::graph::{WriteConfig, WriteGraphType};
use hydroflow::tokio;
use hydroflow::util::{bind_udp_bytes, ipv4_resolve, TcpFramedStream};

Check failure on line 12 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `TcpFramedStream`

Check warning on line 12 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `TcpFramedStream`
use multiplatform_test::multiplatform_test;

Check failure on line 13 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused import: `multiplatform_test::multiplatform_test`

Check warning on line 13 in hydroflow/examples/kvs_replicated_v2/main.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, pinned-nightly)

unused import: `multiplatform_test::multiplatform_test`
use server::run_server;
use tokio_util::codec::LengthDelimitedCodec;
use tokio_util::udp::UdpFramed;

use crate::protocol::{ServerReq, ServerResp};

mod client;
mod helpers;
mod protocol;
mod server;

#[derive(Clone, ValueEnum, Debug)]
enum Role {
Client,
Server,
}

#[derive(Parser, Debug)]
struct Opts {
#[clap(value_enum, long)]
role: Role,
#[clap(long, value_parser = ipv4_resolve)]
addr: SocketAddr,
#[clap(long, value_parser = ipv4_resolve)]
server_addr: Option<SocketAddr>,
#[clap(long)]
graph: Option<WriteGraphType>,
#[clap(flatten)]
write_config: Option<WriteConfig>,
}

#[hydroflow::main]
async fn main() {
let opts = Opts::parse();
let addr = opts.addr;

match opts.role {
Role::Client => {
let (outbound, inbound, _) = bind_udp_bytes(addr).await;
println!("Client is bound to {:?}", addr);
println!("Attempting to connect to server at {:?}", opts.server_addr);
run_client(outbound, inbound, opts).await;
}
Role::Server => {
run_server(opts.addr).await;
}
}
}

async fn send(
outbound: &mut SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
x: ServerReq,
addr: SocketAddr,
) {
outbound
.send((hydroflow::util::serialize_to_bytes(x), addr))
.await
.unwrap();
}

async fn read(inbound: &mut SplitStream<UdpFramed<LengthDelimitedCodec>>) -> ServerResp {
use futures::StreamExt;

let Some(Ok((bytes, src))) = inbound.next().await else {
panic!()
};

hydroflow::util::deserialize_from_bytes(bytes).unwrap()
}

// #[multiplatform_test(hydroflow, env_tracing)]
#[hydroflow::test]
async fn test_server() {
let server_addr_1 = "127.0.0.1:2098".parse().unwrap();
let server_addr_2: SocketAddr = "127.0.0.1:2096".parse().unwrap();

tokio::task::spawn_local(run_server(server_addr_1));
// tokio::task::spawn_local(run_server(server_addr_2));
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

let (mut outbound, mut inbound, _) = bind_udp_bytes("127.0.0.1:0".parse().unwrap()).await;

send(
&mut outbound,
ServerReq::AddNode {
node_id: server_addr_1,
},
server_addr_1,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

send(
&mut outbound,
ServerReq::AddNode {
node_id: server_addr_2,
},
server_addr_1,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// send(
// &mut outbound,
// ServerReq::RemoveNode {
// node_id: server_addr_2,
// },
// server_addr_1,
// )
// .await;
// tokio::time::sleep(std::time::Duration::from_millis(100)).await;

send(
&mut outbound,
ServerReq::ClientPut {
key: "mykey".to_owned(),
value: "myval".to_owned(),
},
server_addr_1,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// send(
// &mut outbound,
// ServerReq::ClientPut {
// key: "mykey".to_owned(),
// value: "myval2".to_owned(),
// },
// server_addr_1,
// )
// .await;
// tokio::time::sleep(std::time::Duration::from_millis(100)).await;

send(
&mut outbound,
ServerReq::ClientGet {
key: "mykey".to_owned(),
},
server_addr_1,
)
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

println!("received from srv: {:?}", read(&mut inbound).await);

// send(
// &mut outbound,
// ServerReq::ClientGet {
// key: "mykey2".to_owned(),
// },
// server_addr_1,
// )
// .await;
// tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// Need this sleep otherwise the last sent messages won't get processed before the whole process terminates.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}

#[test]
fn test() {
use std::io::Write;

use hydroflow::util::{run_cargo_example, wait_for_process_output};

let (_server_1, _, mut server_1_stdout) =
run_cargo_example("kvs_replicated_v2", "--role server --addr 127.0.0.1:2051");

let mut server_1_output = String::new();
wait_for_process_output(&mut server_1_output, &mut server_1_stdout, "Server live!");

let (_client_1, mut client_1_stdin, mut client_1_stdout) = run_cargo_example(
"kvs_replicated_v2",
"--role client --addr 127.0.0.1:2052 --server-addr 127.0.0.1:2051",
);

let mut client_1_output = String::new();
wait_for_process_output(&mut client_1_output, &mut client_1_stdout, "Client live!");

client_1_stdin.write_all(b"PUT a,7\n").unwrap();

// let (_server_2, _, mut server_2_stdout) = run_cargo_example(
// "kvs_replicated_v2",
// "--role server --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051",
// );

// let (_client_2, mut client_2_stdin, mut client_2_stdout) = run_cargo_example(
// "kvs_replicated_v2",
// "--role client --addr 127.0.0.1:2054 --server-addr 127.0.0.1:2053",
// );

// let mut server_2_output = String::new();
// wait_for_process_output(&mut server_2_output, &mut server_2_stdout, "Server live!");
// wait_for_process_output(
// &mut server_2_output,
// &mut server_2_stdout,
// r#"Message received PeerGossip \{ key: "a", value: "7" \} from 127\.0\.0\.1:2051"#,
// );

// let mut client_2_output = String::new();
// wait_for_process_output(&mut client_2_output, &mut client_2_stdout, "Client live!");

// client_2_stdin.write_all(b"GET a\n").unwrap();
// wait_for_process_output(
// &mut client_2_output,
// &mut client_2_stdout,
// r#"Got a Response: ServerResponse \{ key: "a", value: "7" \}"#,
// );
}
Loading

0 comments on commit 26154b4

Please sign in to comment.