Skip to content

subscription response #293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions hyperactor_state/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# @generated by autocargo from //monarch/hyperactor_state:[hyperactor_state,state_actor_example,state_client_example,state_ingestor_example]

[package]
name = "hyperactor_state"
version = "0.0.0"
authors = ["Meta"]
edition = "2021"
license = "BSD-3-Clause"

[[bin]]
name = "state_actor_example"
path = "examples/state_actor_main.rs"

[[bin]]
name = "state_client_example"
path = "examples/state_client_main.rs"

[[bin]]
name = "state_ingestor_example"
path = "examples/state_ingestor_main.rs"

[dependencies]
anyhow = "1.0.95"
async-trait = "0.1.86"
chrono = { version = "=0.4.39", features = ["clock", "serde", "std"], default-features = false }
clap = { version = "4.5.38", features = ["derive", "env", "string", "unicode", "wrap_help"] }
hyperactor = { version = "0.0.0", path = "../hyperactor" }
hyperactor_macros = { version = "0.0.0", path = "../hyperactor_macros" }
serde = { version = "1.0.185", features = ["derive", "rc"] }
serde_json = { version = "1.0.140", features = ["float_roundtrip", "unbounded_depth"] }
tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] }
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
unicode-ident = "1.0.12"

[dev-dependencies]
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
58 changes: 58 additions & 0 deletions hyperactor_state/examples/state_actor_main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

use anyhow::Result;
use clap::Parser;
use hyperactor::channel::ChannelAddr;
use hyperactor::id;
use hyperactor_state::spawn_actor;
use hyperactor_state::state_actor::StateActor;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;

/// A simple state actor binary
/// ```
/// buck run //monarch/hyperactor_state:state_actor_example -- -a 'tcp![::]:3000'
/// ```
#[derive(Parser, Debug)]
#[command()]
struct Args {
/// The system address
#[arg(short, long)]
address: ChannelAddr,
}

#[tokio::main]
async fn main() -> Result<()> {
// Initialize the tracing subscriber
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Failed to set tracing subscriber");

let args = Args::parse();

println!("\x1b[33m======= STATE ACTOR STARTING ========\x1b[0m");

// Create a state actor
let actor_id = id!(state[0].state_actor[0]);
let addr = args.address.clone();

// Spawn the state actor
let (local_addr, _state_actor_ref) =
spawn_actor::<StateActor>(addr, actor_id.clone(), ()).await?;

println!("State actor spawned at address: {:?}", local_addr);

// Keep the application running until terminated
println!("State actor system running. Press Ctrl+C to exit.");
tokio::signal::ctrl_c().await?;
println!("Shutting down state actor system");

Ok(())
}
119 changes: 119 additions & 0 deletions hyperactor_state/examples/state_client_main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

use std::time::Duration;

use anyhow::Result;
use clap::Parser;
use hyperactor::ActorRef;
use hyperactor::channel::ChannelAddr;
use hyperactor::channel::ChannelTransport;
use hyperactor::id;
use hyperactor_state::client::ClientActor;
use hyperactor_state::client::ClientActorParams;
use hyperactor_state::create_remote_client;
use hyperactor_state::object::GenericStateObject;
use hyperactor_state::spawn_actor;
use hyperactor_state::state_actor::StateActor;
use hyperactor_state::state_actor::StateMessageClient;
use tokio::sync::mpsc;

/// A state client binary that subscribes to logs from a state actor
/// ```
/// buck run //monarch/hyperactor_state:state_client_example -- -a 'tcp![::]:3000'
/// ```
#[derive(Parser, Debug)]
#[command()]
struct Args {
/// The state actor address to connect to
#[arg(short, long)]
address: ChannelAddr,
}

#[tokio::main]
async fn main() -> Result<()> {
// Initialize the tracing subscriber for better logging
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Failed to set tracing subscriber");

let args = Args::parse();

println!("\x1b[36m======= STATE CLIENT STARTING ========\x1b[0m");
println!("Connecting to state actor at: {:?}", args.address);

// Connect to the state actor
let (_proc, remote_client) = create_remote_client(args.address.clone()).await?;

// Get a reference to the state actor
let state_actor_ref = ActorRef::<StateActor>::attest(id!(state[0].state_actor[0]));

// Create a channel to receive logs
let (sender, mut receiver) = mpsc::channel::<GenericStateObject>(100);

// Create a client actor to receive logs
let client_actor_addr = ChannelAddr::any(ChannelTransport::Unix);
let params = ClientActorParams { sender };
let (client_actor_addr, client_actor_ref) = spawn_actor::<ClientActor>(
client_actor_addr.clone(),
id![state_client[0].log_client],
params,
)
.await?;

// Subscribe to logs from the state actor
println!("Subscribing to logs from state actor...");
state_actor_ref
.subscribe_logs(&remote_client, client_actor_addr, client_actor_ref)
.await?;

println!("\x1b[32mSubscribed successfully! Waiting for logs...\x1b[0m");

// Process received logs
let mut log_count = 0;
loop {
match tokio::time::timeout(Duration::from_secs(60), receiver.recv()).await {
Ok(Some(log)) => {
log_count += 1;
println!("\x1b[34m--- Log #{} Received ---\x1b[0m", log_count);
println!("Metadata: {:?}", log.metadata());

// Try to parse the data as JSON for better display
match serde_json::from_str::<serde_json::Value>(log.data()) {
Ok(json_data) => {
if let Some(status) = json_data.get("status") {
if let Some(message) = status.get("message") {
println!("Message: {}", message);
}
if let Some(seq) = status.get("seq") {
println!("Sequence: {}", seq);
}
} else {
println!("Data: {}", serde_json::to_string_pretty(&json_data)?);
}
}
Err(_) => {
// If not valid JSON, just print the raw data
println!("Raw data: {}", log.data());
}
}
println!();
}
Ok(None) => {
println!("\x1b[31mChannel closed, exiting\x1b[0m");
break;
}
Err(_) => {
println!("\x1b[33mNo logs received in the last 60 seconds\x1b[0m");
}
}
}

Ok(())
}
86 changes: 86 additions & 0 deletions hyperactor_state/examples/state_ingestor_main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

use std::time::Duration;

use anyhow::Result;
use clap::Parser;
use hyperactor::ActorRef;
use hyperactor::channel::ChannelAddr;
use hyperactor::clock::Clock;
use hyperactor::clock::RealClock;
use hyperactor::id;
use hyperactor_state::create_remote_client;
use hyperactor_state::state_actor::StateActor;
use hyperactor_state::state_actor::StateMessage;
use hyperactor_state::test_utils::log_items;

/// A state ingestor binary that sends logs to a state actor at a customizable rate
/// ```
/// buck run //monarch/hyperactor_state:state_ingestor_example -- -a 'tcp![::]:3000' -r 2
/// ```
#[derive(Parser, Debug)]
#[command()]
struct Args {
/// The state actor address to connect to
#[arg(short, long)]
address: ChannelAddr,

/// Rate of log ingestion in logs per second
#[arg(short, long, default_value = "1")]
rate: u64,
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();

// Print a yellow banner
println!("\x1b[33m======= STATE INGESTOR STARTING ========\x1b[0m");
println!("Connecting to state actor at: {:?}", args.address);
println!("Ingestion rate: {} logs per second", args.rate);

// Calculate the delay between logs based on the rate
let delay = Duration::from_millis(1000 / args.rate);

// Connect to the state actor
let (_proc, remote_client) = create_remote_client(args.address.clone()).await?;

// Get a reference to the state actor
let state_actor_ref = ActorRef::<StateActor>::attest(id!(state[0].state_actor[0]));

println!("\x1b[32mStarting log ingestion...\x1b[0m");

// Send logs at the specified rate
let mut seq = 0;
loop {
seq += 1;

// Create a simple text log message
let log_message = format!(
"Log message #{:04} at {}",
seq,
chrono::Utc::now().to_rfc3339()
);
println!("Sending log #{:04}: {}", seq, log_message);

// Send the log message to the state actor
// Note: In a real implementation, you would create proper GenericStateObject instances
// but for this example, we're just sending an empty message
let logs = log_items(seq, seq + 1);
state_actor_ref.send(
&remote_client,
StateMessage::SetLogs {
logs, // Empty logs for now, as we can't create GenericStateObject directly
},
)?;

// Wait for the specified delay
RealClock.sleep(delay).await;
}
}
Loading