From 107e551763ff62e201119887f2a2beade5984173 Mon Sep 17 00:00:00 2001 From: Stephen Akinyemi Date: Wed, 8 Jan 2025 01:17:14 +0100 Subject: [PATCH] feat: enhance service configuration with OCI defaults and improve runtime features (#92) * Support fallback to OCI image config.json for service defaults - Auto-populate workdir from OCI config - Use entrypoint/cmd for command and args - Inherit environment variables from image * Add support for multiple port mappings per service - Replace single port with ports array - Update validation for port conflicts - Maintain backward compatibility * Improve logging system - Implement log rotation - Add supervisor logs viewing - Enhance log formatting and organization * Refactor path handling - Remove static global paths - Use dynamic path resolution - Better organization of monocore directories BREAKING CHANGE: Service port configuration now uses 'ports' array instead of single 'port' field --- .todo.md | 9 + Cargo.lock | 70 ++++- README.md | 56 ++-- monocore.example.json | 4 +- monocore.example.toml | 4 +- monocore/Cargo.toml | 2 + monocore/README.md | 8 +- monocore/bin/monocore.rs | 118 +++++--- monocore/bin/monokrun.rs | 57 +++- monocore/lib/cli/args.rs | 7 +- monocore/lib/config/merge.rs | 102 ++++++- monocore/lib/config/monocore.rs | 61 +++- monocore/lib/config/service_builder.rs | 32 +- monocore/lib/config/validate.rs | 104 ++++++- monocore/lib/error.rs | 8 + monocore/lib/orchestration/log.rs | 50 +++- monocore/lib/orchestration/orchestrator.rs | 20 +- monocore/lib/orchestration/status.rs | 9 +- monocore/lib/orchestration/up.rs | 69 +---- monocore/lib/runtime/log.rs | 329 +++++++++++++++++++++ monocore/lib/runtime/mod.rs | 2 + monocore/lib/runtime/supervisor.rs | 236 ++++++++------- monocore/lib/utils/path.rs | 20 +- 23 files changed, 1026 insertions(+), 351 deletions(-) create mode 100644 monocore/lib/runtime/log.rs diff --git a/.todo.md b/.todo.md index 7571e8a..ba83ac7 100644 --- a/.todo.md +++ b/.todo.md @@ -13,3 +13,12 @@ - [ ] Use sqlitedb for maintaining running services state. - [ ] Fix issue with services running even after the config is deleted. - [ ] We should be able to guarantee that service is dead when the config is deleted. + +- [ ] Treating microvm management like a package manager. + - [ ] Store service rootfs, state, logs locally in a .mc directory kind of like ./node_modules. + - [ ] Store reference rootfses (oci & monofs) in home_dir with a special store that links to them from forked rootfses. + +- [ ] Support multiple registries. + - [ ] Use `Reference` type for image_ref where it makes sense: https://docs.rs/oci-spec/0.7.1/oci_spec/distribution/struct.Reference.html + - [ ] Qualify image names fully where needed. /: + - [ ] Instead of sanitizing image refs, we should just hash them instead. diff --git a/Cargo.lock b/Cargo.lock index 87b453f..9df2e39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -702,6 +702,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "derive_builder" version = "0.20.2" @@ -1588,6 +1597,7 @@ dependencies = [ "nix", "oci-spec", "pin-project", + "pin-project-lite", "pretty-error-debug", "procspawn", "rand", @@ -1610,6 +1620,7 @@ dependencies = [ "tokio-stream", "toml 0.8.19", "tracing", + "tracing-appender", "tracing-subscriber", "typed-builder", "typed-path", @@ -1776,6 +1787,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-traits" version = "0.2.19" @@ -1954,9 +1971,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" [[package]] name = "pin-utils" @@ -1998,6 +2015,12 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -2883,6 +2906,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -3065,6 +3119,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror 1.0.65", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.28" diff --git a/README.md b/README.md index f38506b..a30de38 100644 --- a/README.md +++ b/README.md @@ -7,19 +7,19 @@

- Discord + Discord Build Status - Monocore Crate + Monocore Crate - Monocore Docs + Monocore Docs - License + License

@@ -31,7 +31,7 @@ Building AI agents that write and execute code? You'll need a secure sandbox. - Generate visualizations and charts - Run data analysis scripts - Execute system commands safely -- Create and test web applications +- Create and host web applications - Run automated browser tasks - Perform complex calculations @@ -100,29 +100,29 @@ This will install both the `monocore` command and its alias `mc`. ```toml # monocore.toml [[service]] - name = "sh-counter" - base = "alpine:latest" - ram = 256 - cpus = 1 - group = "demo" - command = "/bin/sh" - args = ["-c", "for i in $(seq 1 20); do echo $i; sleep 2; done"] - - [[service]] - name = "python-counter" - base = "python:3.11-slim" - ram = 256 - cpus = 1 - group = "demo" - command = "/usr/local/bin/python3" - args = [ - "-c", - "import time; count=0; [print(f'Count: {count+1}') or time.sleep(2) or (count:=count+1) for _ in range(20)]", - ] - - [[group]] - name = "demo" - local_only = true + name = "sh-counter" + base = "alpine:latest" + ram = 256 + cpus = 1 + group = "demo" + command = "/bin/sh" + args = ["-c", "for i in $(seq 1 20); do echo $i; sleep 2; done"] + + [[service]] + name = "python-counter" + base = "python:3.11-slim" + ram = 256 + cpus = 1 + group = "demo" + command = "/usr/local/bin/python3" + args = [ + "-c", + "import time; count=0; [print(f'Count: {count+1}') or time.sleep(2) or (count:=count+1) for _ in range(20)]", + ] + + [[group]] + name = "demo" + local_only = true ``` 2. **Manage your sandboxes** diff --git a/monocore.example.json b/monocore.example.json index 7740547..bd94f20 100644 --- a/monocore.example.json +++ b/monocore.example.json @@ -6,7 +6,7 @@ "ram": 256, "cpus": 1, "group": "demo", - "command": "/bin/sh", + "command": "sh", "args": ["-c", "for i in $(seq 1 20); do echo $i; sleep 2; done"] }, { @@ -15,7 +15,7 @@ "ram": 256, "cpus": 1, "group": "demo", - "command": "/usr/local/bin/python3", + "command": "python", "args": [ "-c", "import time; count=0; [print(f'Count: {count+1}') or time.sleep(2) or (count:=count+1) for _ in range(20)]" diff --git a/monocore.example.toml b/monocore.example.toml index 12fd8d7..a6840d6 100644 --- a/monocore.example.toml +++ b/monocore.example.toml @@ -4,7 +4,7 @@ base = "alpine:latest" ram = 256 cpus = 1 group = "demo" -command = "/bin/sh" +command = "sh" args = ["-c", "for i in $(seq 1 20); do echo $i; sleep 2; done"] [[service]] @@ -13,7 +13,7 @@ base = "python:3.11-slim" ram = 256 cpus = 1 group = "demo" -command = "/usr/local/bin/python3" +command = "python" args = [ "-c", "import time; count=0; [print(f'Count: {count+1}') or time.sleep(2) or (count:=count+1) for _ in range(20)]", diff --git a/monocore/Cargo.toml b/monocore/Cargo.toml index 3767543..75ef783 100644 --- a/monocore/Cargo.toml +++ b/monocore/Cargo.toml @@ -69,6 +69,8 @@ pretty-error-debug.workspace = true serde_yaml = "0.9.34" async-stream.workspace = true pin-project = "1.1.7" +tracing-appender = "0.2.3" +pin-project-lite = "0.2.15" [dev-dependencies] test-log.workspace = true diff --git a/monocore/README.md b/monocore/README.md index 627d286..4034409 100644 --- a/monocore/README.md +++ b/monocore/README.md @@ -7,19 +7,19 @@

- Discord + Discord Build Status - Monocore Crate + Monocore Crate - Monocore Docs + Monocore Docs - License + License

diff --git a/monocore/bin/monocore.rs b/monocore/bin/monocore.rs index 9bb3573..8d3b3d4 100644 --- a/monocore/bin/monocore.rs +++ b/monocore/bin/monocore.rs @@ -7,7 +7,7 @@ use monocore::{ config::Monocore, orchestration::Orchestrator, server::{self, ServerState}, - utils::{self, OCI_SUBDIR, ROOTFS_SUBDIR}, + utils::{self, OCI_SUBDIR}, MonocoreError, MonocoreResult, }; use serde::de::DeserializeOwned; @@ -118,8 +118,8 @@ async fn main() -> MonocoreResult<()> { Some(MonocoreSubcommand::Status {}) => { let current_exe = env::current_exe()?; let supervisor_path = current_exe.parent().unwrap().join(SUPERVISOR_EXE); - let rootfs_dir = monocore::utils::monocore_home_path().join(ROOTFS_SUBDIR); - let orchestrator = Orchestrator::load(&rootfs_dir, &supervisor_path).await?; + let home_dir = monocore::utils::monocore_home_path(); + let orchestrator = Orchestrator::load(&home_dir, &supervisor_path).await?; let statuses = orchestrator.status().await?; println!(); @@ -238,48 +238,10 @@ async fn main() -> MonocoreResult<()> { let current_exe = env::current_exe()?; let supervisor_path = current_exe.parent().unwrap().join(SUPERVISOR_EXE); let orchestrator = Orchestrator::load(&home_dir, &supervisor_path).await?; - - let mut log_stream = orchestrator.view_logs(&service, lines, follow).await?; - - if follow || no_pager { - // Set up Ctrl+C handler - let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; - - // Print directly to stdout for follow mode or when no pager is requested - loop { - tokio::select! { - maybe_line = log_stream.next() => { - match maybe_line { - Some(line) => { - print!("{}", line?); - std::io::stdout().flush()?; - } - None => break, - } - } - _ = sigint.recv() => { - break; - } - } - } - } else { - // Collect all content for pager mode - let mut content = String::new(); - while let Some(line) = log_stream.next().await { - content.push_str(&line?); - } - - let mut less = Command::new("less") - .arg("-R") // Handle ANSI color codes - .stdin(std::process::Stdio::piped()) - .spawn()?; - - if let Some(mut stdin) = less.stdin.take() { - stdin.write_all(content.as_bytes()).await?; - } - - less.wait().await?; - } + let mut log_stream = orchestrator + .view_logs(service.as_deref(), lines, follow) + .await?; + display_logs(&mut log_stream, no_pager, follow).await?; } None => { @@ -307,3 +269,69 @@ async fn parse_config_file( _ => toml::from_str(&content).map_err(MonocoreError::Toml), } } + +/// Helper function to display logs with optional pager and follow mode +async fn display_logs( + log_stream: &mut (impl futures::Stream> + Unpin), + no_pager: bool, + follow: bool, +) -> MonocoreResult<()> { + if follow || no_pager { + // Set up Ctrl+C handler + let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; + + // Print directly to stdout for follow mode or when no pager is requested + loop { + tokio::select! { + maybe_line = log_stream.next() => { + match maybe_line { + Some(line) => { + print!("{}", line?); + std::io::stdout().flush()?; + } + None => break, + } + } + _ = sigint.recv() => { + break; + } + } + } + } else { + // Try to spawn less, fall back to direct output if not available + let less_result = Command::new("less") + .arg("-R") // Handle ANSI color codes + .stdin(std::process::Stdio::piped()) + .spawn(); + + match less_result { + Ok(mut less) => { + let mut stdin = less.stdin.take().ok_or_else(|| { + MonocoreError::PagerError("Failed to open stdin to pager".to_string()) + })?; + + // Stream directly to less + while let Some(line) = log_stream.next().await { + let line = line?; + if stdin.write_all(line.as_bytes()).await.is_err() { + // Pager was probably closed by user + break; + } + } + + // Close pager's stdin and wait for it + drop(stdin); + less.wait().await?; + } + Err(_) => { + // less not available, fall back to direct output + while let Some(line) = log_stream.next().await { + print!("{}", line?); + std::io::stdout().flush()?; + } + } + } + } + + Ok(()) +} diff --git a/monocore/bin/monokrun.rs b/monocore/bin/monokrun.rs index 0a15b93..00d0e98 100644 --- a/monocore/bin/monokrun.rs +++ b/monocore/bin/monokrun.rs @@ -2,12 +2,14 @@ use std::{env, net::Ipv4Addr, path::PathBuf}; use monocore::{ config::{Group, Service}, - runtime::Supervisor, + runtime::{RotatingLog, Supervisor}, + utils::{LOG_SUBDIR, SUPERVISORS_LOG_FILENAME}, vm::MicroVm, MonocoreError, MonocoreResult, }; use tokio::signal::unix::{signal, SignalKind}; use tracing::{error, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; //-------------------------------------------------------------------------------------------------- // Function: main @@ -26,7 +28,7 @@ use tracing::{error, info}; /// /// Expected arguments for subprocess mode: /// ```text -/// monokrun --run-microvm +/// monokrun --run-microvm /// ``` #[tokio::main] pub async fn main() -> MonocoreResult<()> { @@ -37,9 +39,9 @@ pub async fn main() -> MonocoreResult<()> { // Handle microvm mode let service: Service = serde_json::from_str(&args[2])?; let group: Group = serde_json::from_str(&args[3])?; - let local_only: bool = serde_json::from_str(&args[4])?; - let group_ip: Option = serde_json::from_str(&args[5])?; - let rootfs_path = PathBuf::from(&args[6]); + let group_ip: Option = serde_json::from_str(&args[4])?; + let rootfs_path = PathBuf::from(&args[5]); + let local_only: bool = serde_json::from_str(&args[6])?; // Resolve environment variables let env_pairs = service.resolve_environment_variables(&group)?; @@ -50,7 +52,7 @@ pub async fn main() -> MonocoreResult<()> { .root_path(rootfs_path) .num_vcpus(service.get_cpus()) .ram_mib(service.get_ram()) - .port_map(service.get_port().cloned().into_iter()) + .port_map(service.get_ports().iter().cloned()) .workdir_path(service.get_workdir().unwrap_or("/")) .exec_path(service.get_command().unwrap_or("/bin/sh")) .args(service.get_args().iter().map(|s| s.as_str())) @@ -70,16 +72,49 @@ pub async fn main() -> MonocoreResult<()> { } // Check for supervisor mode - if args.len() == 6 && args[1] == "--run-supervisor" { - tracing_subscriber::fmt().init(); - + if args.len() == 7 && args[1] == "--run-supervisor" { let service: Service = serde_json::from_str(&args[2])?; let group: Group = serde_json::from_str(&args[3])?; let group_ip: Option = serde_json::from_str(&args[4])?; let rootfs_path = PathBuf::from(&args[5]); + let home_dir = &args[6]; + + // Create a rotating log file that automatically rotates when reaching max size + let rotating_log = RotatingLog::new( + PathBuf::from(home_dir) + .join(LOG_SUBDIR) + .join(SUPERVISORS_LOG_FILENAME), + None, + ) + .await?; + + // Bridge between our async rotating log and tracing's sync writer requirement + let sync_writer = rotating_log.get_sync_writer(); + + // Create a non-blocking writer to prevent logging from blocking execution + let (non_blocking, _guard) = tracing_appender::non_blocking(sync_writer); + + // Configure log level filtering from environment variables + let env_filter = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("debug")) + .unwrap(); + + // Configure file output format without ANSI colors or target field + let file_layer = tracing_subscriber::fmt::layer() + .with_writer(non_blocking) + .with_ansi(false) + .with_target(false) + .with_file(true); + + // Set up the global tracing subscriber + tracing_subscriber::registry() + .with(env_filter) + .with(file_layer) + .init(); // Create and start the supervisor - let mut supervisor = Supervisor::new(service, group, group_ip, rootfs_path).await?; + let mut supervisor = + Supervisor::new(home_dir, service, group, group_ip, rootfs_path).await?; // Set up signal handler for graceful shutdown let mut term_signal = signal(SignalKind::terminate())?; @@ -111,6 +146,6 @@ pub async fn main() -> MonocoreResult<()> { // If we get here, no valid subcommand was provided Err(MonocoreError::InvalidSupervisorArgs( - "Usage: monokrun --run-supervisor \n monokrun --run-microvm ".into(), + "Usage: monokrun --run-supervisor \n monokrun --run-microvm >".into(), )) } diff --git a/monocore/lib/cli/args.rs b/monocore/lib/cli/args.rs index be9af3b..2c3ad03 100644 --- a/monocore/lib/cli/args.rs +++ b/monocore/lib/cli/args.rs @@ -74,11 +74,10 @@ pub enum MonocoreSubcommand { Status {}, /// Display service logs - #[command(arg_required_else_help = true)] + #[command(arg_required_else_help = false)] Log { - /// Name of the service to show logs for - #[arg(required = true)] - service: String, + /// Name of the service to show logs for. If not specified, shows supervisor logs + service: Option, /// Number of lines to show (from the end) #[arg(short = 'n')] diff --git a/monocore/lib/config/merge.rs b/monocore/lib/config/merge.rs index 65f6f0a..787a789 100644 --- a/monocore/lib/config/merge.rs +++ b/monocore/lib/config/merge.rs @@ -180,15 +180,23 @@ mod tests { #[test] fn test_monocore_merge_basic() { - // Create two services with different names + // Create two services with different names and multiple ports let service1 = Service::builder() .name("service1") .command("./test1") + .ports(vec![ + PortPair::with_same(8080), + PortPair::with_distinct(8081, 81), + ]) .build(); let service2 = Service::builder() .name("service2") .command("./test2") + .ports(vec![ + PortPair::with_same(9090), + PortPair::with_distinct(9091, 91), + ]) .build(); // Create two valid configurations, each with one service @@ -209,20 +217,49 @@ mod tests { assert_eq!(merged.services.len(), 2); assert!(merged.services.iter().any(|s| s.get_name() == "service1")); assert!(merged.services.iter().any(|s| s.get_name() == "service2")); + + // Verify ports are preserved + let merged_service1 = merged + .services + .iter() + .find(|s| s.get_name() == "service1") + .unwrap(); + assert_eq!(merged_service1.ports.len(), 2); + assert!(merged_service1.ports.contains(&PortPair::with_same(8080))); + assert!(merged_service1 + .ports + .contains(&PortPair::with_distinct(8081, 81))); + + let merged_service2 = merged + .services + .iter() + .find(|s| s.get_name() == "service2") + .unwrap(); + assert_eq!(merged_service2.ports.len(), 2); + assert!(merged_service2.ports.contains(&PortPair::with_same(9090))); + assert!(merged_service2 + .ports + .contains(&PortPair::with_distinct(9091, 91))); } #[test] fn test_monocore_merge_service_update() { - // Create original service + // Create original service with single port let service1 = Service::builder() .name("service1") .command("./test1") + .ports(vec![PortPair::with_same(8080)]) .build(); - // Create updated version of the same service + // Create updated version of the same service with multiple ports let service1_updated = Service::builder() .name("service1") .command("./test1_updated") + .ports(vec![ + PortPair::with_same(8080), + PortPair::with_distinct(8081, 81), + PortPair::with_distinct(8082, 82), + ]) .build(); // Create configurations with original and updated services @@ -242,6 +279,18 @@ mod tests { let merged = config1.merge(&config2).unwrap(); assert_eq!(merged.services.len(), 1); assert_eq!(merged.services[0].get_command().unwrap(), "./test1_updated"); + + // Verify updated ports + assert_eq!(merged.services[0].ports.len(), 3); + assert!(merged.services[0] + .ports + .contains(&PortPair::with_same(8080))); + assert!(merged.services[0] + .ports + .contains(&PortPair::with_distinct(8081, 81))); + assert!(merged.services[0] + .ports + .contains(&PortPair::with_distinct(8082, 82))); } #[test] @@ -249,18 +298,24 @@ mod tests { // Create a group let group = Group::builder().name("test-group").build(); - // Create two services in the same group that use the same port + // Create two services in the same group that use conflicting ports let service1 = Service::builder() .name("service1") .group("test-group") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), + PortPair::with_distinct(8081, 81), + ]) .command("./test1") .build(); let service2 = Service::builder() .name("service2") .group("test-group") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), // Conflicts with service1 + PortPair::with_distinct(8082, 82), // This one is fine + ]) .command("./test2") .build(); @@ -292,18 +347,24 @@ mod tests { let group1 = Group::builder().name("group1").build(); let group2 = Group::builder().name("group2").build(); - // Create services in different groups using the same port + // Create services in different groups using overlapping ports let service1 = Service::builder() .name("service1") .group("group1") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), + PortPair::with_distinct(8081, 81), + ]) .command("./test1") .build(); let service2 = Service::builder() .name("service2") .group("group2") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), // Same port is fine in different group + PortPair::with_distinct(8082, 82), + ]) .command("./test2") .build(); @@ -326,6 +387,29 @@ mod tests { let merged = result.unwrap(); assert_eq!(merged.services.len(), 2); assert_eq!(merged.groups.len(), 2); + + // Verify all ports are preserved + let merged_service1 = merged + .services + .iter() + .find(|s| s.get_name() == "service1") + .unwrap(); + assert_eq!(merged_service1.ports.len(), 2); + assert!(merged_service1.ports.contains(&PortPair::with_same(8080))); + assert!(merged_service1 + .ports + .contains(&PortPair::with_distinct(8081, 81))); + + let merged_service2 = merged + .services + .iter() + .find(|s| s.get_name() == "service2") + .unwrap(); + assert_eq!(merged_service2.ports.len(), 2); + assert!(merged_service2.ports.contains(&PortPair::with_same(8080))); + assert!(merged_service2 + .ports + .contains(&PortPair::with_distinct(8082, 82))); } #[test] diff --git a/monocore/lib/config/monocore.rs b/monocore/lib/config/monocore.rs index 3ab077d..b3404f0 100644 --- a/monocore/lib/config/monocore.rs +++ b/monocore/lib/config/monocore.rs @@ -134,8 +134,8 @@ pub struct Service { pub(super) depends_on: Vec, /// The port to expose. - #[serde(skip_serializing_if = "Option::is_none", default)] - pub(super) port: Option, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(super) ports: Vec, /// The working directory to use. #[serde(skip_serializing_if = "Option::is_none", default)] @@ -330,12 +330,20 @@ impl Service { &self.depends_on } - /// Gets the port of the service. + /// Gets the mapped ports of the service. /// /// ## Returns - /// The port of the service, or None if the service is not exposed. - pub fn get_port(&self) -> Option<&PortPair> { - self.port.as_ref() + /// The ports of the service. + pub fn get_ports(&self) -> &[PortPair] { + &self.ports + } + + /// Sets the mapped ports of the service. + /// + /// ## Arguments + /// * `ports` - A vector of port mappings to set + pub fn set_ports(&mut self, ports: Vec) { + self.ports = ports; } /// Gets the working directory of the service. @@ -516,6 +524,31 @@ impl Service { Ok(volume_mounts) } + + /// Sets the working directory of the service. + pub fn set_workdir(&mut self, workdir: String) { + self.workdir = Some(workdir); + } + + /// Sets the command of the service. + pub fn set_command(&mut self, command: String) { + self.command = Some(command); + } + + /// Sets the arguments of the service. + pub fn set_args(&mut self, args: Vec) { + self.args = args; + } + + /// Sets the environment variables of the service. + pub fn set_envs(&mut self, envs: Vec) { + self.envs = envs.into_iter().filter_map(|e| e.parse().ok()).collect(); + } + + /// Gets all environment variables of the service. + pub fn get_envs(&self) -> &[EnvPair] { + &self.envs + } } impl Group { @@ -552,7 +585,7 @@ mod tests { volumes = [ "/var/lib/postgresql/data:/" ] - port = "5432:5432" + ports = ["5432:5432"] [[service]] name = "server" @@ -566,7 +599,7 @@ mod tests { group = "app_grp" group_envs = ["app_grp_env"] depends_on = ["database"] - port = "3000:3000" + ports = ["3000:3000"] command = "/app/bin/mcp-server" [[service.group_volumes]] @@ -596,7 +629,7 @@ mod tests { .name("database") .base("postgres:16.1") .volumes(vec!["/var/lib/postgresql/data:/".parse::()?]) - .port("5432:5432".parse::()?) + .ports(vec!["5432:5432".parse::()?]) .build(), Service::builder() .name("server") @@ -606,7 +639,7 @@ mod tests { .group("app_grp") .group_envs(vec!["app_grp_env".to_string()]) .depends_on(vec!["database".to_string()]) - .port("3000:3000".parse::()?) + .ports(vec!["3000:3000".parse::()?]) .command("/app/bin/mcp-server") .group_volumes(vec![VolumeMount::builder() .name("app_grp_vol") @@ -641,7 +674,7 @@ mod tests { "name": "database", "base": "postgres:16.1", "volumes": ["/var/lib/postgresql/data:/"], - "port": "5432:5432" + "ports": ["5432:5432"] }, { "name": "server", @@ -651,7 +684,7 @@ mod tests { "group": "app_grp", "group_envs": ["app_grp_env"], "depends_on": ["database"], - "port": "3000:3000", + "ports": ["3000:3000"], "command": "/app/bin/mcp-server", "group_volumes": [ { @@ -689,7 +722,7 @@ mod tests { .name("database") .base("postgres:16.1") .volumes(vec!["/var/lib/postgresql/data:/".parse::()?]) - .port("5432:5432".parse::()?) + .ports(vec!["5432:5432".parse::()?]) .build(), Service::builder() .name("server") @@ -699,7 +732,7 @@ mod tests { .group("app_grp") .group_envs(vec!["app_grp_env".to_string()]) .depends_on(vec!["database".to_string()]) - .port("3000:3000".parse::()?) + .ports(vec!["3000:3000".parse::()?]) .command("/app/bin/mcp-server") .group_volumes(vec![VolumeMount::builder() .name("app_grp_vol") diff --git a/monocore/lib/config/service_builder.rs b/monocore/lib/config/service_builder.rs index 5b331dc..287d50d 100644 --- a/monocore/lib/config/service_builder.rs +++ b/monocore/lib/config/service_builder.rs @@ -19,7 +19,7 @@ pub struct ServiceBuilder { group_volumes: Vec, group_envs: Vec, depends_on: Vec, - port: Option, + ports: Vec, workdir: Option, command: Option, args: Vec, @@ -74,9 +74,9 @@ impl ServiceBuilder { self } - /// Sets the port mapping for the service - pub fn port(mut self, port: PortPair) -> Self { - self.port = Some(port); + /// Sets the port mappings for the service + pub fn ports(mut self, ports: impl IntoIterator) -> Self { + self.ports = ports.into_iter().collect(); self } @@ -121,7 +121,7 @@ impl ServiceBuilder { group_volumes: self.group_volumes, group_envs: self.group_envs, depends_on: self.depends_on, - port: self.port, + ports: self.ports, workdir: self.workdir, command: self.command, args: self.args, @@ -143,7 +143,7 @@ impl ServiceBuilder { group_volumes: self.group_volumes, group_envs: self.group_envs, depends_on: self.depends_on, - port: self.port, + ports: self.ports, workdir: self.workdir, command: self.command, args: self.args, @@ -168,7 +168,7 @@ impl Default for ServiceBuilder<()> { group_volumes: vec![], group_envs: vec![], depends_on: vec![], - port: None, + ports: vec![], workdir: None, command: None, args: vec![], @@ -194,18 +194,18 @@ mod tests { .name("test-service") .base("ubuntu:24.04") .group("app") - .volumes(vec!["/app;".parse()?]) - .envs(vec!["ENV=main".parse()?]) - .group_volumes(vec![VolumeMount::builder() + .volumes(["/app;".parse()?]) + .envs(["ENV=main".parse()?]) + .group_volumes([VolumeMount::builder() .name("main".to_string()) .mount(PathPair::Same("/app".parse()?)) .build()]) - .group_envs(vec!["main".to_string()]) - .depends_on(vec!["db".to_string()]) - .port("8080:80".parse()?) + .group_envs(["main".to_string()]) + .depends_on(["db".to_string()]) + .ports(["8080:80".parse()?]) .workdir("/app") .command("./app") - .args(vec!["--port", "80"]) + .args(["--port", "80"]) .cpus(2) .ram(1024) .build(); @@ -218,7 +218,7 @@ mod tests { assert_eq!(service.group_volumes.len(), 1); assert_eq!(service.group_envs, vec!["main".to_string()]); assert_eq!(service.depends_on, vec!["db".to_string()]); - assert_eq!(service.port, Some("8080:80".parse()?)); + assert_eq!(service.ports, vec!["8080:80".parse()?]); assert_eq!(service.workdir, Some("/app".to_string())); assert_eq!(service.command, Some("./app".to_string())); assert_eq!(service.args, vec!["--port", "80"]); @@ -240,7 +240,7 @@ mod tests { assert!(service.group_volumes.is_empty()); assert!(service.group_envs.is_empty()); assert!(service.depends_on.is_empty()); - assert_eq!(service.port, None); + assert!(service.ports.is_empty()); assert_eq!(service.workdir, None); assert_eq!(service.command, None); assert!(service.args.is_empty()); diff --git a/monocore/lib/config/validate.rs b/monocore/lib/config/validate.rs index c9c2c3f..ac7c832 100644 --- a/monocore/lib/config/validate.rs +++ b/monocore/lib/config/validate.rs @@ -142,12 +142,15 @@ impl Monocore { /// Validates that services within the same group don't have port conflicts fn validate_service_ports(&self, services: &[Service], errors: &mut Vec) { + // Map of group name (or None for default group) to a map of host ports to service names let mut used_ports: HashMap, HashMap> = HashMap::new(); for service in services { - if let Some(port) = &service.port { + let group_ports = used_ports.entry(service.group.clone()).or_default(); + + // Check each port in the service's ports vector + for port in &service.ports { let host_port = port.get_host(); - let group_ports = used_ports.entry(service.group.clone()).or_default(); if let Some(existing_service) = group_ports.get(&host_port) { errors.push(format!( @@ -1098,14 +1101,20 @@ mod tests { let service1 = Service::builder() .name("service1") .group("test-group") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), + PortPair::with_distinct(8081, 81), + ]) .command("./test1") .build(); let service2 = Service::builder() .name("service2") .group("test-group") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), // Conflicts with service1 + PortPair::with_distinct(8082, 82), // This one is fine + ]) .command("./test2") .build(); @@ -1128,14 +1137,20 @@ mod tests { let service1 = Service::builder() .name("service1") .group("group1") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), + PortPair::with_distinct(8081, 81), + ]) .command("./test1") .build(); let service2 = Service::builder() .name("service2") .group("group2") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), // Same port is fine in different group + PortPair::with_distinct(8082, 82), + ]) .command("./test2") .build(); @@ -1157,13 +1172,19 @@ mod tests { // Create services with no group (default group) with conflicting ports let service1 = Service::builder() .name("service1") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), + PortPair::with_distinct(8081, 81), + ]) .command("./test1") .build(); let service2 = Service::builder() .name("service2") - .port("8080:8080".parse::().unwrap()) + .ports(vec![ + PortPair::with_same(8080), // Conflicts with service1 + PortPair::with_distinct(8082, 82), // This one is fine + ]) .command("./test2") .build(); @@ -1178,6 +1199,73 @@ mod tests { assert!(errors[0].contains("Port 8080 is already in use")); } + #[test] + fn test_validate_service_ports_multiple_conflicts() { + // Create services with multiple port conflicts + let service1 = Service::builder() + .name("service1") + .group("test-group") + .ports(vec![ + PortPair::with_same(8080), + PortPair::with_same(8081), + PortPair::with_distinct(8082, 82), + ]) + .command("./test1") + .build(); + + let service2 = Service::builder() + .name("service2") + .group("test-group") + .ports(vec![ + PortPair::with_same(8080), // Conflicts with service1 + PortPair::with_same(8081), // Also conflicts with service1 + PortPair::with_distinct(8083, 83), // This one is fine + ]) + .command("./test2") + .build(); + + let group = Group::builder().name("test-group").build(); + + let config = Monocore { + services: vec![service1, service2], + groups: vec![group], + }; + + let mut errors = Vec::new(); + config.validate_service_ports(&config.services, &mut errors); + assert_eq!(errors.len(), 2); // Should have two conflict errors + assert!(errors + .iter() + .any(|e| e.contains("Port 8080 is already in use"))); + assert!(errors + .iter() + .any(|e| e.contains("Port 8081 is already in use"))); + } + + #[test] + fn test_validate_service_ports_same_service() { + // Create a service with duplicate ports + let service = Service::builder() + .name("service1") + .ports(vec![ + PortPair::with_same(8080), + PortPair::with_same(8080), // Duplicate port in same service + PortPair::with_distinct(8081, 81), + ]) + .command("./test") + .build(); + + let config = Monocore { + services: vec![service], + groups: vec![], + }; + + let mut errors = Vec::new(); + config.validate_service_ports(&config.services, &mut errors); + assert_eq!(errors.len(), 1); + assert!(errors[0].contains("Port 8080 is already in use")); + } + #[test] fn test_monocore_validate_check_circular_dependencies() { // Create services with circular dependency diff --git a/monocore/lib/error.rs b/monocore/lib/error.rs index 83bce0f..1c2271f 100644 --- a/monocore/lib/error.rs +++ b/monocore/lib/error.rs @@ -238,6 +238,14 @@ pub enum MonocoreError { /// An error that occurred when failed to parse configuration file #[error("Failed to parse configuration file: {0}")] ConfigParseError(String), + + /// An error that occurred when a log file was not found + #[error("log not found: {0}")] + LogNotFound(String), + + /// An error that occurred when a pager error occurred + #[error("pager error: {0}")] + PagerError(String), } /// An error that occurred when an invalid MicroVm configuration was used. diff --git a/monocore/lib/orchestration/log.rs b/monocore/lib/orchestration/log.rs index 0cc2156..f88d7be 100644 --- a/monocore/lib/orchestration/log.rs +++ b/monocore/lib/orchestration/log.rs @@ -6,7 +6,10 @@ use tokio::{ time, }; -use crate::{utils::LOG_SUBDIR, MonocoreResult}; +use crate::{ + utils::{LOG_SUBDIR, SUPERVISORS_LOG_FILENAME}, + MonocoreResult, +}; use super::Orchestrator; @@ -25,7 +28,7 @@ impl Orchestrator { /// * `follow` - Whether to continuously follow the log output pub async fn view_logs( &self, - service_name: &str, + service_name: Option<&str>, lines: Option, follow: bool, ) -> MonocoreResult { @@ -34,25 +37,41 @@ impl Orchestrator { // Ensure log directory exists if !fs::try_exists(&log_dir).await? { - let msg = format!("No logs found for service '{}'", service_name); + let msg = "No logs found".to_string(); return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok( msg, ))))); } - let log_path = log_dir.join(format!("{}.stdout.log", service_name)); + let log_path = if let Some(service_name) = service_name { + let log_path = log_dir.join(format!("{}.stdout.log", service_name)); - // Check if log file exists - if !fs::try_exists(&log_path).await? { - let msg = format!("No logs found for service '{}'", service_name); - return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok( - msg, - ))))); - } + // Check if log file exists + if !fs::try_exists(&log_path).await? { + let msg = format!("No logs found for service '{}'", service_name); + return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok( + msg, + ))))); + } + + log_path + } else { + let log_path = log_dir.join(SUPERVISORS_LOG_FILENAME); + + // Ensure supervisor log file exists + if !fs::try_exists(&log_path).await? { + let msg = "Supervisor logs not found".to_string(); + return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok( + msg, + ))))); + } + + log_path + }; // Read initial content let content = fs::read_to_string(&log_path).await?; - let initial_content = if let Some(n) = lines { + let content = if let Some(n) = lines { let lines: Vec<&str> = content.lines().collect(); let start = if lines.len() > n { lines.len() - n } else { 0 }; lines[start..].join("\n") @@ -60,6 +79,13 @@ impl Orchestrator { content }; + // Ensure content ends with newline + let initial_content = if content.ends_with('\n') { + content + } else { + content + "\n" + }; + if !follow { return Ok(Box::pin(futures::stream::once(futures::future::ready(Ok( initial_content, diff --git a/monocore/lib/orchestration/orchestrator.rs b/monocore/lib/orchestration/orchestrator.rs index f507467..2828591 100644 --- a/monocore/lib/orchestration/orchestrator.rs +++ b/monocore/lib/orchestration/orchestrator.rs @@ -14,7 +14,7 @@ use tokio::{ use crate::{ config::{Monocore, Service}, runtime::MicroVmState, - utils::{MONOCORE_LOG_DIR, MONOCORE_STATE_DIR}, + utils::{LOG_SUBDIR, STATE_SUBDIR}, MonocoreError, MonocoreResult, }; @@ -80,8 +80,10 @@ impl Orchestrator { supervisor_exe_path: impl AsRef, log_retention_policy: LogRetentionPolicy, ) -> MonocoreResult { + let state_dir = home_dir.as_ref().join(STATE_SUBDIR); + // Ensure the state directory exists - fs::create_dir_all(&*MONOCORE_STATE_DIR).await?; + fs::create_dir_all(&state_dir).await?; // Verify supervisor binary exists let supervisor_exe_path = supervisor_exe_path.as_ref().to_path_buf(); @@ -142,8 +144,10 @@ impl Orchestrator { supervisor_exe_path: impl AsRef, log_retention_policy: LogRetentionPolicy, ) -> MonocoreResult { + let state_dir = home_dir.as_ref().join(STATE_SUBDIR); + // Ensure the state directory exists - fs::create_dir_all(&*MONOCORE_STATE_DIR).await?; + fs::create_dir_all(&state_dir).await?; // Verify supervisor binary exists let supervisor_exe_path = supervisor_exe_path.as_ref().to_path_buf(); @@ -154,7 +158,7 @@ impl Orchestrator { } // Load state from files - let state = utils::load_state_from_files(&MONOCORE_STATE_DIR).await?; + let state = utils::load_state_from_files(&state_dir).await?; // Create config from state let ( @@ -221,16 +225,18 @@ impl Orchestrator { /// Performs cleanup of old log files based on the configured maximum age. Removes /// files that exceed the age threshold and logs the cleanup activity. pub async fn cleanup_old_logs(&self) -> MonocoreResult<()> { + let log_dir = self.home_dir.join(LOG_SUBDIR); + // Ensure log directory exists before attempting cleanup - if !fs::try_exists(&*MONOCORE_LOG_DIR).await? { - fs::create_dir_all(&*MONOCORE_LOG_DIR).await?; + if !fs::try_exists(&log_dir).await? { + fs::create_dir_all(&log_dir).await?; return Ok(()); } let now = SystemTime::now(); let mut cleaned_files = 0; - let mut entries = fs::read_dir(&*MONOCORE_LOG_DIR).await?; + let mut entries = fs::read_dir(&log_dir).await?; while let Some(entry) = entries.next_entry().await? { if self .should_delete_log(&entry, now, self.log_retention_policy.max_age) diff --git a/monocore/lib/orchestration/status.rs b/monocore/lib/orchestration/status.rs index abe37f3..4def605 100644 --- a/monocore/lib/orchestration/status.rs +++ b/monocore/lib/orchestration/status.rs @@ -1,6 +1,6 @@ use tokio::fs; -use crate::{runtime::MicroVmState, utils::MONOCORE_STATE_DIR, MonocoreResult}; +use crate::{runtime::MicroVmState, utils::STATE_SUBDIR, MonocoreResult}; use super::{utils, Orchestrator, ServiceStatus}; @@ -15,15 +15,16 @@ impl Orchestrator { pub async fn status(&self) -> MonocoreResult> { let mut statuses = Vec::new(); let mut stale_files = Vec::new(); + let state_dir = self.home_dir.join(STATE_SUBDIR); // Ensure directory exists before reading - if !fs::try_exists(&*MONOCORE_STATE_DIR).await? { - fs::create_dir_all(&*MONOCORE_STATE_DIR).await?; + if !fs::try_exists(&state_dir).await? { + fs::create_dir_all(&state_dir).await?; return Ok(statuses); } // Read all state files from the state directory - let mut dir = fs::read_dir(&*MONOCORE_STATE_DIR).await?; + let mut dir = fs::read_dir(&state_dir).await?; while let Some(entry) = dir.next_entry().await? { let path = entry.path(); if path.is_file() && path.extension().is_some_and(|ext| ext == "json") { diff --git a/monocore/lib/orchestration/up.rs b/monocore/lib/orchestration/up.rs index 576651d..0644c79 100644 --- a/monocore/lib/orchestration/up.rs +++ b/monocore/lib/orchestration/up.rs @@ -8,11 +8,7 @@ use crate::{ MonocoreError, MonocoreResult, }; use std::{collections::HashSet, net::Ipv4Addr, process::Stdio}; -use tokio::{ - fs, - io::{AsyncBufReadExt, BufReader}, - process::{Child, Command}, -}; +use tokio::{fs, process::Command}; //-------------------------------------------------------------------------------------------------- // Methods @@ -203,6 +199,7 @@ impl Orchestrator { &group_json, &group_ip_json, service_rootfs.to_str().unwrap(), + self.home_dir.to_str().unwrap(), ]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -221,71 +218,9 @@ impl Orchestrator { pid ); - // Spawn tasks to handle stdout and stderr - let service_name = service.get_name().to_string(); - self.spawn_output_handler(child, service_name); - Ok(()) } - /// Sets up asynchronous handlers for process output streams, capturing stdout and stderr - /// from the supervised process and logging them appropriately. - fn spawn_output_handler(&self, mut child: Child, service_name: String) { - // Handle stdout - match child.stdout.take() { - Some(stdout) => { - let stdout_service_name = service_name.clone(); - tokio::spawn(async move { - let mut reader = BufReader::new(stdout).lines(); - while let Ok(Some(line)) = reader.next_line().await { - tracing::info!("[{}/stdout] {}", stdout_service_name, line); - } - }); - } - None => { - tracing::warn!( - "Failed to capture stdout for supervisor of service {}", - service_name - ); - } - } - - // Handle stderr - match child.stderr.take() { - Some(stderr) => { - let stderr_service_name = service_name.clone(); - tokio::spawn(async move { - let mut reader = BufReader::new(stderr).lines(); - while let Ok(Some(line)) = reader.next_line().await { - tracing::error!("[{}/stderr] {}", stderr_service_name, line); - } - }); - } - None => { - tracing::warn!( - "Failed to capture stderr for supervisor of service {}", - service_name - ); - } - } - - // Wait for the child process - tokio::spawn(async move { - match child.wait().await { - Ok(status) => { - tracing::info!( - "Service supervisor for {} exited with status: {}", - service_name, - status - ); - } - Err(e) => { - tracing::error!("Failed to wait for service {}: {}", service_name, e); - } - } - }); - } - /// Assigns an IP address to a group from the 127.0.0.x range. /// Returns the existing IP if the group already has one assigned. /// diff --git a/monocore/lib/runtime/log.rs b/monocore/lib/runtime/log.rs new file mode 100644 index 0000000..ad6dd61 --- /dev/null +++ b/monocore/lib/runtime/log.rs @@ -0,0 +1,329 @@ +//! Log rotation implementation for the Monocore runtime. +//! +//! This module provides a rotating log implementation that automatically rotates log files +//! when they reach a specified size. The rotation process involves: +//! 1. Renaming the current log file to .old extension +//! 2. Creating a new empty log file +//! 3. Continuing writing to the new file +//! +//! The implementation is fully asynchronous and implements AsyncWrite. + +use futures::future::BoxFuture; +use std::{ + io::{self, Write}, + path::{Path, PathBuf}, + pin::Pin, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; +use tokio::{ + fs::{remove_file, rename, File, OpenOptions}, + io::{AsyncWrite, AsyncWriteExt}, + sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, + task::JoinHandle, +}; + +//-------------------------------------------------------------------------------------------------- +// Types +//-------------------------------------------------------------------------------------------------- + +/// A rotating log file that automatically rotates when reaching a maximum size. +/// +/// The log rotation process preserves the last full log file with a ".old" extension +/// while continuing to write to a new log file with the original name. +/// +/// # Example +/// +/// ```no_run +/// use monocore::runtime::RotatingLog; +/// +/// #[tokio::main] +/// async fn main() -> std::io::Result<()> { +/// let log = RotatingLog::new("app.log", Some(1024 * 1024)).await?; // 1MB max size +/// Ok(()) +/// } +/// ``` +pub struct RotatingLog { + /// The current log file being written to + file: File, + + /// Path to the current log file + path: PathBuf, + + /// Maximum size in bytes before rotation + max_size: u64, + + /// Current size of the log file (shared between sync and async paths) + current_size: Arc, + + /// Current state of the log rotation + state: State, + + /// Channel for sending data to sync writer + tx: UnboundedSender>, + + /// Background task handle + _background_task: JoinHandle<()>, +} + +/// Internal state machine for managing log rotation +enum State { + /// Normal operation, ready to accept writes + Idle, + + /// Currently performing log rotation + Rotating(RotationFuture), + + /// Currently writing data + Writing, +} + +/// A sync writer that sends all written data to a channel. +pub struct SyncChannelWriter { + tx: UnboundedSender>, +} + +type RotationFuture = BoxFuture<'static, io::Result<(File, PathBuf)>>; + +//-------------------------------------------------------------------------------------------------- +// Methods +//-------------------------------------------------------------------------------------------------- + +impl RotatingLog { + /// Default maximum log file size (10MB) + pub const DEFAULT_MAX_SIZE: u64 = 10 * 1024 * 1024; + + /// Creates a new rotating log file. + /// + /// # Arguments + /// + /// * `path` - Path to the log file + /// * `max_size` - Optional maximum size in bytes before rotation. If None, uses DEFAULT_MAX_SIZE + /// + /// # Returns + /// + /// Returns a Result containing the new RotatingLog instance or an IO error + /// + /// # Errors + /// + /// Will return an error if: + /// * The file cannot be created or opened + /// * File metadata cannot be read + pub async fn new(path: impl AsRef, max_size: Option) -> io::Result { + let path = path.as_ref().to_path_buf(); + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .await?; + let metadata = file.metadata().await?; + let (tx, rx) = mpsc::unbounded_channel(); + + // Create shared atomic counter for current size + let current_size = Arc::new(AtomicU64::new(metadata.len())); + + // Create a clone of the file and size counter for the background task + let bg_file = file.try_clone().await?; + let bg_path = path.clone(); + let bg_max_size = max_size.unwrap_or(Self::DEFAULT_MAX_SIZE); + let bg_size = Arc::clone(¤t_size); + + // Spawn background task to handle channel data + let background_task = tokio::spawn(async move { + handle_channel_data(rx, bg_file, bg_path, bg_max_size, bg_size).await + }); + + Ok(Self { + file, + path, + max_size: max_size.unwrap_or(Self::DEFAULT_MAX_SIZE), + current_size, + state: State::Idle, + tx, + _background_task: background_task, + }) + } + + /// Get a sync writer that implements std::io::Write + pub fn get_sync_writer(&self) -> SyncChannelWriter { + SyncChannelWriter::new(self.tx.clone()) + } +} + +impl SyncChannelWriter { + /// Creates a new `SyncChannelWriter` with the given channel sender. + pub fn new(tx: UnboundedSender>) -> Self { + Self { tx } + } +} + +//-------------------------------------------------------------------------------------------------- +// Functions +//-------------------------------------------------------------------------------------------------- + +/// Performs the actual log rotation operation. +/// +/// # Arguments +/// +/// * `file` - The current log file to be rotated +/// * `path` - Path to the current log file +/// +/// # Returns +/// +/// Returns a tuple containing: +/// * The newly created log file +/// * The path to the new log file +/// +/// # Errors +/// +/// Will return an error if: +/// * File synchronization fails +/// * Old backup file cannot be removed +/// * File rename operation fails +/// * New log file cannot be created +async fn do_rotation(file: File, path: PathBuf) -> io::Result<(File, PathBuf)> { + file.sync_all().await?; + let backup_path = path.with_extension("old"); + if backup_path.exists() { + remove_file(&backup_path).await?; + } + rename(&path, &backup_path).await?; + + let new_file = OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .await?; + Ok((new_file, path)) +} + +/// Background task that handles data from the sync channel +async fn handle_channel_data( + mut rx: UnboundedReceiver>, + mut file: File, + path: PathBuf, + max_size: u64, + current_size: Arc, +) { + while let Some(data) = rx.recv().await { + let data_len = data.len() as u64; + let size = current_size.fetch_add(data_len, Ordering::Relaxed); + + if size + data_len > max_size { + // Clone the file handle before rotation + if let Ok(file_clone) = file.try_clone().await { + match do_rotation(file_clone, path.clone()).await { + Ok((new_file, _)) => { + file = new_file; + current_size.store(0, Ordering::Relaxed); + } + Err(e) => { + eprintln!("Failed to rotate log file: {}", e); + continue; + } + } + } else { + eprintln!("Failed to clone file handle for rotation"); + continue; + } + } + + if let Err(e) = file.write_all(&data).await { + eprintln!("Failed to write to log file: {}", e); + // On write error, subtract the size we added + current_size.fetch_sub(data_len, Ordering::Relaxed); + } + } +} + +//-------------------------------------------------------------------------------------------------- +// Trait Implementations +//-------------------------------------------------------------------------------------------------- + +impl AsyncWrite for RotatingLog { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = &mut *self; + let buf_len = buf.len() as u64; + + loop { + match &mut this.state { + State::Idle => { + let size = this.current_size.fetch_add(buf_len, Ordering::Relaxed); + if size + buf_len > this.max_size { + let old_file = std::mem::replace( + &mut this.file, + File::from_std(std::fs::File::open("/dev/null").unwrap()), + ); + let old_path = this.path.clone(); + let fut = Box::pin(do_rotation(old_file, old_path)); + this.state = State::Rotating(fut); + } else { + this.state = State::Writing; + } + } + State::Rotating(fut) => { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => { + this.state = State::Idle; + // On rotation error, subtract the size we added + this.current_size.fetch_sub(buf_len, Ordering::Relaxed); + return Poll::Ready(Err(e)); + } + Poll::Ready(Ok((new_file, new_path))) => { + this.file = new_file; + this.path = new_path; + this.current_size.store(0, Ordering::Relaxed); + this.state = State::Writing; + } + } + } + State::Writing => { + let pinned_file = Pin::new(&mut this.file); + match pinned_file.poll_write(cx, buf) { + Poll::Ready(Ok(written)) => { + this.state = State::Idle; + return Poll::Ready(Ok(written)); + } + Poll::Ready(Err(e)) => { + this.state = State::Idle; + // On write error, subtract the size we added + this.current_size.fetch_sub(buf_len, Ordering::Relaxed); + return Poll::Ready(Err(e)); + } + Poll::Pending => return Poll::Pending, + } + } + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.file).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.file).poll_shutdown(cx) + } +} + +impl Write for SyncChannelWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let data = buf.to_vec(); + self.tx.send(data).map_err(|_| { + io::Error::new(io::ErrorKind::Other, "failed to send log data to channel") + })?; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} diff --git a/monocore/lib/runtime/mod.rs b/monocore/lib/runtime/mod.rs index 3227adf..78335cf 100644 --- a/monocore/lib/runtime/mod.rs +++ b/monocore/lib/runtime/mod.rs @@ -1,5 +1,6 @@ //! Supervisor for managing vm lifecycles. +mod log; mod state; mod supervisor; @@ -7,5 +8,6 @@ mod supervisor; // Exports //-------------------------------------------------------------------------------------------------- +pub use log::*; pub use state::*; pub use supervisor::*; diff --git a/monocore/lib/runtime/supervisor.rs b/monocore/lib/runtime/supervisor.rs index 53b63a7..0eaf1d7 100644 --- a/monocore/lib/runtime/supervisor.rs +++ b/monocore/lib/runtime/supervisor.rs @@ -8,9 +8,11 @@ use std::{ time::Duration, }; +use oci_spec::image::ImageConfiguration; +use serde_json::from_str; use sysinfo::System; use tokio::{ - fs::{self, File, OpenOptions}, + fs::{self, OpenOptions}, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, process::Command, sync::{broadcast, Mutex}, @@ -23,11 +25,11 @@ use tracing::{error, info, warn}; use crate::{ config::{Group, Service}, runtime::MicroVmStatus, - utils::{MONOCORE_LOG_DIR, MONOCORE_STATE_DIR}, + utils::{self, LOG_SUBDIR, OCI_CONFIG_FILENAME, OCI_REPO_SUBDIR, OCI_SUBDIR, STATE_SUBDIR}, MonocoreError, MonocoreResult, }; -use super::MicroVmState; +use super::{log::RotatingLog, MicroVmState}; //-------------------------------------------------------------------------------------------------- // Types @@ -39,6 +41,9 @@ pub struct Supervisor { /// The state of the micro VM process. state: Arc>, + /// The home directory of monocore. + home_dir: PathBuf, + /// The path to the state file of the micro VM process. runtime_state_path: PathBuf, @@ -57,26 +62,29 @@ pub struct Supervisor { //-------------------------------------------------------------------------------------------------- impl Supervisor { - const MAX_LOG_SIZE: u64 = 10 * 1024 * 1024; // 10MB max log size - /// Creates a new Supervisor instance. pub async fn new( + home_dir: impl AsRef, service: Service, group: Group, group_ip: Option, rootfs_path: impl AsRef, ) -> MonocoreResult { + let home_dir = home_dir.as_ref().to_path_buf(); + let state_dir = home_dir.join(STATE_SUBDIR); + let log_dir = home_dir.join(LOG_SUBDIR); + // Generate unique IDs for the files let service_name = service.get_name(); // Create paths with service name for better identification let runtime_state_path = - MONOCORE_STATE_DIR.join(format!("{}__{}.json", service_name, process::id())); - let stdout_log_path = MONOCORE_LOG_DIR.join(format!("{}.stdout.log", service_name)); - let stderr_log_path = MONOCORE_LOG_DIR.join(format!("{}.stderr.log", service_name)); + state_dir.join(format!("{}__{}.json", service_name, process::id())); + let stdout_log_path = log_dir.join(format!("{}.stdout.log", service_name)); + let stderr_log_path = log_dir.join(format!("{}.stderr.log", service_name)); // Create directories with proper permissions - for dir in [&*MONOCORE_STATE_DIR, &*MONOCORE_LOG_DIR] { + for dir in [&state_dir, &log_dir] { fs::create_dir_all(dir).await?; #[cfg(unix)] { @@ -96,6 +104,7 @@ impl Supervisor { group_ip, rootfs_path, ))), + home_dir, runtime_state_path, stdout_log_path, stderr_log_path, @@ -103,51 +112,6 @@ impl Supervisor { }) } - /// Creates a log file with proper permissions and rotation - async fn create_log_file(path: &Path) -> MonocoreResult { - // Create new log file with proper permissions - let file = OpenOptions::new() - .create(true) - .write(true) - .append(true) - .open(path) - .await?; - - let mut perms = file.metadata().await?.permissions(); - perms.set_mode(0o644); // rw-r--r-- - file.set_permissions(perms).await?; - - Ok(file) - } - - /// Rotates the log file if it reaches a certain size - async fn rotate_log_if_needed(file: &File, path: &Path) -> MonocoreResult<()> { - let metadata = file.metadata().await?; - if metadata.len() > Self::MAX_LOG_SIZE { - // Ensure all data is written before rotation - file.sync_all().await?; - - // Rotate old log file if it exists - let backup_path = path.with_extension(format!( - "{}.old", - path.extension().unwrap_or_default().to_str().unwrap_or("") - )); - - // Remove old backup if it exists - if backup_path.exists() { - if let Err(e) = fs::remove_file(&backup_path).await { - warn!("Failed to remove old backup log file: {}", e); - } - } - - // Rename current log to backup - if let Err(e) = fs::rename(path, &backup_path).await { - warn!("Failed to rotate log file: {}", e); - } - } - Ok(()) - } - /// Starts the supervised micro VM process. /// /// This method: @@ -165,8 +129,12 @@ impl Supervisor { // Get all the needed data under a single lock let (service_json, group_json, local_only_json, group_ip_json, rootfs_path) = { - let state = self.state.lock().await; - let service = state.get_service(); + let mut state = self.state.lock().await; + let service = state.get_service_mut(); + + // Update service with OCI config defaults + self.update_service_with_oci_config(service).await?; + let service_json = serde_json::to_string(service)?; let group_json = serde_json::to_string(state.get_group())?; @@ -189,9 +157,9 @@ impl Supervisor { "--run-microvm", &service_json, &group_json, - &local_only_json, &group_ip_json, &rootfs_path, + &local_only_json, ]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -210,32 +178,17 @@ impl Supervisor { let stdout_path = self.stdout_log_path.clone(); let service_name = self.state.lock().await.get_service().get_name().to_string(); let stdout_handle = tokio::spawn(async move { - let mut file = match Self::create_log_file(&stdout_path).await { - Ok(f) => f, + println!("Starting stdout log rotation for {}", service_name); // TODO: Remove + let mut rotating_log = match RotatingLog::new(&stdout_path, None).await { + Ok(r) => r, Err(e) => { - error!("Failed to create stdout log file: {}", e); + error!("Failed to create stdout rotating log: {}", e); return; } }; let mut reader = BufReader::new(stdout).lines(); while let Ok(Some(line)) = reader.next_line().await { - // Check and rotate if needed - if let Err(e) = Self::rotate_log_if_needed(&file, &stdout_path).await { - error!("Failed to rotate stdout log: {}", e); - } - - // Reopen file if it was rotated - if !stdout_path.exists() { - file = match Self::create_log_file(&stdout_path).await { - Ok(f) => f, - Err(e) => { - error!("Failed to create new stdout log file after rotation: {}", e); - return; - } - }; - } - // Format the log entry with timestamp in standard log format let now = chrono::Utc::now(); let formatted_line = format!( @@ -245,11 +198,11 @@ impl Supervisor { line ); - if let Err(e) = file.write_all(formatted_line.as_bytes()).await { + if let Err(e) = rotating_log.write_all(formatted_line.as_bytes()).await { error!("Failed to write to stdout log: {}", e); } - if let Err(e) = file.flush().await { + if let Err(e) = rotating_log.flush().await { error!("Failed to flush stdout log: {}", e); } } @@ -260,32 +213,17 @@ impl Supervisor { let stderr_path = self.stderr_log_path.clone(); let service_name = self.state.lock().await.get_service().get_name().to_string(); let stderr_handle = tokio::spawn(async move { - let mut file = match Self::create_log_file(&stderr_path).await { - Ok(f) => f, + println!("Starting stderr log rotation for {}", service_name); // TODO: Remove + let mut rotating_log = match RotatingLog::new(&stderr_path, None).await { + Ok(r) => r, Err(e) => { - error!("Failed to create stderr log file: {}", e); + error!("Failed to create stderr rotating log: {}", e); return; } }; let mut reader = BufReader::new(stderr).lines(); while let Ok(Some(line)) = reader.next_line().await { - // Check and rotate if needed - if let Err(e) = Self::rotate_log_if_needed(&file, &stderr_path).await { - error!("Failed to rotate stderr log: {}", e); - } - - // Reopen file if it was rotated - if !stderr_path.exists() { - file = match Self::create_log_file(&stderr_path).await { - Ok(f) => f, - Err(e) => { - error!("Failed to create new stderr log file after rotation: {}", e); - return; - } - }; - } - // Format the log entry with timestamp in standard log format let now = chrono::Utc::now(); let formatted_line = format!( @@ -295,10 +233,11 @@ impl Supervisor { line ); - if let Err(e) = file.write_all(formatted_line.as_bytes()).await { + if let Err(e) = rotating_log.write_all(formatted_line.as_bytes()).await { error!("Failed to write to stderr log: {}", e); } - if let Err(e) = file.flush().await { + + if let Err(e) = rotating_log.flush().await { error!("Failed to flush stderr log: {}", e); } } @@ -492,4 +431,103 @@ impl Supervisor { let state = serde_json::from_str(&contents)?; Ok(state) } + + /// Updates service properties with defaults from OCI config.json if they are not specified. + /// The config.json file is expected to be in /oci/repo//. + async fn update_service_with_oci_config(&self, service: &mut Service) -> MonocoreResult<()> { + // Get base image name from service config + let base_image = match service.get_base() { + Some(base) => base, + None => return Ok(()), // No base image, nothing to do + }; + + // Parse image reference to get the repository tag directory name + let (_, _, repo_tag) = utils::parse_image_ref(base_image)?; + + // Construct path to config.json + let config_path = self + .home_dir + .join(OCI_SUBDIR) + .join(OCI_REPO_SUBDIR) + .join(repo_tag) + .join(OCI_CONFIG_FILENAME); + + // Read and parse config.json + let config_str = match fs::read_to_string(&config_path).await { + Ok(content) => content, + Err(e) => { + error!( + "Failed to read OCI config.json at {}: {}", + config_path.display(), + e + ); + return Ok(()); + } + }; + + let config: ImageConfiguration = match from_str(&config_str) { + Ok(config) => config, + Err(e) => { + error!("Failed to parse OCI config.json: {}", e); + return Ok(()); + } + }; + + // Get the config section which contains the defaults + let config = match config.config() { + Some(config) => config, + None => return Ok(()), + }; + + // Update workdir if not set + if service.get_workdir().is_none() { + if let Some(working_dir) = config.working_dir() { + service.set_workdir(working_dir.to_string()); + } + } + + // Update command and args if not set + if service.get_command().is_none() { + // First try entrypoint + cmd + if let Some(entrypoint) = config.entrypoint() { + if !entrypoint.is_empty() { + // Use first item as command and rest as args + let mut entrypoint = entrypoint.clone(); + if let Some(command) = entrypoint.first() { + service.set_command(command.clone()); + // Add remaining entrypoint items as args + let mut args = entrypoint.split_off(1); + // Add cmd as additional args if present + if let Some(cmd) = config.cmd() { + args.extend(cmd.iter().cloned()); + } + service.set_args(args); + } + } + } else if let Some(cmd) = config.cmd() { + if !cmd.is_empty() { + // Use first item as command and rest as args + let mut cmd = cmd.clone(); + if let Some(command) = cmd.first() { + service.set_command(command.clone()); + service.set_args(cmd.split_off(1)); + } + } + } + } + + // Prepend config env to service envs + if let Some(env) = config.env() { + // Get existing service envs + let mut new_envs = env.clone(); + new_envs.extend(service.get_envs().iter().map(|e| e.to_string())); + service.set_envs(new_envs); + } + + // NOTE: We intentionally do not use exposed_ports from OCI config. + // In OCI/Docker, EXPOSE (which becomes exposed_ports in config.json) only documents which ports + // a container uses internally. It does not define port mappings between host and container. + + Ok(()) + } } diff --git a/monocore/lib/utils/path.rs b/monocore/lib/utils/path.rs index c5ebfc7..73f4947 100644 --- a/monocore/lib/utils/path.rs +++ b/monocore/lib/utils/path.rs @@ -29,6 +29,9 @@ pub const OCI_MANIFEST_FILENAME: &str = "manifest.json"; /// The filename for the OCI image config JSON file pub const OCI_CONFIG_FILENAME: &str = "config.json"; +/// The filename for the supervisors log file +pub const SUPERVISORS_LOG_FILENAME: &str = "supervisors.log"; + /// The rootfs sub directory where the rootfs and other related files associated with the microvm are stored. pub const ROOTFS_SUBDIR: &str = "rootfs"; @@ -47,23 +50,6 @@ pub const STATE_SUBDIR: &str = "run"; /// The sub directory where runtime logs are stored. pub const LOG_SUBDIR: &str = "log"; -lazy_static::lazy_static! { - /// The path to the monocore OCI directory - pub static ref MONOCORE_OCI_DIR: PathBuf = monocore_home_path().join(OCI_SUBDIR); - - /// The path to the monocore rootfs directory - pub static ref MONOCORE_ROOTFS_DIR: PathBuf = monocore_home_path().join(ROOTFS_SUBDIR); - - /// The path to the monocore service directory - pub static ref MONOCORE_SERVICE_DIR: PathBuf = monocore_home_path().join(SERVICE_SUBDIR); - - /// The path to the monocore state directory (e.g. for storing service state files) - pub static ref MONOCORE_STATE_DIR: PathBuf = monocore_home_path().join(STATE_SUBDIR); - - /// The path to the monocore log directory (e.g. for storing service stdout/stderr logs) - pub static ref MONOCORE_LOG_DIR: PathBuf = monocore_home_path().join(LOG_SUBDIR); -} - //-------------------------------------------------------------------------------------------------- // Functions //--------------------------------------------------------------------------------------------------