Skip to content

Commit e00bb8d

Browse files
kaiyuan-lifacebook-github-bot
authored andcommitted
hyperactor_state examples
Summary: Created three examples to showcase how the hyperactor_state works. It includes a state actor server, a event stream ingestor and a client. Differential Revision: D76821366
1 parent 5e7d123 commit e00bb8d

File tree

6 files changed

+296
-9
lines changed

6 files changed

+296
-9
lines changed

hyperactor_state/Cargo.toml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/hyperactor_state:hyperactor_state
1+
# @generated by autocargo from //monarch/hyperactor_state:[hyperactor_state,state_actor_example,state_client_example,state_ingestor_example]
22

33
[package]
44
name = "hyperactor_state"
@@ -7,15 +7,30 @@ authors = ["Meta"]
77
edition = "2021"
88
license = "BSD-3-Clause"
99

10+
[[bin]]
11+
name = "state_actor_example"
12+
path = "examples/state_actor_main.rs"
13+
14+
[[bin]]
15+
name = "state_client_example"
16+
path = "examples/state_client_main.rs"
17+
18+
[[bin]]
19+
name = "state_ingestor_example"
20+
path = "examples/state_ingestor_main.rs"
21+
1022
[dependencies]
1123
anyhow = "1.0.95"
1224
async-trait = "0.1.86"
25+
chrono = { version = "=0.4.39", features = ["clock", "serde", "std"], default-features = false }
26+
clap = { version = "4.5.38", features = ["derive", "env", "string", "unicode", "wrap_help"] }
1327
hyperactor = { version = "0.0.0", path = "../hyperactor" }
1428
hyperactor_macros = { version = "0.0.0", path = "../hyperactor_macros" }
1529
serde = { version = "1.0.185", features = ["derive", "rc"] }
1630
serde_json = { version = "1.0.140", features = ["float_roundtrip", "unbounded_depth"] }
1731
tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] }
1832
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
33+
tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
1934
unicode-ident = "1.0.12"
2035

2136
[dev-dependencies]
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use anyhow::Result;
10+
use clap::Parser;
11+
use hyperactor::channel::ChannelAddr;
12+
use hyperactor::id;
13+
use hyperactor_state::spawn_actor;
14+
use hyperactor_state::state_actor::StateActor;
15+
use tracing::Level;
16+
use tracing_subscriber::FmtSubscriber;
17+
18+
/// A simple state actor binary
19+
/// ```
20+
/// buck run //monarch/hyperactor_state:state_actor_example -- -a 'tcp![::]:3000'
21+
/// ```
22+
#[derive(Parser, Debug)]
23+
#[command()]
24+
struct Args {
25+
/// The system address
26+
#[arg(short, long)]
27+
address: ChannelAddr,
28+
}
29+
30+
#[tokio::main]
31+
async fn main() -> Result<()> {
32+
// Initialize the tracing subscriber
33+
let subscriber = FmtSubscriber::builder()
34+
.with_max_level(Level::INFO)
35+
.finish();
36+
tracing::subscriber::set_global_default(subscriber).expect("Failed to set tracing subscriber");
37+
38+
let args = Args::parse();
39+
40+
println!("\x1b[33m======= STATE ACTOR STARTING ========\x1b[0m");
41+
42+
// Create a state actor
43+
let actor_id = id!(state[0].state_actor[0]);
44+
let addr = args.address.clone();
45+
46+
// Spawn the state actor
47+
let (local_addr, _state_actor_ref) =
48+
spawn_actor::<StateActor>(addr, actor_id.clone(), ()).await?;
49+
50+
println!("State actor spawned at address: {:?}", local_addr);
51+
52+
// Keep the application running until terminated
53+
println!("State actor system running. Press Ctrl+C to exit.");
54+
tokio::signal::ctrl_c().await?;
55+
println!("Shutting down state actor system");
56+
57+
Ok(())
58+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::time::Duration;
10+
11+
use anyhow::Result;
12+
use clap::Parser;
13+
use hyperactor::ActorRef;
14+
use hyperactor::channel::ChannelAddr;
15+
use hyperactor::channel::ChannelTransport;
16+
use hyperactor::id;
17+
use hyperactor_state::client::ClientActor;
18+
use hyperactor_state::client::ClientActorParams;
19+
use hyperactor_state::create_remote_client;
20+
use hyperactor_state::object::GenericStateObject;
21+
use hyperactor_state::spawn_actor;
22+
use hyperactor_state::state_actor::StateActor;
23+
use hyperactor_state::state_actor::StateMessageClient;
24+
use tokio::sync::mpsc;
25+
26+
/// A state client binary that subscribes to logs from a state actor
27+
/// ```
28+
/// buck run //monarch/hyperactor_state:state_client_example -- -a 'tcp![::]:3000'
29+
/// ```
30+
#[derive(Parser, Debug)]
31+
#[command()]
32+
struct Args {
33+
/// The state actor address to connect to
34+
#[arg(short, long)]
35+
address: ChannelAddr,
36+
}
37+
38+
#[tokio::main]
39+
async fn main() -> Result<()> {
40+
// Initialize the tracing subscriber for better logging
41+
let subscriber = tracing_subscriber::FmtSubscriber::builder()
42+
.with_max_level(tracing::Level::INFO)
43+
.finish();
44+
tracing::subscriber::set_global_default(subscriber).expect("Failed to set tracing subscriber");
45+
46+
let args = Args::parse();
47+
48+
println!("\x1b[36m======= STATE CLIENT STARTING ========\x1b[0m");
49+
println!("Connecting to state actor at: {:?}", args.address);
50+
51+
// Connect to the state actor
52+
let (_proc, remote_client) = create_remote_client(args.address.clone()).await?;
53+
54+
// Get a reference to the state actor
55+
let state_actor_ref = ActorRef::<StateActor>::attest(id!(state[0].state_actor[0]));
56+
57+
// Create a channel to receive logs
58+
let (sender, mut receiver) = mpsc::channel::<GenericStateObject>(100);
59+
60+
// Create a client actor to receive logs
61+
let client_actor_addr = ChannelAddr::any(ChannelTransport::Unix);
62+
let params = ClientActorParams { sender };
63+
let (client_actor_addr, client_actor_ref) = spawn_actor::<ClientActor>(
64+
client_actor_addr.clone(),
65+
id![state_client[0].log_client],
66+
params,
67+
)
68+
.await?;
69+
70+
// Subscribe to logs from the state actor
71+
println!("Subscribing to logs from state actor...");
72+
state_actor_ref
73+
.subscribe_logs(&remote_client, client_actor_addr, client_actor_ref)
74+
.await?;
75+
76+
println!("\x1b[32mSubscribed successfully! Waiting for logs...\x1b[0m");
77+
78+
// Process received logs
79+
let mut log_count = 0;
80+
loop {
81+
match tokio::time::timeout(Duration::from_secs(60), receiver.recv()).await {
82+
Ok(Some(log)) => {
83+
log_count += 1;
84+
println!("\x1b[34m--- Log #{} Received ---\x1b[0m", log_count);
85+
println!("Metadata: {:?}", log.metadata());
86+
87+
// Try to parse the data as JSON for better display
88+
match serde_json::from_str::<serde_json::Value>(log.data()) {
89+
Ok(json_data) => {
90+
if let Some(status) = json_data.get("status") {
91+
if let Some(message) = status.get("message") {
92+
println!("Message: {}", message);
93+
}
94+
if let Some(seq) = status.get("seq") {
95+
println!("Sequence: {}", seq);
96+
}
97+
} else {
98+
println!("Data: {}", serde_json::to_string_pretty(&json_data)?);
99+
}
100+
}
101+
Err(_) => {
102+
// If not valid JSON, just print the raw data
103+
println!("Raw data: {}", log.data());
104+
}
105+
}
106+
println!();
107+
}
108+
Ok(None) => {
109+
println!("\x1b[31mChannel closed, exiting\x1b[0m");
110+
break;
111+
}
112+
Err(_) => {
113+
println!("\x1b[33mNo logs received in the last 60 seconds\x1b[0m");
114+
}
115+
}
116+
}
117+
118+
Ok(())
119+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::time::Duration;
10+
11+
use anyhow::Result;
12+
use clap::Parser;
13+
use hyperactor::ActorRef;
14+
use hyperactor::channel::ChannelAddr;
15+
use hyperactor::clock::Clock;
16+
use hyperactor::clock::RealClock;
17+
use hyperactor::id;
18+
use hyperactor_state::create_remote_client;
19+
use hyperactor_state::state_actor::StateActor;
20+
use hyperactor_state::state_actor::StateMessage;
21+
use hyperactor_state::test_utils::log_items;
22+
23+
/// A state ingestor binary that sends logs to a state actor at a customizable rate
24+
/// ```
25+
/// buck run //monarch/hyperactor_state:state_ingestor_example -- -a 'tcp![::]:3000' -r 2
26+
/// ```
27+
#[derive(Parser, Debug)]
28+
#[command()]
29+
struct Args {
30+
/// The state actor address to connect to
31+
#[arg(short, long)]
32+
address: ChannelAddr,
33+
34+
/// Rate of log ingestion in logs per second
35+
#[arg(short, long, default_value = "1")]
36+
rate: u64,
37+
}
38+
39+
#[tokio::main]
40+
async fn main() -> Result<()> {
41+
let args = Args::parse();
42+
43+
// Print a yellow banner
44+
println!("\x1b[33m======= STATE INGESTOR STARTING ========\x1b[0m");
45+
println!("Connecting to state actor at: {:?}", args.address);
46+
println!("Ingestion rate: {} logs per second", args.rate);
47+
48+
// Calculate the delay between logs based on the rate
49+
let delay = Duration::from_millis(1000 / args.rate);
50+
51+
// Connect to the state actor
52+
let (_proc, remote_client) = create_remote_client(args.address.clone()).await?;
53+
54+
// Get a reference to the state actor
55+
let state_actor_ref = ActorRef::<StateActor>::attest(id!(state[0].state_actor[0]));
56+
57+
println!("\x1b[32mStarting log ingestion...\x1b[0m");
58+
59+
// Send logs at the specified rate
60+
let mut seq = 0;
61+
loop {
62+
seq += 1;
63+
64+
// Create a simple text log message
65+
let log_message = format!(
66+
"Log message #{:04} at {}",
67+
seq,
68+
chrono::Utc::now().to_rfc3339()
69+
);
70+
println!("Sending log #{:04}: {}", seq, log_message);
71+
72+
// Send the log message to the state actor
73+
// Note: In a real implementation, you would create proper GenericStateObject instances
74+
// but for this example, we're just sending an empty message
75+
let logs = log_items(seq, seq + 1);
76+
state_actor_ref.send(
77+
&remote_client,
78+
StateMessage::SetLogs {
79+
logs, // Empty logs for now, as we can't create GenericStateObject directly
80+
},
81+
)?;
82+
83+
// Wait for the specified delay
84+
RealClock.sleep(delay).await;
85+
}
86+
}

hyperactor_state/src/lib.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ use hyperactor::mailbox::MessageEnvelope;
2525
use hyperactor::mailbox::Undeliverable;
2626
use hyperactor::proc::Proc;
2727

28-
mod client;
29-
mod object;
30-
mod state_actor;
28+
pub mod client;
29+
pub mod object;
30+
pub mod state_actor;
3131

3232
/// Creates a state actor server at given address. Returns the server address and a handle to the
3333
/// state actor.
3434
#[allow(dead_code)]
35-
pub(crate) async fn spawn_actor<T: Actor + RemoteActor + Binds<T>>(
35+
pub async fn spawn_actor<T: Actor + RemoteActor + Binds<T>>(
3636
addr: ChannelAddr,
3737
actor_id: ActorId,
3838
params: T::Params,
@@ -56,7 +56,7 @@ pub(crate) async fn spawn_actor<T: Actor + RemoteActor + Binds<T>>(
5656

5757
/// Creates a remote client that can send message to actors in the remote addr.
5858
/// It is important to keep the client proc alive for the remote_client's lifetime.
59-
pub(crate) async fn create_remote_client(addr: ChannelAddr) -> Result<(Proc, Mailbox)> {
59+
pub async fn create_remote_client(addr: ChannelAddr) -> Result<(Proc, Mailbox)> {
6060
let remote_sender = MailboxClient::new(channel::dial(addr).unwrap());
6161
let client_proc_id = id!(client).random_user_proc();
6262
let client_proc = Proc::new(
@@ -67,15 +67,14 @@ pub(crate) async fn create_remote_client(addr: ChannelAddr) -> Result<(Proc, Mai
6767
Ok((client_proc, remote_client))
6868
}
6969

70-
#[cfg(test)]
71-
pub(crate) mod test_utils {
70+
pub mod test_utils {
7271
use crate::object::GenericStateObject;
7372
use crate::object::LogSpec;
7473
use crate::object::LogState;
7574
use crate::object::StateMetadata;
7675
use crate::object::StateObject;
7776

78-
pub(crate) fn log_items(seq_low: usize, seq_high: usize) -> Vec<GenericStateObject> {
77+
pub fn log_items(seq_low: usize, seq_high: usize) -> Vec<GenericStateObject> {
7978
let mut log_items = vec![];
8079
let metadata = StateMetadata {
8180
name: "test".to_string(),

hyperactor_state/src/object.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,16 @@ where
8080
}
8181
}
8282

83+
impl GenericStateObject {
84+
pub fn metadata(&self) -> &StateMetadata {
85+
&self.metadata
86+
}
87+
88+
pub fn data(&self) -> &str {
89+
&self.data
90+
}
91+
}
92+
8393
/// Spec is the define the desired state of an object, defined by the user.
8494
pub trait Spec: Serialize + for<'de> Deserialize<'de> {}
8595
/// State is the current state of an object.

0 commit comments

Comments
 (0)