diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs
index 5e9b277a1..33fd85ea9 100644
--- a/crates/base/src/deno_runtime.rs
+++ b/crates/base/src/deno_runtime.rs
@@ -269,6 +269,7 @@ pub enum WillTerminateReason {
     CPU,
     Memory,
     WallClock,
+    EarlyDrop,
 }
 
 pub struct DenoRuntime {
diff --git a/crates/base/src/rt_worker/supervisor/mod.rs b/crates/base/src/rt_worker/supervisor/mod.rs
index 277586454..15e520d92 100644
--- a/crates/base/src/rt_worker/supervisor/mod.rs
+++ b/crates/base/src/rt_worker/supervisor/mod.rs
@@ -26,40 +26,6 @@ use crate::{
 
 use super::{worker_ctx::TerminationToken, worker_pool::SupervisorPolicy};
 
-#[repr(C)]
-pub struct V8HandleTerminationData {
-    pub should_terminate: bool,
-    pub isolate_memory_usage_tx: Option<oneshot::Sender<IsolateMemoryStats>>,
-}
-
-pub extern "C" fn v8_handle_termination(isolate: &mut v8::Isolate, data: *mut std::ffi::c_void) {
-    let mut boxed_data: Box<V8HandleTerminationData>;
-
-    unsafe {
-        boxed_data = Box::from_raw(data as *mut V8HandleTerminationData);
-    }
-
-    // log memory usage
-    let mut heap_stats = v8::HeapStatistics::default();
-
-    isolate.get_heap_statistics(&mut heap_stats);
-
-    let usage = IsolateMemoryStats {
-        used_heap_size: heap_stats.used_heap_size(),
-        external_memory: heap_stats.external_memory(),
-    };
-
-    if let Some(usage_tx) = boxed_data.isolate_memory_usage_tx.take() {
-        if usage_tx.send(usage).is_err() {
-            error!("failed to send isolate memory usage - receiver may have been dropped");
-        }
-    }
-
-    if boxed_data.should_terminate {
-        isolate.terminate_execution();
-    }
-}
-
 #[repr(C)]
 pub struct IsolateMemoryStats {
     pub used_heap_size: usize,
@@ -172,6 +138,36 @@ async fn create_wall_clock_beforeunload_alert(wall_clock_limit_ms: u64, pct: Opt
     }
 }
 
+#[repr(C)]
+pub struct V8HandleTerminationData {
+    pub should_terminate: bool,
+    pub isolate_memory_usage_tx: Option<oneshot::Sender<IsolateMemoryStats>>,
+}
+
+pub extern "C" fn v8_handle_termination(isolate: &mut v8::Isolate, data: *mut std::ffi::c_void) {
+    let mut data = unsafe { Box::from_raw(data as *mut V8HandleTerminationData) };
+
+    // log memory usage
+    let mut heap_stats = v8::HeapStatistics::default();
+
+    isolate.get_heap_statistics(&mut heap_stats);
+
+    let usage = IsolateMemoryStats {
+        used_heap_size: heap_stats.used_heap_size(),
+        external_memory: heap_stats.external_memory(),
+    };
+
+    if let Some(usage_tx) = data.isolate_memory_usage_tx.take() {
+        if usage_tx.send(usage).is_err() {
+            error!("failed to send isolate memory usage - receiver may have been dropped");
+        }
+    }
+
+    if data.should_terminate {
+        isolate.terminate_execution();
+    }
+}
+
 extern "C" fn v8_handle_wall_clock_beforeunload(
     isolate: &mut v8::Isolate,
     _data: *mut std::ffi::c_void,
@@ -179,13 +175,36 @@ extern "C" fn v8_handle_wall_clock_beforeunload(
     if let Err(err) = MaybeDenoRuntime::<()>::Isolate(isolate)
         .dispatch_beforeunload_event(WillTerminateReason::WallClock)
     {
-        warn!(
+        error!(
            "found an error while dispatching the beforeunload event: {}",
            err
        );
     }
 }
 
+#[repr(C)]
+pub struct V8HandleEarlyRetireData {
+    token: CancellationToken,
+}
+
+extern "C" fn v8_handle_early_drop_beforeunload(
+    isolate: &mut v8::Isolate,
+    data: *mut std::ffi::c_void,
+) {
+    let data = unsafe { Box::from_raw(data as *mut V8HandleEarlyRetireData) };
+
+    if let Err(err) = MaybeDenoRuntime::<()>::Isolate(isolate)
+        .dispatch_beforeunload_event(WillTerminateReason::EarlyDrop)
+    {
+        error!(
+            "found an error while dispatching the beforeunload event: {}",
+            err
+        );
+    } else {
+        data.token.cancel();
+    }
+}
+
 #[instrument(level = "debug", skip_all)]
 extern "C" fn v8_handle_early_retire(isolate: &mut v8::Isolate, _data: *mut std::ffi::c_void) {
     isolate.low_memory_notification();
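
Note on the interrupt callbacks above: every payload crosses into V8 as a raw pointer, so ownership follows a strict contract. The requesting side leaks the Box with Box::into_raw, and exactly one side reclaims it: the callback via Box::from_raw once the interrupt is delivered, or the requester when request_interrupt returns false. v8_handle_early_drop_beforeunload additionally cancels its CancellationToken only when the beforeunload event dispatches cleanly, which is how the supervisor learns that the isolate acknowledged the early drop. The sketch below is a minimal, self-contained illustration of that ownership pattern, not the real supervisor code; its request_interrupt is a hypothetical stand-in for v8::IsolateHandle::request_interrupt, and only the tokio-util crate is assumed:

    use tokio_util::sync::CancellationToken;

    struct EarlyDropData {
        token: CancellationToken,
    }

    // The "isolate side": reclaim the Box first so the allocation is freed
    // regardless of what the callback does afterwards.
    extern "C" fn on_interrupt(data: *mut std::ffi::c_void) {
        let data = unsafe { Box::from_raw(data as *mut EarlyDropData) };
        data.token.cancel();
    }

    // Hypothetical stand-in for v8::IsolateHandle::request_interrupt; returns
    // false when the interrupt cannot be enqueued (isolate already gone).
    fn request_interrupt(
        callback: extern "C" fn(*mut std::ffi::c_void),
        data: *mut std::ffi::c_void,
    ) -> bool {
        callback(data);
        true
    }

    fn main() {
        let token = CancellationToken::new();
        let data_ptr = Box::into_raw(Box::new(EarlyDropData {
            token: token.clone(),
        }));

        if !request_interrupt(on_interrupt, data_ptr as *mut std::ffi::c_void) {
            // Never delivered: the requester reclaims the Box instead.
            drop(unsafe { Box::from_raw(data_ptr) });
        }

        assert!(token.is_cancelled());
    }
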
diff --git a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs
index a46add2bb..e349ef6ad 100644
--- a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs
+++ b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs
@@ -4,12 +4,14 @@ use std::{future::pending, sync::atomic::Ordering, time::Duration};
 use std::thread::ThreadId;
 
 use event_worker::events::ShutdownReason;
-use log::error;
+use log::{error, info};
 use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs};
+use tokio_util::sync::CancellationToken;
 
 use crate::rt_worker::supervisor::{
-    create_wall_clock_beforeunload_alert, v8_handle_early_retire,
-    v8_handle_wall_clock_beforeunload, wait_cpu_alarm, CPUUsage, Tokens,
+    create_wall_clock_beforeunload_alert, v8_handle_early_drop_beforeunload,
+    v8_handle_early_retire, v8_handle_wall_clock_beforeunload, wait_cpu_alarm, CPUUsage, Tokens,
+    V8HandleEarlyRetireData,
 };
 
 use super::{v8_handle_termination, Arguments, CPUUsageMetrics, V8HandleTerminationData};
@@ -57,12 +59,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
     let mut is_worker_entered = false;
     let mut is_wall_clock_beforeunload_armed = false;
     let mut is_cpu_time_soft_limit_reached = false;
-    let mut is_termination_requested = false;
+    let mut is_waiting_for_termination = false;
     let mut have_all_reqs_been_acknowledged = false;
 
     let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap();
     let mut cpu_usage_ms = 0i64;
+    let mut complete_reason = None::<ShutdownReason>;
 
     let mut wall_clock_alerts = 0;
     let mut req_ack_count = 0usize;
@@ -97,6 +100,25 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
         guard.raise();
     };
 
+    let early_drop_token = CancellationToken::new();
+    let early_drop_fut = early_drop_token.cancelled();
+
+    let mut dispatch_early_drop_beforeunload_fn = Some({
+        let token = early_drop_token.clone();
+        || {
+            let data_ptr_mut = Box::into_raw(Box::new(V8HandleEarlyRetireData { token }));
+
+            if !thread_safe_handle.request_interrupt(
+                v8_handle_early_drop_beforeunload,
+                data_ptr_mut as *mut std::ffi::c_void,
+            ) {
+                drop(unsafe { Box::from_raw(data_ptr_mut) });
+            } else {
+                waker.wake();
+            }
+        }
+    });
+
     let terminate_fn = {
         let thread_safe_handle = thread_safe_handle.clone();
         move || {
@@ -115,11 +137,12 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
 
     tokio::pin!(wall_clock_duration_alert);
     tokio::pin!(wall_clock_beforeunload_alert);
+    tokio::pin!(early_drop_fut);
 
-    let result = 'scope: loop {
+    loop {
         tokio::select! {
             _ = supervise.cancelled() => {
-                break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms);
+                complete_reason = Some(ShutdownReason::TerminationRequested);
             }
 
             _ = async {
@@ -127,11 +150,12 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                     Some(token) => token.inbound.cancelled().await,
                     None => pending().await,
                 }
-            }, if !is_termination_requested => {
-                is_termination_requested = true;
+            }, if !is_waiting_for_termination => {
+                is_waiting_for_termination = true;
 
                 if promise_metrics.have_all_promises_been_resolved() {
-                    terminate_fn();
-                    break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms);
+                    if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
+                        func();
+                    }
                 }
             }
@@ -164,9 +188,8 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
 
                 if !cpu_timer_param.is_disabled() {
                     if cpu_usage_ms >= hard_limit_ms as i64 {
-                        terminate_fn();
                         error!("CPU time hard limit reached: isolate: {:?}", key);
-                        break 'scope (ShutdownReason::CPUTime, cpu_usage_ms);
+                        complete_reason = Some(ShutdownReason::CPUTime);
                     } else if cpu_usage_ms >= soft_limit_ms as i64 && !is_cpu_time_soft_limit_reached {
                         early_retire_fn();
                         error!("CPU time soft limit reached: isolate: {:?}", key);
@@ -177,17 +200,18 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
 
                         if have_all_reqs_been_acknowledged
                             && promise_metrics.have_all_promises_been_resolved()
                         {
-                            terminate_fn();
-                            error!("early termination due to the last request being completed: isolate: {:?}", key);
-                            break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
+                            if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
+                                func();
+                            }
                         }
                     } else if is_cpu_time_soft_limit_reached
                         && have_all_reqs_been_acknowledged
                         && promise_metrics.have_all_promises_been_resolved()
                     {
-                        terminate_fn();
-                        break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
+                        if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
+                            func();
+                        }
                     }
                 }
             }
@@ -206,14 +230,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                     if have_all_reqs_been_acknowledged
                         && promise_metrics.have_all_promises_been_resolved()
                     {
-                        terminate_fn();
-                        error!("early termination due to the last request being completed: isolate: {:?}", key);
-                        break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
+                        if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
+                            func();
+                        }
                     }
                 } else {
-                    terminate_fn();
                     error!("CPU time hard limit reached: isolate: {:?}", key);
-                    break 'scope (ShutdownReason::CPUTime, cpu_usage_ms);
+                    complete_reason = Some(ShutdownReason::CPUTime);
                 }
             }
         }
@@ -237,9 +260,9 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                     continue;
                 }
 
-                terminate_fn();
-                error!("early termination due to the last request being completed: isolate: {:?}", key);
-                break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
+                if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
+                    func();
+                }
             }
 
             _ = wall_clock_duration_alert.tick(), if !is_wall_clock_limit_disabled => {
@@ -253,9 +276,8 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                 } else {
                     let is_in_flight_req_exists = req_ack_count != demand.load(Ordering::Acquire);
 
-                    terminate_fn();
                     error!("wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists);
-                    break 'scope (ShutdownReason::WallClockTime, cpu_usage_ms);
+                    complete_reason = Some(ShutdownReason::WallClockTime);
                 }
             }
 
@@ -273,18 +295,34 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
             }
 
             Some(_) = memory_limit_rx.recv() => {
-                terminate_fn();
                 error!("memory limit reached for the worker: isolate: {:?}", key);
-                break 'scope (ShutdownReason::Memory, cpu_usage_ms);
+                complete_reason = Some(ShutdownReason::Memory);
             }
-        }
-    };
 
-    match result {
-        (ShutdownReason::EarlyDrop, cpu_usage_ms) if is_termination_requested => {
-            (ShutdownReason::TerminationRequested, cpu_usage_ms)
+            _ = &mut early_drop_fut => {
+                info!("early termination has been triggered: isolate: {:?}", key);
+                complete_reason = Some(ShutdownReason::EarlyDrop);
+            }
         }
-        result => result,
+
+        match complete_reason.take() {
+            Some(ShutdownReason::EarlyDrop) => {
+                terminate_fn();
+                return (
+                    if is_waiting_for_termination {
+                        ShutdownReason::TerminationRequested
+                    } else {
+                        ShutdownReason::EarlyDrop
+                    },
+                    cpu_usage_ms,
+                );
+            }
+
+            Some(result) => {
+                terminate_fn();
+                return (result, cpu_usage_ms);
+            }
+
+            None => continue,
+        }
     }
 }
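
The supervisor rewrite above replaces the labeled break 'scope plus trailing match result pattern with a complete_reason that is examined after every select! pass: the arms only record why the worker should stop, and the match decides whether to terminate now or keep looping. Two details are worth calling out. First, dispatch_early_drop_beforeunload_fn is held in an Option and invoked through .take(), so the beforeunload interrupt is requested at most once no matter how many arms hit the early-drop condition. Second, ShutdownReason::EarlyDrop is reported only after early_drop_fut resolves, i.e. only after the isolate has actually dispatched the beforeunload event and cancelled the token. The condensed, runnable sketch below (assumed names mirroring the diff, with simplified bodies; it presumes tokio with the rt and macros features plus tokio-util) shows the shape of that control flow:

    use tokio_util::sync::CancellationToken;

    #[derive(Debug, PartialEq)]
    enum ShutdownReason {
        EarlyDrop,
        TerminationRequested,
    }

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let early_drop_token = CancellationToken::new();
        let early_drop_fut = early_drop_token.cancelled();
        tokio::pin!(early_drop_fut);

        // One-shot dispatcher: `.take()` leaves None behind, so the
        // beforeunload interrupt can be requested at most once.
        let mut dispatch_early_drop_beforeunload_fn = Some({
            let token = early_drop_token.clone();
            move || token.cancel() // stand-in for the real interrupt request
        });

        let mut complete_reason = None::<ShutdownReason>;
        let is_waiting_for_termination = false;

        let reason = loop {
            // Pretend some arm decided the worker is idle.
            if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
                func();
            }

            tokio::select! {
                _ = &mut early_drop_fut => {
                    complete_reason = Some(ShutdownReason::EarlyDrop);
                }
            }

            // Decide after the select! instead of breaking out of each arm.
            match complete_reason.take() {
                Some(ShutdownReason::EarlyDrop) if is_waiting_for_termination => {
                    break ShutdownReason::TerminationRequested;
                }
                Some(reason) => break reason,
                None => continue,
            }
        };

        assert_eq!(reason, ShutdownReason::EarlyDrop);
    }

Because terminate_fn() now runs only inside the match, every exit path, including the early-drop one, goes through the same termination and return sequence.
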
diff --git a/crates/base/test_cases/main/index.ts b/crates/base/test_cases/main/index.ts
index 1c844f1e2..0f3eb2b62 100644
--- a/crates/base/test_cases/main/index.ts
+++ b/crates/base/test_cases/main/index.ts
@@ -37,7 +37,6 @@ Deno.serve((req: Request) => {
   const workerTimeoutMs = parseIntFromHeadersOrDefault(req, "x-worker-timeout-ms", 10 * 60 * 1000);
   const cpuTimeSoftLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-soft-limit-ms", 10 * 60 * 1000);
   const cpuTimeHardLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-hard-limit-ms", 10 * 60 * 1000);
-  console.log(cpuTimeSoftLimitMs);
   const noModuleCache = false;
   const importMapPath = null;
   const envVarsObj = Deno.env.toObject();