diff --git a/src/executor/local_spawner.rs b/src/executor/local_spawner.rs index 7d12b72..d32b8c4 100644 --- a/src/executor/local_spawner.rs +++ b/src/executor/local_spawner.rs @@ -1,15 +1,18 @@ use bollard::{ container::{Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions}, + models::{HostConfig, PortBinding}, service::ContainerInspectResponse, Docker, }; use std::{ + collections::HashMap, default::Default, env, error::Error, future::Future, + net::{SocketAddr, TcpListener, IpAddr}, pin::Pin, - sync::atomic::{AtomicUsize, Ordering}, + sync::atomic::{AtomicUsize, Ordering}, str::FromStr, }; use tokio; use tokio::sync::oneshot; @@ -46,17 +49,49 @@ impl LocalSpawner { } } + fn find_unused_port() -> Result { + // Bind to address with port 0. + // The OS will assign an available ephemeral port. + let listener = TcpListener::bind("127.0.0.1:0")?; + + // Retrieve the assigned port. + match listener.local_addr() { + Ok(SocketAddr::V4(addr)) => Ok(addr.port()), + Ok(SocketAddr::V6(addr)) => Ok(addr.port()), + Err(e) => Err(e), + } + } + // Create a Docker instance connected to the local Docker daemon. pub async fn create_container( docker: Docker, image_name: String, container_name: String, id: usize, + desirable_port: u16, ) -> Result> { let container_name = format!("{}_{}", container_name, id); + // Define port mapping (container_port -> host_port) + let port_bindings = { + let mut port_bindings = HashMap::new(); + port_bindings.insert( + "4000/tcp".to_string(), // Container port + Some(vec![PortBinding { + host_ip: Some(IpAddr::from_str("127.0.0.1").unwrap().to_string()), // Host IP + host_port: Some(desirable_port.to_string()), // Host port + }]), + ); + port_bindings + }; + let config = Config { image: Some(image_name), + exposed_ports: Some(HashMap::from([("4000/tcp".to_string(), HashMap::<(), ()>::new())])), // Expose the container port + host_config: Some(HostConfig { + port_bindings: Some(port_bindings), + ..Default::default() + }), ..Default::default() }; @@ -89,25 +124,39 @@ impl ExecutorSpawner for LocalSpawner { // Using channel that onetime use, `oneshot`, to send container information let (tx, rx) = oneshot::channel(); - // These variables has to be cloned because it is moved into the async block + // These variables has to be cloned because these are moved into the async block let docker_clone = self.docker.clone(); let image_name = self.image_name.clone(); let container_name = self.container_name.clone(); let id = self.worker_counter.fetch_add(1, Ordering::SeqCst); tokio::spawn(async move { - if let Ok(container_info) = - LocalSpawner::create_container(docker_clone, image_name, container_name, id).await - { - let _ = tx.send(container_info); + let desirable_port = LocalSpawner::find_unused_port().unwrap_or_default(); + let res = LocalSpawner::create_container( + docker_clone, + image_name, + container_name, + id, + desirable_port, + ) + .await; + match res { + Ok(container_info) => { + // the desirable_port is the port that is exposed to the host + let _ = tx.send((desirable_port, container_info)); + } + Err(e) => { + eprintln!("Error creating container: {}", e); + } } }); // Return a Future that resolves to Executor Box::pin(async move { - let container_info = rx.await.expect("Failed to receive worker URL"); + // the container_info also has exposed port as 'host_port` field but it looks ugly to use it + let (exposed_port, container_info) = rx.await.expect("Failed to receive worker URL"); let worker_url = format!( - "http://{}:4000", // this port is not exposed to the host - container_info.network_settings.unwrap().ip_address.unwrap() + "http://127.0.0.1:{}", // This port is exposed to the host + exposed_port ); Executor::new(worker_url, container_info.name) }) @@ -126,9 +175,9 @@ impl ExecutorSpawner for LocalSpawner { }; for i in 0..worker_counter { - let container_name = format!("{}_{}", container_name, i); + let container_name_with_id = format!("{}_{}", container_name, i); if let Err(e) = docker_clone - .remove_container(&container_name, Some(remove_options)) + .remove_container(&container_name_with_id, Some(remove_options)) .await { eprintln!("Error removing container: {}", e); @@ -138,7 +187,7 @@ impl ExecutorSpawner for LocalSpawner { } } -#[cfg(feature = "docker")] +// #[cfg(feature = "docker")] #[cfg(test)] mod tests { use super::*;