Skip to content

Commit

Permalink
Use a zygote process
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Prendes <[email protected]>
  • Loading branch information
jprendes committed Dec 20, 2024
1 parent 7d16085 commit db471b3
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 41 deletions.
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/containerd-shim-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ futures = { version = "0.3.30" }
wasmparser = { version = "0.220.0" }
tokio-stream = { version = "0.1" }
sha256 = { workspace = true }
serde_bytes = "0.11"

# tracing
# note: it's important to keep the version of tracing in sync with tracing-subscriber
Expand All @@ -59,6 +60,7 @@ tracing-opentelemetry = { version = "0.27", default-features = false, optional =


[target.'cfg(unix)'.dependencies]
zygote = { version = "0.1.1" }
caps = "0.5"
# this must match the version pulled by libcontainer
dbus = { version = "0", features = ["vendored"] }
Expand Down Expand Up @@ -101,3 +103,7 @@ opentelemetry = [
"dep:tracing-opentelemetry",
]
tracing = ["dep:tracing", "dep:tracing-subscriber"]

[package.metadata.cargo-machete]
# used as part of a derive macro
ignored = ["serde_bytes"]
3 changes: 3 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub fn shim_main<'a, I>(
I: 'static + Instance + Sync + Send,
I::Engine: Default,
{
#[cfg(unix)]
zygote::Zygote::init();

#[cfg(feature = "opentelemetry")]
if otel_traces_enabled() {
// opentelemetry uses tokio, so we need to initialize a runtime
Expand Down
3 changes: 2 additions & 1 deletion crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use std::path::{Path, PathBuf};
use std::time::Duration;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use super::error::Error;

/// Generic options builder for creating a wasm instance.
/// This is passed to the `Instance::new` method.
#[derive(Clone)]
#[derive(Clone, Serialize, Deserialize)]
pub struct InstanceConfig {
/// Optional stdin named pipe path.
stdin: PathBuf,
Expand Down
4 changes: 3 additions & 1 deletion crates/containerd-shim-wasm/src/sandbox/oci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use std::process;

use anyhow::Context;
use oci_spec::image::Descriptor;
use serde::{Deserialize, Serialize};

use super::error::Result;

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WasmLayer {
pub config: Descriptor,
#[serde(with = "serde_bytes")]
pub layer: Vec<u8>,
}

Expand Down
42 changes: 28 additions & 14 deletions crates/containerd-shim-wasm/src/sys/unix/container/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use nix::errno::Errno;
use nix::sys::wait::{waitid, Id as WaitID, WaitPidFlag, WaitStatus};
use nix::unistd::Pid;
use oci_spec::image::Platform;
use zygote::{WireError, Zygote};

use crate::container::Engine;
use crate::sandbox::async_utils::AmbientRuntime as _;
Expand All @@ -24,7 +25,7 @@ use crate::sandbox::{
};
use crate::sys::container::executor::Executor;

static DEFAULT_CONTAINER_ROOT_DIR: &str = "/run/containerd";
const DEFAULT_CONTAINER_ROOT_DIR: &str = "/run/containerd";

pub struct Instance<E: Engine> {
exit_code: WaitableCell<(u32, DateTime<Utc>)>,
Expand All @@ -38,27 +39,40 @@ impl<E: Engine + Default> SandboxInstance for Instance<E> {

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
fn new(id: String, cfg: &InstanceConfig) -> Result<Self, SandboxError> {
let bundle = cfg.get_bundle().to_path_buf();
let namespace = cfg.get_namespace();
let rootdir = Path::new(DEFAULT_CONTAINER_ROOT_DIR).join(E::name());
let rootdir = determine_rootdir(&bundle, &namespace, rootdir)?;
let stdio = Stdio::init_from_cfg(cfg)?;

// check if container is OCI image with wasm layers and attempt to read the module
let (modules, platform) = containerd::Client::connect(cfg.get_containerd_address().as_str(), &namespace).block_on()?
let (modules, platform) = containerd::Client::connect(cfg.get_containerd_address(), &cfg.get_namespace()).block_on()?
.load_modules::<Self::Engine>(&id)
.block_on()
.unwrap_or_else(|e| {
log::warn!("Error obtaining wasm layers for container {id}. Will attempt to use files inside container image. Error: {e}");
(vec![], Platform::default())
});

let container = ContainerBuilder::new(id.clone(), SyscallType::Linux)
.with_executor(Executor::<E>::new(stdio, modules, platform))
.with_root_path(rootdir.clone())?
.as_init(&bundle)
.with_systemd(false)
.build()?;
let (root, state) = Zygote::global()
.run(
|(id, cfg, modules, platform)| -> Result<_, WireError> {
let namespace = cfg.get_namespace();

let bundle = cfg.get_bundle().to_path_buf();
let rootdir = Path::new(DEFAULT_CONTAINER_ROOT_DIR).join(E::name());
let rootdir = determine_rootdir(&bundle, &namespace, rootdir)?;
let stdio = Stdio::init_from_cfg(&cfg)?;

let Container { root, state } = ContainerBuilder::new(id, SyscallType::Linux)
.with_executor(Executor::<E>::new(stdio, modules, platform))
.with_root_path(rootdir.clone())?
.as_init(&bundle)
.as_sibling(true)
.with_systemd(false)
.build()?;

// Container is not serializable, but its parts are
Ok((root, state))
},
(id.clone(), cfg.clone(), modules, platform),
)
.map_err(|e| SandboxError::Others(e.to_string()))?;
let container = Container { root, state };

Ok(Self {
id,
Expand Down
34 changes: 9 additions & 25 deletions crates/containerd-shim-wasm/src/test/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
//! remove the ignore attribute from the test.
use std::future::pending;
use std::io::{stderr, Write as _};
use std::sync::mpsc::channel;
use std::sync::{Arc, LazyLock};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

use anyhow::{bail, Result};
use containerd_shim_wasm_test_modules::HELLO_WORLD;
use tokio::sync::Notify;

use crate::container::{Engine, Instance, RuntimeContext};
use crate::sandbox::Stdio;
Expand All @@ -35,21 +35,6 @@ use crate::testing::WasiTest;
#[derive(Clone, Default)]
pub struct SomeEngine;

async fn ctrl_c(use_libc: bool) {
static CANCELLATION: LazyLock<Notify> = LazyLock::new(|| Notify::new());

fn on_ctr_c(_: libc::c_int) {
CANCELLATION.notify_waiters();
}

if use_libc {
unsafe { libc::signal(libc::SIGINT, on_ctr_c as _) };
CANCELLATION.notified().await;
} else {
let _ = tokio::signal::ctrl_c().await;
}
}

impl Engine for SomeEngine {
fn name() -> &'static str {
"some-engine"
Expand All @@ -63,16 +48,16 @@ impl Engine for SomeEngine {
.build()?
.block_on(async move {
use tokio::time::sleep;
let use_libc = std::env::var("USE_LIBC").unwrap_or_default();
let use_libc = !use_libc.is_empty() && use_libc != "0";
let signal = async {
println!("{name}> waiting for signal!");
ctrl_c(use_libc).await;
let _ = tokio::signal::ctrl_c().await;
println!("{name}> received signal, bye!");
};
let task = async {
sleep(Duration::from_millis(10)).await;
eprintln!("{name}> ready");
// use writeln to avoid output capturing from the
// testing framework
let _ = writeln!(stderr(), "{name}> ready");
pending().await
};
tokio::select! {
Expand All @@ -94,8 +79,9 @@ impl Drop for KillGuard {
}

#[test]
#[ignore = "this currently fails due to tokio's global state"]
fn test_handling_signals() -> Result<()> {
zygote::Zygote::global();

// use a thread scope to ensure we join all threads at the end
std::thread::scope(|s| -> Result<()> {
let mut containers = vec![];
Expand All @@ -110,7 +96,7 @@ fn test_handling_signals() -> Result<()> {
containers.push(Arc::new(container));
}

let guard: Vec<_> = containers.iter().cloned().map(KillGuard).collect();
let _guard: Vec<_> = containers.iter().cloned().map(KillGuard).collect();

for container in containers.iter() {
container.start()?;
Expand Down Expand Up @@ -150,8 +136,6 @@ fn test_handling_signals() -> Result<()> {
assert_eq!(id, i);
}

drop(guard);

Ok(())
})
}

0 comments on commit db471b3

Please sign in to comment.