From 735250127b47e42aff6d05765527280e43d939c4 Mon Sep 17 00:00:00 2001 From: wlandau Date: Mon, 4 Nov 2024 09:52:40 -0500 Subject: [PATCH 1/5] sketch solution to #189 --- DESCRIPTION | 2 +- NEWS.md | 6 +- R/crew_controller_local.R | 13 ++++- R/crew_launcher.R | 77 ++++++++++++++----------- R/crew_launcher_local.R | 19 ++++-- man/crew_class_launcher.Rd | 6 +- man/crew_class_launcher_local.Rd | 4 +- man/crew_controller_local.Rd | 23 ++++---- man/crew_launcher.Rd | 23 ++++---- man/crew_launcher_local.Rd | 23 ++++---- tests/local/test-backlog_seconds_idle.R | 3 +- tests/local/test-launcher-system2.R | 4 +- tests/testthat/test-plugins.R | 12 ++-- vignettes/plugins.Rmd | 8 +-- 14 files changed, 132 insertions(+), 91 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index b750233a..32b62a6c 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -16,7 +16,7 @@ Description: In computationally demanding analysis projects, 'clustermq' by Schubert (2019) ), and 'batchtools' by Lang, Bischel, and Surmann (2017) . -Version: 0.10.1 +Version: 0.10.2 License: MIT + file LICENSE URL: https://wlandau.github.io/crew/, https://github.com/wlandau/crew BugReports: https://github.com/wlandau/crew/issues diff --git a/NEWS.md b/NEWS.md index a8f81d38..535e011d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,8 @@ -# crew 0.10.1 (development) +# crew 0.10.2 + +* Eliminate spurious `launch_max` error from underutilized workers and deprecate `launch_max` in favor of `crashes_error` (#189). + +# crew 0.10.1 * Instrument `crew_eval()` with `autometric::log_phase_set()` and `autometric::log_phase_reset()`. 
diff --git a/R/crew_controller_local.R b/R/crew_controller_local.R index 79432f7d..fb4aaf96 100644 --- a/R/crew_controller_local.R +++ b/R/crew_controller_local.R @@ -36,7 +36,8 @@ crew_controller_local <- function( reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, + launch_max = NULL, r_arguments = c("--no-save", "--no-restore"), options_metrics = crew::crew_options_metrics(), options_local = crew::crew_options_local(), @@ -67,6 +68,14 @@ crew_controller_local <- function( condition = "warning", value = local_log_join ) + crew_deprecate( + name = "launch_max", + date = "2024-11-04", + version = "0.10.1.9000", + alternative = "crashes_error", + condition = "warning", + value = launch_max + ) options_local$log_directory <- local_log_directory %|||% options_local$log_directory options_local$log_join <- local_log_join %|||% @@ -96,7 +105,7 @@ crew_controller_local <- function( reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls, r_arguments = r_arguments, options_metrics = options_metrics, diff --git a/R/crew_launcher.R b/R/crew_launcher.R index 85a1b3c5..ab0e2ffe 100644 --- a/R/crew_launcher.R +++ b/R/crew_launcher.R @@ -53,15 +53,16 @@ #' because packages sometimes rely on options they set at loading time. #' @param garbage_collection `TRUE` to run garbage collection between #' tasks, `FALSE` to skip. -#' @param launch_max Positive integer of length 1, maximum allowed -#' consecutive launch attempts which do not complete any tasks. -#' Enforced on a worker-by-worker basis. -#' The futile launch count resets to back 0 -#' for each worker that completes a task. -#' It is recommended to set `launch_max` above 0 -#' because sometimes workers are unproductive under perfectly ordinary -#' circumstances. 
But `launch_max` should still be small enough -#' to detect errors in the underlying platform. +#' @param crashes_error Positive integer scalar. If a worker exits +#' `crashes_error` times in a row without completing all its assigned +#' tasks, then the launcher throws an informative error next time +#' the worker tries to relaunch. The reason for `crashes_error` +#' is to avoid an infinite loop where a task crashes a worker +#' (through a segfault, maxing out memory, etc) but the worker always +#' relaunches. To monitor the resources of `crew` workers, +#' please see <https://wlandau.github.io/crew/articles/logging.html>. +#' @param launch_max Deprecated on 2024-11-04 (version 0.10.1.9000). +#' Use `crashes_error` instead. #' @param processes `NULL` or positive integer of length 1, #' number of local processes to #' launch to allow worker launches to happen asynchronously. If `NULL`, @@ -100,7 +101,8 @@ crew_launcher <- function( reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, + launch_max = NULL, tls = crew::crew_tls(), processes = NULL, r_arguments = c("--no-save", "--no-restore"), @@ -119,6 +121,14 @@ crew_launcher <- function( inherits(tls, "crew_class_tls"), message = "argument tls must be an object created by crew_tls()" ) + crew_deprecate( + name = "launch_max", + date = "2024-11-04", + version = "0.10.1.9000", + alternative = "crashes_error", + condition = "warning", + value = launch_max + ) crew_class_launcher$new( name = name, seconds_interval = seconds_interval, @@ -132,7 +142,7 @@ crew_launcher <- function( reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls, processes = processes, r_arguments = r_arguments, @@ -175,7 +185,7 @@ crew_class_launcher <- R6::R6Class( .reset_packages = NULL, .reset_options = NULL, .garbage_collection = NULL, - .launch_max = NULL, + .crashes_error = NULL, .tls = NULL, .processes = NULL, 
.r_arguments = NULL, @@ -236,9 +246,9 @@ crew_class_launcher <- R6::R6Class( garbage_collection = function() { .subset2(private, ".garbage_collection") }, - #' @field launch_max See [crew_launcher()]. - launch_max = function() { - .subset2(private, ".launch_max") + #' @field crashes_error See [crew_launcher()]. + crashes_error = function() { + .subset2(private, ".crashes_error") }, #' @field tls See [crew_launcher()]. tls = function() { @@ -283,7 +293,7 @@ crew_class_launcher <- R6::R6Class( #' @param reset_packages See [crew_launcher()]. #' @param reset_options See [crew_launcher()]. #' @param garbage_collection See [crew_launcher()]. - #' @param launch_max See [crew_launcher()]. + #' @param crashes_error See [crew_launcher()]. #' @param tls See [crew_launcher()]. #' @param processes See [crew_launcher()]. #' @param r_arguments See [crew_launcher()]. @@ -314,7 +324,7 @@ crew_class_launcher <- R6::R6Class( reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, - launch_max = NULL, + crashes_error = NULL, tls = NULL, processes = NULL, r_arguments = NULL, @@ -332,7 +342,7 @@ crew_class_launcher <- R6::R6Class( private$.reset_packages <- reset_packages private$.reset_options <- reset_options private$.garbage_collection <- garbage_collection - private$.launch_max <- launch_max + private$.crashes_error <- crashes_error private$.tls <- tls private$.processes <- processes private$.r_arguments <- r_arguments @@ -378,7 +388,7 @@ crew_class_launcher <- R6::R6Class( "seconds_wall", "tasks_max", "tasks_timers", - "launch_max" + "crashes_error" ) for (field in fields) { crew_assert( @@ -418,10 +428,9 @@ crew_class_launcher <- R6::R6Class( "socket", "start", "launches", - "futile", "launched", "terminated", - "history", + "crashes", "online", "discovered", "assigned", @@ -556,10 +565,9 @@ crew_class_launcher <- R6::R6Class( socket = sockets, start = rep(NA_real_, n), launches = rep(0L, n), - futile = rep(0L, n), launched = rep(FALSE, n), terminated = rep(TRUE, 
n), - history = rep(-1L, n), + crashes = rep(0L, n), online = rep(FALSE, n), discovered = rep(FALSE, n), assigned = rep(0L, n), @@ -731,24 +739,27 @@ crew_class_launcher <- R6::R6Class( worker = index, instance = instance ) + assigned <- private$.workers$assigned[index] complete <- private$.workers$complete[index] - history <- private$.workers$history[index] - futile <- private$.workers$futile[index] - futile <- if_any(complete > history, 0L, futile + 1L) + crashes <- private$.workers$crashes[index] + crashes <- if_any(assigned > complete, crashes + 1L, 0L) crew_assert( - futile <= private$.launch_max, + crashes < private$.crashes_error, message = paste0( "{crew} worker with name ", name, " and index ", index, - " launched ", - private$.launch_max, - " times in a row without completing any tasks. ", - "Either troubleshoot or raise launch_max above ", - private$.launch_max, + " inappropriately terminated ", + private$.crashes_error, + " times in a row (it exited ", + "without completing all its assigned tasks). ", + "Either troubleshoot or raise crashes_error above ", + private$.crashes_error, ". Details: ", - "https://wlandau.github.io/crew/articles/risks.html#crashes" + "https://wlandau.github.io/crew/articles/risks.html#crashes. ", + "See https://wlandau.github.io/crew/articles/logging.html ", + "to learn how to log memory and CPU usage for a worker." 
) ) mirai::call_mirai_(private$.workers$handle[[index]]) diff --git a/R/crew_launcher_local.R b/R/crew_launcher_local.R index 8c367c3a..0fc0312d 100644 --- a/R/crew_launcher_local.R +++ b/R/crew_launcher_local.R @@ -36,7 +36,8 @@ crew_launcher_local <- function( reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, + launch_max = NULL, tls = crew::crew_tls(), r_arguments = c("--no-save", "--no-restore"), options_metrics = crew::crew_options_metrics(), @@ -68,6 +69,14 @@ crew_launcher_local <- function( condition = "warning", value = local_log_join ) + crew_deprecate( + name = "launch_max", + date = "2024-11-04", + version = "0.10.1.9000", + alternative = "crashes_error", + condition = "warning", + value = launch_max + ) options_local$log_directory <- local_log_directory %|||% options_local$log_directory options_local$log_join <- local_log_join %|||% @@ -86,7 +95,7 @@ crew_launcher_local <- function( reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls, r_arguments = r_arguments, options_metrics = options_metrics, @@ -168,7 +177,7 @@ crew_class_launcher_local <- R6::R6Class( #' @param reset_packages See [crew_launcher()]. #' @param reset_options See [crew_launcher()]. #' @param garbage_collection See [crew_launcher()]. - #' @param launch_max See [crew_launcher()]. + #' @param crashes_error See [crew_launcher()]. #' @param tls See [crew_launcher()]. #' @param processes See [crew_launcher()]. #' @param r_arguments See [crew_launcher()]. 
@@ -200,7 +209,7 @@ crew_class_launcher_local <- R6::R6Class( reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, - launch_max = NULL, + crashes_error = NULL, tls = NULL, processes = NULL, r_arguments = NULL, @@ -221,7 +230,7 @@ crew_class_launcher_local <- R6::R6Class( reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls, processes = processes, r_arguments = r_arguments, diff --git a/man/crew_class_launcher.Rd b/man/crew_class_launcher.Rd index 40dcfcda..0e8fb35c 100644 --- a/man/crew_class_launcher.Rd +++ b/man/crew_class_launcher.Rd @@ -82,7 +82,7 @@ Other launcher: \item{\code{garbage_collection}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} -\item{\code{launch_max}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} +\item{\code{crashes_error}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} \item{\code{tls}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} @@ -147,7 +147,7 @@ Launcher constructor. reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, - launch_max = NULL, + crashes_error = NULL, tls = NULL, processes = NULL, r_arguments = NULL, @@ -184,7 +184,7 @@ Launcher constructor. \item{\code{garbage_collection}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} -\item{\code{launch_max}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} +\item{\code{crashes_error}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} \item{\code{tls}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} diff --git a/man/crew_class_launcher_local.Rd b/man/crew_class_launcher_local.Rd index 21479a21..4d1acaaa 100644 --- a/man/crew_class_launcher_local.Rd +++ b/man/crew_class_launcher_local.Rd @@ -107,7 +107,7 @@ Local launcher constructor. 
reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, - launch_max = NULL, + crashes_error = NULL, tls = NULL, processes = NULL, r_arguments = NULL, @@ -145,7 +145,7 @@ Local launcher constructor. \item{\code{garbage_collection}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} -\item{\code{launch_max}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} +\item{\code{crashes_error}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} \item{\code{tls}}{See \code{\link[=crew_launcher]{crew_launcher()}}.} diff --git a/man/crew_controller_local.Rd b/man/crew_controller_local.Rd index 80db3cdf..cf2f6e23 100644 --- a/man/crew_controller_local.Rd +++ b/man/crew_controller_local.Rd @@ -25,7 +25,8 @@ crew_controller_local( reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, + launch_max = NULL, r_arguments = c("--no-save", "--no-restore"), options_metrics = crew::crew_options_metrics(), options_local = crew::crew_options_local(), @@ -117,15 +118,17 @@ because packages sometimes rely on options they set at loading time.} \item{garbage_collection}{\code{TRUE} to run garbage collection between tasks, \code{FALSE} to skip.} -\item{launch_max}{Positive integer of length 1, maximum allowed -consecutive launch attempts which do not complete any tasks. -Enforced on a worker-by-worker basis. -The futile launch count resets to back 0 -for each worker that completes a task. -It is recommended to set \code{launch_max} above 0 -because sometimes workers are unproductive under perfectly ordinary -circumstances. But \code{launch_max} should still be small enough -to detect errors in the underlying platform.} +\item{crashes_error}{Positive integer scalar. If a worker exits +\code{crashes_error} times in a row without completing all its assigned +tasks, then the launcher throws an informative error next time +the worker tries to relaunch. 
The reason for \code{crashes_error} +is to avoid an infinite loop where a task crashes a worker +(through a segfault, maxing out memory, etc) but the worker always +relaunches. To monitor the resources of \code{crew} workers, +please see \url{https://wlandau.github.io/crew/articles/logging.html}.} + +\item{launch_max}{Deprecated on 2024-11-04 (version 0.10.1.9000). +Use \code{crashes_error} instead.} \item{r_arguments}{Optional character vector of command line arguments to pass to \code{Rscript} (non-Windows) or \code{Rscript.exe} (Windows) diff --git a/man/crew_launcher.Rd b/man/crew_launcher.Rd index 46923b2b..c87c9d39 100644 --- a/man/crew_launcher.Rd +++ b/man/crew_launcher.Rd @@ -18,7 +18,8 @@ crew_launcher( reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, + launch_max = NULL, tls = crew::crew_tls(), processes = NULL, r_arguments = c("--no-save", "--no-restore"), @@ -85,15 +86,17 @@ because packages sometimes rely on options they set at loading time.} \item{garbage_collection}{\code{TRUE} to run garbage collection between tasks, \code{FALSE} to skip.} -\item{launch_max}{Positive integer of length 1, maximum allowed -consecutive launch attempts which do not complete any tasks. -Enforced on a worker-by-worker basis. -The futile launch count resets to back 0 -for each worker that completes a task. -It is recommended to set \code{launch_max} above 0 -because sometimes workers are unproductive under perfectly ordinary -circumstances. But \code{launch_max} should still be small enough -to detect errors in the underlying platform.} +\item{crashes_error}{Positive integer scalar. If a worker exits +\code{crashes_error} times in a row without completing all its assigned +tasks, then the launcher throws an informative error next time +the worker tries to relaunch. 
The reason for \code{crashes_error} +is to avoid an infinite loop where a task crashes a worker +(through a segfault, maxing out memory, etc) but the worker always +relaunches. To monitor the resources of \code{crew} workers, +please see \url{https://wlandau.github.io/crew/articles/logging.html}.} + +\item{launch_max}{Deprecated on 2024-11-04 (version 0.10.1.9000). +Use \code{crashes_error} instead.} \item{tls}{A TLS configuration object from \code{\link[=crew_tls]{crew_tls()}}.} diff --git a/man/crew_launcher_local.Rd b/man/crew_launcher_local.Rd index 7967c289..8aec2df8 100644 --- a/man/crew_launcher_local.Rd +++ b/man/crew_launcher_local.Rd @@ -18,7 +18,8 @@ crew_launcher_local( reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, + launch_max = NULL, tls = crew::crew_tls(), r_arguments = c("--no-save", "--no-restore"), options_metrics = crew::crew_options_metrics(), @@ -87,15 +88,17 @@ because packages sometimes rely on options they set at loading time.} \item{garbage_collection}{\code{TRUE} to run garbage collection between tasks, \code{FALSE} to skip.} -\item{launch_max}{Positive integer of length 1, maximum allowed -consecutive launch attempts which do not complete any tasks. -Enforced on a worker-by-worker basis. -The futile launch count resets to back 0 -for each worker that completes a task. -It is recommended to set \code{launch_max} above 0 -because sometimes workers are unproductive under perfectly ordinary -circumstances. But \code{launch_max} should still be small enough -to detect errors in the underlying platform.} +\item{crashes_error}{Positive integer scalar. If a worker exits +\code{crashes_error} times in a row without completing all its assigned +tasks, then the launcher throws an informative error next time +the worker tries to relaunch. 
The reason for \code{crashes_error} +is to avoid an infinite loop where a task crashes a worker +(through a segfault, maxing out memory, etc) but the worker always +relaunches. To monitor the resources of \code{crew} workers, +please see \url{https://wlandau.github.io/crew/articles/logging.html}.} + +\item{launch_max}{Deprecated on 2024-11-04 (version 0.10.1.9000). +Use \code{crashes_error} instead.} \item{tls}{A TLS configuration object from \code{\link[=crew_tls]{crew_tls()}}.} diff --git a/tests/local/test-backlog_seconds_idle.R b/tests/local/test-backlog_seconds_idle.R index 4fe627f4..96d19141 100644 --- a/tests/local/test-backlog_seconds_idle.R +++ b/tests/local/test-backlog_seconds_idle.R @@ -12,8 +12,7 @@ test_that("backlog of tasks completes with finite seconds_idle", { on.exit(options(old_options)) controller <- crew_controller_local( workers = 20L, - seconds_idle = 1, - launch_max = 25 + seconds_idle = 1 ) on.exit(controller$terminate(), add = TRUE) controller$start() diff --git a/tests/local/test-launcher-system2.R b/tests/local/test-launcher-system2.R index 6a1aa5dc..99b34087 100644 --- a/tests/local/test-launcher-system2.R +++ b/tests/local/test-launcher-system2.R @@ -33,7 +33,7 @@ crew_test("custom launcher plugin based on system2()", { reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L + crashes_error = 5L ) { client <- crew::crew_client( name = name, @@ -58,7 +58,7 @@ crew_test("custom launcher plugin based on system2()", { reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls ) controller <- crew::crew_controller(client = client, launcher = launcher) diff --git a/tests/testthat/test-plugins.R b/tests/testthat/test-plugins.R index a8f6e4dc..eb0ef9a8 100644 --- a/tests/testthat/test-plugins.R +++ b/tests/testthat/test-plugins.R @@ -40,7 +40,7 @@ crew_test("custom launcher", { reset_packages 
= FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L + crashes_error = 5L ) { client <- crew::crew_client( name = name, @@ -64,7 +64,7 @@ crew_test("custom launcher", { reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls ) controller <- crew::crew_controller( @@ -146,7 +146,7 @@ crew_test("custom launcher with local async errors", { reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, processes = NULL ) { client <- crew::crew_client( @@ -171,7 +171,7 @@ crew_test("custom launcher with local async errors", { reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls, processes = processes ) @@ -282,7 +282,7 @@ crew_test("custom launcher with async internal launcher tasks", { reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, processes = NULL ) { client <- crew::crew_client( @@ -307,7 +307,7 @@ crew_test("custom launcher with async internal launcher tasks", { reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls, processes = processes ) diff --git a/vignettes/plugins.Rmd b/vignettes/plugins.Rmd index 8b1dd118..59403307 100644 --- a/vignettes/plugins.Rmd +++ b/vignettes/plugins.Rmd @@ -135,7 +135,7 @@ crew_controller_custom <- function( reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, r_arguments = NULL, options_metrics = crew::crew_options_metrics() ) { @@ -162,7 +162,7 @@ crew_controller_custom <- function( reset_packages = reset_packages, reset_options = reset_options, garbage_collection = 
garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls, r_arguments = r_arguments, options_metrics = options_metrics @@ -343,7 +343,7 @@ crew_controller_async <- function( reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, - launch_max = 5L, + crashes_error = 5L, processes = NULL # Number of local async daemons for worker launches etc. ) { client <- crew::crew_client( @@ -367,7 +367,7 @@ crew_controller_async <- function( reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, - launch_max = launch_max, + crashes_error = crashes_error, tls = tls, processes = processes ) From bdf82164a37cc1919c826da869970a2c6ee9acc3 Mon Sep 17 00:00:00 2001 From: wlandau Date: Mon, 4 Nov 2024 09:55:05 -0500 Subject: [PATCH 2/5] fix --- R/crew_launcher.R | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/R/crew_launcher.R b/R/crew_launcher.R index ab0e2ffe..73a5163d 100644 --- a/R/crew_launcher.R +++ b/R/crew_launcher.R @@ -774,10 +774,9 @@ crew_class_launcher <- R6::R6Class( private$.workers$socket[index] <- socket private$.workers$start[index] <- nanonext::mclock() / 1000 private$.workers$launches[index] <- private$.workers$launches[index] + 1L - private$.workers$futile[index] <- futile + private$.workers$crashes[index] <- crashes private$.workers$launched[index] <- TRUE private$.workers$terminated[index] <- FALSE - private$.workers$history[index] <- complete invisible() }, #' @description Forward an asynchronous launch/termination error condition From 6223c285745e41ca3fa93653df7896fa76dd94b8 Mon Sep 17 00:00:00 2001 From: wlandau Date: Mon, 4 Nov 2024 10:01:05 -0500 Subject: [PATCH 3/5] update test --- tests/testthat/test-crew_launcher.R | 31 +++++++++++++++-------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/tests/testthat/test-crew_launcher.R b/tests/testthat/test-crew_launcher.R index 4bddca55..1541b3b7 100644 --- 
a/tests/testthat/test-crew_launcher.R +++ b/tests/testthat/test-crew_launcher.R @@ -183,21 +183,22 @@ crew_test("launcher start()", { workers <- launcher$workers expect_equal(nrow(workers), 2L) expect_equal( - colnames(workers), - cols <- c( - "handle", - "termination", - "socket", - "start", - "launches", - "futile", - "launched", - "terminated", - "history", - "online", - "discovered", - "assigned", - "complete" + sort(colnames(workers)), + sort( + c( + "handle", + "termination", + "socket", + "start", + "launches", + "crashes", + "launched", + "terminated", + "online", + "discovered", + "assigned", + "complete" + ) ) ) expect_equal(workers$handle, list(crew_null, crew_null)) From 64c05d0e05d818363a8fedc408b722b90b2660f1 Mon Sep 17 00:00:00 2001 From: wlandau Date: Mon, 4 Nov 2024 10:30:34 -0500 Subject: [PATCH 4/5] refactor crash detection --- R/crew_launcher.R | 62 ++++++++++++++++++++---------------- man/crew_class_launcher.Rd | 4 ++- man/crew_controller_local.Rd | 4 +-- man/crew_launcher.Rd | 4 +-- man/crew_launcher_local.Rd | 4 +-- 5 files changed, 43 insertions(+), 35 deletions(-) diff --git a/R/crew_launcher.R b/R/crew_launcher.R index 73a5163d..88936026 100644 --- a/R/crew_launcher.R +++ b/R/crew_launcher.R @@ -55,8 +55,8 @@ #' tasks, `FALSE` to skip. #' @param crashes_error Positive integer scalar. If a worker exits #' `crashes_error` times in a row without completing all its assigned -#' tasks, then the launcher throws an informative error next time -#' the worker tries to relaunch. The reason for `crashes_error` +#' tasks, then the launcher throws an informative error. +#' The reason for `crashes_error` #' is to avoid an infinite loop where a task crashes a worker #' (through a segfault, maxing out memory, etc) but the worker always #' relaunches. 
To monitor the resources of `crew` workers, @@ -191,7 +191,34 @@ crew_class_launcher <- R6::R6Class( .r_arguments = NULL, .options_metrics = NULL, .async = NULL, - .throttle = NULL + .throttle = NULL, + .check_crashes = function(index) { + workers <- .subset2(private, ".workers") + assigned <- .subset2(workers, "assigned")[index] + complete <- .subset2(workers, "complete")[index] + crashes <- .subset2(workers, "crashes")[index] + crashes <- ifelse(assigned > complete, crashes + 1L, 0L) + private$.workers$crashes[index] <- crashes + crew_assert( + crashes < private$.crashes_error, + message = paste0( + "{crew} launcher ", + private$.name, + " worker index ", + index, + " inappropriately terminated ", + private$.crashes_error, + " times in a row (it exited ", + "without completing all its assigned tasks). ", + "Either troubleshoot or raise crashes_error above ", + private$.crashes_error, + ". Details: ", + "https://wlandau.github.io/crew/articles/risks.html#crashes. ", + "See https://wlandau.github.io/crew/articles/logging.html ", + "to learn how to log memory and CPU usage for a worker." + ) + ) + } ), active = list( #' @field workers Data frame of worker information. @@ -703,7 +730,9 @@ crew_class_launcher <- R6::R6Class( done = function() { !self$active() & private$.workers$launched }, - #' @details Rotate websockets at all unlaunched workers. + #' @details Rotate websockets at all unlaunched workers and + #' throw an error if a worker launched at least `crashes_error` + #' times in a row without completing all its assigned tasks. #' @return `NULL` (invisibly). rotate = function() { which_done <- which(self$done()) @@ -718,6 +747,7 @@ crew_class_launcher <- R6::R6Class( private$.workers$socket[index] <- socket private$.workers$launched[index] <- FALSE } + private$.check_crashes(index) } }, #' @description Launch a worker. 
@@ -739,29 +769,6 @@ crew_class_launcher <- R6::R6Class( worker = index, instance = instance ) - assigned <- private$.workers$assigned[index] - complete <- private$.workers$complete[index] - crashes <- private$.workers$crashes[index] - crashes <- if_any(assigned > complete, crashes + 1L, 0L) - crew_assert( - crashes < private$.crashes_error, - message = paste0( - "{crew} worker with name ", - name, - " and index ", - index, - " inappropriately terminated ", - private$.crashes_error, - " times in a row (it exited ", - "without completing all its assigned tasks). ", - "Either troubleshoot or raise crashes_error above ", - private$.crashes_error, - ". Details: ", - "https://wlandau.github.io/crew/articles/risks.html#crashes. ", - "See https://wlandau.github.io/crew/articles/logging.html ", - "to learn how to log memory and CPU usage for a worker." - ) - ) mirai::call_mirai_(private$.workers$handle[[index]]) handle <- self$launch_worker( call = as.character(call), @@ -774,7 +781,6 @@ crew_class_launcher <- R6::R6Class( private$.workers$socket[index] <- socket private$.workers$start[index] <- nanonext::mclock() / 1000 private$.workers$launches[index] <- private$.workers$launches[index] + 1L - private$.workers$crashes[index] <- crashes private$.workers$launched[index] <- TRUE private$.workers$terminated[index] <- FALSE invisible() diff --git a/man/crew_class_launcher.Rd b/man/crew_class_launcher.Rd index 0e8fb35c..49b8eda6 100644 --- a/man/crew_class_launcher.Rd +++ b/man/crew_class_launcher.Rd @@ -511,7 +511,9 @@ Integer index of inactive workers. } \subsection{Details}{ -Rotate websockets at all unlaunched workers. +Rotate websockets at all unlaunched workers and +throw an error if a worker launched at least \code{crashes_error} +times in a row without completing all its assigned tasks. 
} \subsection{Returns}{ diff --git a/man/crew_controller_local.Rd b/man/crew_controller_local.Rd index cf2f6e23..f652aced 100644 --- a/man/crew_controller_local.Rd +++ b/man/crew_controller_local.Rd @@ -120,8 +120,8 @@ tasks, \code{FALSE} to skip.} \item{crashes_error}{Positive integer scalar. If a worker exits \code{crashes_error} times in a row without completing all its assigned -tasks, then the launcher throws an informative error next time -the worker tries to relaunch. The reason for \code{crashes_error} +tasks, then the launcher throws an informative error. +The reason for \code{crashes_error} is to avoid an infinite loop where a task crashes a worker (through a segfault, maxing out memory, etc) but the worker always relaunches. To monitor the resources of \code{crew} workers, diff --git a/man/crew_launcher.Rd b/man/crew_launcher.Rd index c87c9d39..63c69fc8 100644 --- a/man/crew_launcher.Rd +++ b/man/crew_launcher.Rd @@ -88,8 +88,8 @@ tasks, \code{FALSE} to skip.} \item{crashes_error}{Positive integer scalar. If a worker exits \code{crashes_error} times in a row without completing all its assigned -tasks, then the launcher throws an informative error next time -the worker tries to relaunch. The reason for \code{crashes_error} +tasks, then the launcher throws an informative error. +The reason for \code{crashes_error} is to avoid an infinite loop where a task crashes a worker (through a segfault, maxing out memory, etc) but the worker always relaunches. To monitor the resources of \code{crew} workers, diff --git a/man/crew_launcher_local.Rd b/man/crew_launcher_local.Rd index 8aec2df8..549626fa 100644 --- a/man/crew_launcher_local.Rd +++ b/man/crew_launcher_local.Rd @@ -90,8 +90,8 @@ tasks, \code{FALSE} to skip.} \item{crashes_error}{Positive integer scalar. If a worker exits \code{crashes_error} times in a row without completing all its assigned -tasks, then the launcher throws an informative error next time -the worker tries to relaunch. 
The reason for \code{crashes_error} +tasks, then the launcher throws an informative error. +The reason for \code{crashes_error} is to avoid an infinite loop where a task crashes a worker (through a segfault, maxing out memory, etc) but the worker always relaunches. To monitor the resources of \code{crew} workers, From a44f0bcf05808e9be8fb9ff4bdc3bb4994692411 Mon Sep 17 00:00:00 2001 From: wlandau Date: Mon, 4 Nov 2024 11:08:13 -0500 Subject: [PATCH 5/5] Fix #189 --- NEWS.md | 4 ++- R/crew_launcher.R | 5 +-- tests/local/test-crashes_error.R | 60 ++++++++++++++++++++++++++++++++ tests/local/test-launch_max.R | 44 ----------------------- 4 files changed, 66 insertions(+), 47 deletions(-) create mode 100644 tests/local/test-crashes_error.R delete mode 100644 tests/local/test-launch_max.R diff --git a/NEWS.md b/NEWS.md index 535e011d..065c193d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,8 @@ # crew 0.10.2 -* Eliminate spurious `launch_max` error from underutilized workers and deprecate `launch_max` in favor of `crashes_error` (#189). +* Eliminate spurious `launch_max` error from underutilized workers (#189). +* Deprecate `launch_max` in favor of `crashes_error` (#189). +* Look for crashes of all workers in `rotate()` instead of looking for crashes of a specific worker in `launch()` (#189). # crew 0.10.1 diff --git a/R/crew_launcher.R b/R/crew_launcher.R index 88936026..c8e0844f 100644 --- a/R/crew_launcher.R +++ b/R/crew_launcher.R @@ -352,6 +352,7 @@ crew_class_launcher <- R6::R6Class( reset_options = NULL, garbage_collection = NULL, crashes_error = NULL, + launch_max = NULL, # TODO: remove after deprecation period tls = NULL, processes = NULL, r_arguments = NULL, @@ -407,6 +408,7 @@ crew_class_launcher <- R6::R6Class( !anyNA(.), nzchar(.) ) + # TODO: add "crashes_error" when revdeps use this argument. 
fields <- c( "seconds_interval", "seconds_timeout", @@ -414,8 +416,7 @@ crew_class_launcher <- R6::R6Class( "seconds_idle", "seconds_wall", "tasks_max", - "tasks_timers", - "crashes_error" + "tasks_timers" ) for (field in fields) { crew_assert( diff --git a/tests/local/test-crashes_error.R b/tests/local/test-crashes_error.R new file mode 100644 index 00000000..27403751 --- /dev/null +++ b/tests/local/test-crashes_error.R @@ -0,0 +1,60 @@ +test_that("crashes_error allows idle workers to exit", { + x <- crew_controller_local(crashes_error = 2L, seconds_idle = 0.25) + on.exit(x$terminate()) + x$start() + for (index in seq_len(10L)) { + x$launch() + Sys.sleep(5) + } + expect_equal(index, 10L) +}) + +test_that("crashes_error detects when there are too many crashes", { + x <- crew_controller_local(name = "name", crashes_error = 2L) + on.exit(x$terminate()) + x$start() + x$push(Sys.sleep(1e3), scale = FALSE) + for (index in seq_len(10L)) { + Sys.sleep(5) + if (index == 3L) { + expect_error(x$scale(), class = "crew_error") + break + } else { + expect_silent(x$scale()) + } + Sys.sleep(5) + x$launcher$workers$handle[[1L]]$kill() + } + expect_equal(index, 3L) +}) + +# nolint start +# TODO: re-enable the test below if/when mirai::stop_mirai() decrements the +# assigned counter: https://github.com/shikokuchuo/mirai/issues/163 +# test_that("crash counter resets when a worker succeeds", { +# x <- crew_controller_local(name = "name", crashes_error = 3L) +# x$start() +# x$push(Sys.sleep(1e3), scale = FALSE) +# for (index in seq_len(2L)) { +# Sys.sleep(5) +# x$scale() +# Sys.sleep(5) +# x$launcher$workers$handle[[1L]]$kill() +# } +# x$cancel(all = TRUE) +# expect_equal(x$launcher$workers$crashes, 1L) +# x$push(TRUE) +# x$wait() +# expect_gt(x$launcher$workers$crashes, 0L) +# x$push(Sys.sleep(1e3), scale = FALSE) +# for (index in seq_len(2L)) { +# Sys.sleep(5) +# x$scale() +# Sys.sleep(5) +# x$launcher$workers$handle[[1L]]$kill() +# } +# expect_gt(x$launcher$workers$crashes, 1L) +# 
testthat::expect_equal(index, 3L) +# x$terminate() +# }) +# nolint end diff --git a/tests/local/test-launch_max.R b/tests/local/test-launch_max.R deleted file mode 100644 index 201956c4..00000000 --- a/tests/local/test-launch_max.R +++ /dev/null @@ -1,44 +0,0 @@ -crew_test("prevent infinite loop of failed worker launches", { - x <- crew_controller_local( - workers = 1L, - seconds_idle = 1e-9, - launch_max = 3L - ) - x$start() - expect_equal(x$launcher$workers$futile, 0L) - x$launch(n = 1L) - # Pause until worker idles out. - Sys.sleep(5) - expect_equal(x$launcher$workers$futile, 0L) - x$launch(n = 1L) - # Pause until worker idles out. - Sys.sleep(5) - expect_equal(x$launcher$workers$futile, 1L) - private <- crew_private(x$launcher) - private$.seconds_idle <- Inf - x$push(TRUE) - x$wait() - expect_equal(x$launcher$workers$futile, 2L) - x$launcher$terminate_workers(index = 1L) - # Pause until worker exits. - Sys.sleep(5) - private$.seconds_idle <- 1e-9 - x$launch(n = 1L) - # Pause until worker exits. - Sys.sleep(5) - expect_equal(x$launcher$workers$futile, 0L) - x$launch(n = 1L) - # Pause until worker exits. - Sys.sleep(5) - expect_equal(x$launcher$workers$futile, 1L) - x$launch(n = 1L) - # Pause until worker exits. - Sys.sleep(5) - expect_equal(x$launcher$workers$futile, 2L) - x$launch(n = 1L) - # Pause until worker exits. - Sys.sleep(5) - expect_equal(x$launcher$workers$futile, 3L) - expect_error(x$launch(n = 1L), class = "crew_error") - x$terminate() -})