Skip to content

Commit

Permalink
stamp: use beforeunload instead of willterminate
Browse files Browse the repository at this point in the history
  • Loading branch information
nyannyacha committed May 31, 2024
1 parent 60ad851 commit cddcac1
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 125 deletions.
94 changes: 24 additions & 70 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ impl GlobalMainContext {

struct DispatchEventFunctions {
dispatch_load_event_fn_global: v8::Global<v8::Function>,
dispatch_willterminate_event_fn_global: v8::Global<v8::Function>,
dispatch_beforeunload_event_fn_global: v8::Global<v8::Function>,
dispatch_unload_event_fn_global: v8::Global<v8::Function>,
}
Expand Down Expand Up @@ -223,8 +222,8 @@ pub struct DenoRuntime {
mem_check_state: Arc<MemCheckState>,
waker: Arc<AtomicWaker>,

willterminate_mem_threshold: Arc<ArcSwapOption<u64>>,
willterminate_cpu_threshold: Arc<ArcSwapOption<u64>>,
beforeunload_mem_threshold: Arc<ArcSwapOption<u64>>,
beforeunload_cpu_threshold: Arc<ArcSwapOption<u64>>,
}

impl Drop for DenoRuntime {
Expand Down Expand Up @@ -476,24 +475,24 @@ impl DenoRuntime {
let mut create_params = None;
let mut mem_check_state = MemCheckState::default();

let willterminate_cpu_threshold = ArcSwapOption::<u64>::from_pointee(None);
let willterminate_mem_threshold = ArcSwapOption::<u64>::from_pointee(None);
let beforeunload_cpu_threshold = ArcSwapOption::<u64>::from_pointee(None);
let beforeunload_mem_threshold = ArcSwapOption::<u64>::from_pointee(None);

if conf.is_user_worker() {
let conf = conf.as_user_worker().unwrap();
let memory_limit_bytes = mib_to_bytes(conf.memory_limit_mb) as usize;

willterminate_mem_threshold.store(
beforeunload_mem_threshold.store(
flags
.willterminate_memory_pct
.beforeunload_memory_pct
.and_then(|it| percentage_value(memory_limit_bytes as u64, it))
.map(Arc::new),
);

if conf.cpu_time_hard_limit_ms > 0 {
willterminate_cpu_threshold.store(
beforeunload_cpu_threshold.store(
flags
.willterminate_cpu_pct
.beforeunload_cpu_pct
.and_then(|it| percentage_value(conf.cpu_time_hard_limit_ms, it))
.map(Arc::new),
);
Expand Down Expand Up @@ -547,14 +546,6 @@ impl DenoRuntime {
.unwrap();
let dispatch_load_event_fn =
v8::Local::<v8::Function>::try_from(dispatch_load_event_fn).unwrap();
let dispatch_willterminate_event_fn_str =
v8::String::new_external_onebyte_static(scope, b"dispatchWillTerminateEvent")
.unwrap();
let dispatch_willterminate_event_fn = bootstrap_ns
.get(scope, dispatch_willterminate_event_fn_str.into())
.unwrap();
let dispatch_willterminate_event_fn =
v8::Local::<v8::Function>::try_from(dispatch_willterminate_event_fn).unwrap();
let dispatch_beforeunload_event_fn_str =
v8::String::new_external_onebyte_static(scope, b"dispatchBeforeUnloadEvent")
.unwrap();
Expand All @@ -572,15 +563,12 @@ impl DenoRuntime {
v8::Local::<v8::Function>::try_from(dispatch_unload_event_fn).unwrap();

let dispatch_load_event_fn_global = v8::Global::new(scope, dispatch_load_event_fn);
let dispatch_willterminate_event_fn_global =
v8::Global::new(scope, dispatch_willterminate_event_fn);
let dispatch_beforeunload_event_fn_global =
v8::Global::new(scope, dispatch_beforeunload_event_fn);
let dispatch_unload_event_fn_global = v8::Global::new(scope, dispatch_unload_event_fn);

DispatchEventFunctions {
dispatch_load_event_fn_global,
dispatch_willterminate_event_fn_global,
dispatch_beforeunload_event_fn_global,
dispatch_unload_event_fn_global,
}
Expand Down Expand Up @@ -729,8 +717,8 @@ impl DenoRuntime {
mem_check_state,
waker: Arc::default(),

willterminate_cpu_threshold: Arc::new(willterminate_cpu_threshold),
willterminate_mem_threshold: Arc::new(willterminate_mem_threshold),
beforeunload_cpu_threshold: Arc::new(beforeunload_cpu_threshold),
beforeunload_mem_threshold: Arc::new(beforeunload_mem_threshold),
})
}

Expand Down Expand Up @@ -862,16 +850,6 @@ impl DenoRuntime {
.await
{
let mut this = self.get_v8_tls_guard();
let _ = with_cpu_metrics_guard(
current_thread_id,
&maybe_cpu_usage_metrics_tx,
&mut accumulated_cpu_time_ns,
|| MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_beforeunload_event(),
);

// TODO(Nyannyacha): Here we also need to trigger the event for node platform (i.e;
// beforeExit)

let _ = with_cpu_metrics_guard(
current_thread_id,
&maybe_cpu_usage_metrics_tx,
Expand All @@ -889,18 +867,6 @@ impl DenoRuntime {

let mut this = self.get_v8_tls_guard();

if let Err(err) = with_cpu_metrics_guard(
current_thread_id,
&maybe_cpu_usage_metrics_tx,
&mut accumulated_cpu_time_ns,
|| MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_beforeunload_event(),
) {
return (Err(err), get_accumulated_cpu_time_ms!());
}

// TODO(Nyannyacha): Here we also need to trigger the event for node platform (i.e;
// beforeExit)

if let Err(err) = with_cpu_metrics_guard(
current_thread_id,
&maybe_cpu_usage_metrics_tx,
Expand All @@ -927,8 +893,8 @@ impl DenoRuntime {
let is_termination_requested = self.termination_request_token.clone();
let global_waker = self.waker.clone();

let willterminate_cpu_threshold = self.willterminate_cpu_threshold.clone();
let willterminate_mem_threshold = self.willterminate_mem_threshold.clone();
let beforeunload_cpu_threshold = self.beforeunload_cpu_threshold.clone();
let beforeunload_mem_threshold = self.beforeunload_mem_threshold.clone();

let mem_check_state = is_user_worker.then(|| self.mem_check_state.clone());

Expand Down Expand Up @@ -995,32 +961,31 @@ impl DenoRuntime {
let mem_state = mem_check_state.as_ref().unwrap();
let total_malloced_bytes = mem_state.check(js_runtime.v8_isolate().as_mut());

if let Some(threshold_ms) = willterminate_cpu_threshold.load().as_deref().copied() {
if let Some(threshold_ms) = beforeunload_cpu_threshold.load().as_deref().copied() {
let threshold_ns = (threshold_ms as i128) * 1_000_000;
let accumulated_cpu_time_ns = *accumulated_cpu_time_ns as i128;

if accumulated_cpu_time_ns >= threshold_ns {
willterminate_cpu_threshold.store(None);
beforeunload_cpu_threshold.store(None);

if let Err(err) = MaybeDenoRuntime::DenoRuntime(&mut this)
.dispatch_willterminate_event(WillTerminateReason::CPU)
.dispatch_beforeunload_event(WillTerminateReason::CPU)
{
return Poll::Ready(Err(err));
}
}
}

if let Some(threshold_bytes) =
willterminate_mem_threshold.load().as_deref().copied()
if let Some(threshold_bytes) = beforeunload_mem_threshold.load().as_deref().copied()
{
let total_malloced_bytes = total_malloced_bytes as u64;

if total_malloced_bytes >= threshold_bytes {
willterminate_mem_threshold.store(None);
beforeunload_mem_threshold.store(None);

if !this.mem_check_state.is_exceeded() {
if let Err(err) = MaybeDenoRuntime::DenoRuntime(&mut this)
.dispatch_willterminate_event(WillTerminateReason::Memory)
.dispatch_beforeunload_event(WillTerminateReason::Memory)
{
return Poll::Ready(Err(err));
}
Expand Down Expand Up @@ -1271,15 +1236,15 @@ impl<'l> MaybeDenoRuntime<'l> {
)
}

/// Dispatches "willterminate" event to the JavaScript runtime.
///
/// Does not poll event loop, and thus not await any of the "willterminate" event handlers.
pub fn dispatch_willterminate_event(
/// Dispatches "beforeunload" event to the JavaScript runtime. Returns a boolean
/// indicating if the event was prevented and thus event loop should continue
/// running.
pub fn dispatch_beforeunload_event(
&mut self,
reason: WillTerminateReason,
) -> Result<(), AnyError> {
) -> Result<bool, AnyError> {
self.dispatch_event_with_callback(
|fns| &fns.dispatch_willterminate_event_fn_global,
|fns| &fns.dispatch_beforeunload_event_fn_global,
move |scope| {
vec![v8::String::new_external_onebyte_static(
scope,
Expand All @@ -1288,17 +1253,6 @@ impl<'l> MaybeDenoRuntime<'l> {
.unwrap()
.into()]
},
|_| Ok(()),
)
}

/// Dispatches "beforeunload" event to the JavaScript runtime. Returns a boolean
/// indicating if the event was prevented and thus event loop should continue
/// running.
pub fn dispatch_beforeunload_event(&mut self) -> Result<bool, AnyError> {
self.dispatch_event_with_callback(
|fns| &fns.dispatch_beforeunload_event_fn_global,
|_| vec![],
|it| Ok(it.unwrap().is_false()),
)
}
Expand Down
8 changes: 4 additions & 4 deletions crates/base/src/rt_worker/supervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async fn wait_cpu_alarm(maybe_alarm: Option<&mut UnboundedReceiver<()>>) -> Opti
}
}

async fn create_wall_clock_willterminate_alert(wall_clock_limit_ms: u64, pct: Option<u8>) {
async fn create_wall_clock_beforeunload_alert(wall_clock_limit_ms: u64, pct: Option<u8>) {
let dur = pct
.and_then(|it| percentage_value(wall_clock_limit_ms, it))
.map(Duration::from_millis);
Expand All @@ -169,15 +169,15 @@ async fn create_wall_clock_willterminate_alert(wall_clock_limit_ms: u64, pct: Op
}
}

extern "C" fn v8_handle_wall_clock_willterminate(
extern "C" fn v8_handle_wall_clock_beforeunload(
isolate: &mut v8::Isolate,
_data: *mut std::ffi::c_void,
) {
if let Err(err) = MaybeDenoRuntime::Isolate(isolate)
.dispatch_willterminate_event(WillTerminateReason::WallClock)
.dispatch_beforeunload_event(WillTerminateReason::WallClock)
{
warn!(
"found an error while dispatching the willterminate event: {}",
"found an error while dispatching the beforeunload event: {}",
err
);
}
Expand Down
21 changes: 10 additions & 11 deletions crates/base/src/rt_worker/supervisor/strategy_per_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs};
use tokio::time::Instant;

use crate::rt_worker::supervisor::{
create_wall_clock_willterminate_alert, v8_handle_termination,
v8_handle_wall_clock_willterminate, wait_cpu_alarm, CPUUsage, CPUUsageMetrics, Tokens,
V8HandleTerminationData,
create_wall_clock_beforeunload_alert, v8_handle_termination, v8_handle_wall_clock_beforeunload,
wait_cpu_alarm, CPUUsage, CPUUsageMetrics, Tokens, V8HandleTerminationData,
};

use super::Arguments;
Expand Down Expand Up @@ -57,7 +56,7 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)

let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0;
let mut is_worker_entered = false;
let mut is_wall_clock_willterminate_armed = false;
let mut is_wall_clock_beforeunload_armed = false;

let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap();
let mut cpu_usage_ms = 0i64;
Expand All @@ -75,13 +74,13 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)

let wall_clock_duration = Duration::from_millis(wall_clock_limit_ms);
let wall_clock_duration_alert = tokio::time::sleep(wall_clock_duration);
let wall_clock_willterminate_alert = create_wall_clock_willterminate_alert(
let wall_clock_beforeunload_alert = create_wall_clock_beforeunload_alert(
wall_clock_limit_ms,
flags.willterminate_wall_clock_pct,
flags.beforeunload_wall_clock_pct,
);

tokio::pin!(wall_clock_duration_alert);
tokio::pin!(wall_clock_willterminate_alert);
tokio::pin!(wall_clock_beforeunload_alert);

loop {
tokio::select! {
Expand Down Expand Up @@ -188,17 +187,17 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)
}
}

_ = &mut wall_clock_willterminate_alert,
if !is_wall_clock_limit_disabled && !is_wall_clock_willterminate_armed
_ = &mut wall_clock_beforeunload_alert,
if !is_wall_clock_limit_disabled && !is_wall_clock_beforeunload_armed
=> {
if thread_safe_handle.request_interrupt(
v8_handle_wall_clock_willterminate,
v8_handle_wall_clock_beforeunload,
std::ptr::null_mut()
) {
waker.wake();
}

is_wall_clock_willterminate_armed = true;
is_wall_clock_beforeunload_armed = true;
}

Some(_) = memory_limit_rx.recv() => {
Expand Down
18 changes: 9 additions & 9 deletions crates/base/src/rt_worker/supervisor/strategy_per_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use log::error;
use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs};

use crate::rt_worker::supervisor::{
create_wall_clock_willterminate_alert, v8_handle_wall_clock_willterminate, wait_cpu_alarm,
create_wall_clock_beforeunload_alert, v8_handle_wall_clock_beforeunload, wait_cpu_alarm,
CPUUsage, Tokens,
};

Expand Down Expand Up @@ -54,7 +54,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {

let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0;
let mut is_worker_entered = false;
let mut is_wall_clock_willterminate_armed = false;
let mut is_wall_clock_beforeunload_armed = false;

let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap();
let mut cpu_usage_ms = 0i64;
Expand All @@ -79,9 +79,9 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
.unwrap_or(Duration::from_millis(1)),
);

let wall_clock_willterminate_alert = create_wall_clock_willterminate_alert(
let wall_clock_beforeunload_alert = create_wall_clock_beforeunload_alert(
wall_clock_limit_ms,
flags.willterminate_wall_clock_pct,
flags.beforeunload_wall_clock_pct,
);

let terminate_fn = {
Expand All @@ -101,7 +101,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
};

tokio::pin!(wall_clock_duration_alert);
tokio::pin!(wall_clock_willterminate_alert);
tokio::pin!(wall_clock_beforeunload_alert);

loop {
tokio::select! {
Expand Down Expand Up @@ -223,17 +223,17 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
}
}

_ = &mut wall_clock_willterminate_alert,
if !is_wall_clock_limit_disabled && !is_wall_clock_willterminate_armed
_ = &mut wall_clock_beforeunload_alert,
if !is_wall_clock_limit_disabled && !is_wall_clock_beforeunload_armed
=> {
if thread_safe_handle.request_interrupt(
v8_handle_wall_clock_willterminate,
v8_handle_wall_clock_beforeunload,
std::ptr::null_mut()
) {
waker.wake();
}

is_wall_clock_willterminate_armed = true;
is_wall_clock_beforeunload_armed = true;
}

Some(_) = memory_limit_rx.recv() => {
Expand Down
6 changes: 3 additions & 3 deletions crates/base/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ pub struct ServerFlags {
pub request_idle_timeout_ms: Option<u64>,
pub request_read_timeout_ms: Option<u64>,

pub willterminate_wall_clock_pct: Option<u8>,
pub willterminate_cpu_pct: Option<u8>,
pub willterminate_memory_pct: Option<u8>,
pub beforeunload_wall_clock_pct: Option<u8>,
pub beforeunload_cpu_pct: Option<u8>,
pub beforeunload_memory_pct: Option<u8>,
}

#[derive(Debug)]
Expand Down
Loading

0 comments on commit cddcac1

Please sign in to comment.