Skip to content

Commit

Permalink
fix: the container now using unused port on host
Browse files Browse the repository at this point in the history
  • Loading branch information
sifnoc committed Dec 8, 2023
1 parent 679ed56 commit b108d1b
Showing 1 changed file with 61 additions and 12 deletions.
73 changes: 61 additions & 12 deletions src/executor/local_spawner.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use bollard::{
container::{Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions},
models::{HostConfig, PortBinding},
service::ContainerInspectResponse,
Docker,
};
use std::{
collections::HashMap,
default::Default,
env,
error::Error,
future::Future,
net::{SocketAddr, TcpListener, IpAddr},
pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
sync::atomic::{AtomicUsize, Ordering}, str::FromStr,
};
use tokio;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -46,17 +49,49 @@ impl LocalSpawner {
}
}

fn find_unused_port() -> Result<u16, std::io::Error> {
// Bind to address with port 0.
// The OS will assign an available ephemeral port.
let listener = TcpListener::bind("127.0.0.1:0")?;

// Retrieve the assigned port.
match listener.local_addr() {
Ok(SocketAddr::V4(addr)) => Ok(addr.port()),
Ok(SocketAddr::V6(addr)) => Ok(addr.port()),
Err(e) => Err(e),
}
}

// Create a Docker instance connected to the local Docker daemon.
pub async fn create_container(
docker: Docker,
image_name: String,
container_name: String,
id: usize,
desirable_port: u16,
) -> Result<ContainerInspectResponse, Box<dyn Error>> {
let container_name = format!("{}_{}", container_name, id);

// Define port mapping (container_port -> host_port)
let port_bindings = {
let mut port_bindings = HashMap::new();
port_bindings.insert(
"4000/tcp".to_string(), // Container port
Some(vec![PortBinding {
host_ip: Some(IpAddr::from_str("127.0.0.1").unwrap().to_string()), // Host IP
host_port: Some(desirable_port.to_string()), // Host port
}]),
);
port_bindings
};

let config = Config {
image: Some(image_name),
exposed_ports: Some(HashMap::from([("4000/tcp".to_string(), HashMap::<(), ()>::new())])), // Expose the container port
host_config: Some(HostConfig {
port_bindings: Some(port_bindings),
..Default::default()
}),
..Default::default()
};

Expand Down Expand Up @@ -89,25 +124,39 @@ impl ExecutorSpawner for LocalSpawner {
// Using channel that onetime use, `oneshot`, to send container information
let (tx, rx) = oneshot::channel();

// These variables has to be cloned because it is moved into the async block
// These variables has to be cloned because these are moved into the async block
let docker_clone = self.docker.clone();
let image_name = self.image_name.clone();
let container_name = self.container_name.clone();
let id = self.worker_counter.fetch_add(1, Ordering::SeqCst);
tokio::spawn(async move {
if let Ok(container_info) =
LocalSpawner::create_container(docker_clone, image_name, container_name, id).await
{
let _ = tx.send(container_info);
let desirable_port = LocalSpawner::find_unused_port().unwrap_or_default();
let res = LocalSpawner::create_container(
docker_clone,
image_name,
container_name,
id,
desirable_port,
)
.await;
match res {
Ok(container_info) => {
// the desirable_port is the port that is exposed to the host
let _ = tx.send((desirable_port, container_info));
}
Err(e) => {
eprintln!("Error creating container: {}", e);
}
}
});

// Return a Future that resolves to Executor
Box::pin(async move {
let container_info = rx.await.expect("Failed to receive worker URL");
// the container_info also has exposed port as 'host_port` field but it looks ugly to use it
let (exposed_port, container_info) = rx.await.expect("Failed to receive worker URL");
let worker_url = format!(
"http://{}:4000", // this port is not exposed to the host
container_info.network_settings.unwrap().ip_address.unwrap()
"http://127.0.0.1:{}", // This port is exposed to the host
exposed_port
);
Executor::new(worker_url, container_info.name)
})
Expand All @@ -126,9 +175,9 @@ impl ExecutorSpawner for LocalSpawner {
};

for i in 0..worker_counter {
let container_name = format!("{}_{}", container_name, i);
let container_name_with_id = format!("{}_{}", container_name, i);
if let Err(e) = docker_clone
.remove_container(&container_name, Some(remove_options))
.remove_container(&container_name_with_id, Some(remove_options))
.await
{
eprintln!("Error removing container: {}", e);
Expand All @@ -138,7 +187,7 @@ impl ExecutorSpawner for LocalSpawner {
}
}

#[cfg(feature = "docker")]
// #[cfg(feature = "docker")]
#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit b108d1b

Please sign in to comment.