Skip to content

Commit

Permalink
Merge pull request #190 from wlandau/189
Browse files Browse the repository at this point in the history
Eliminate spurious warnings in crash detection
  • Loading branch information
wlandau authored Nov 4, 2024
2 parents 74ee238 + a44f0bc commit 7ed05fd
Show file tree
Hide file tree
Showing 17 changed files with 234 additions and 166 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Description: In computationally demanding analysis projects,
'clustermq' by Schubert (2019) <doi:10.1093/bioinformatics/btz284>),
and 'batchtools' by Lang, Bischel, and Surmann (2017)
<doi:10.21105/joss.00135>.
Version: 0.10.1
Version: 0.10.2
License: MIT + file LICENSE
URL: https://wlandau.github.io/crew/, https://github.com/wlandau/crew
BugReports: https://github.com/wlandau/crew/issues
Expand Down
8 changes: 7 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# crew 0.10.1 (development)
# crew 0.10.2

* Eliminate spurious `launch_max` error from underutilized workers (#189).
* Deprecate `launch_max` in favor of `crashes_error` (#189).
* Look for crashes of all workers in `rotate()` instead of looking for crashes of a specific worker in `launch()` (#189).

# crew 0.10.1

* Instrument `crew_eval()` with `autometric::log_phase_set()` and `autometric::log_phase_reset()`.

Expand Down
13 changes: 11 additions & 2 deletions R/crew_controller_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ crew_controller_local <- function(
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L,
crashes_error = 5L,
launch_max = NULL,
r_arguments = c("--no-save", "--no-restore"),
options_metrics = crew::crew_options_metrics(),
options_local = crew::crew_options_local(),
Expand Down Expand Up @@ -67,6 +68,14 @@ crew_controller_local <- function(
condition = "warning",
value = local_log_join
)
crew_deprecate(
name = "launch_max",
date = "2024-11-04",
version = "0.10.1.9000",
alternative = "crashes_error",
condition = "warning",
value = launch_max
)
options_local$log_directory <- local_log_directory %|||%
options_local$log_directory
options_local$log_join <- local_log_join %|||%
Expand Down Expand Up @@ -96,7 +105,7 @@ crew_controller_local <- function(
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
crashes_error = crashes_error,
tls = tls,
r_arguments = r_arguments,
options_metrics = options_metrics,
Expand Down
113 changes: 65 additions & 48 deletions R/crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@
#' because packages sometimes rely on options they set at loading time.
#' @param garbage_collection `TRUE` to run garbage collection between
#' tasks, `FALSE` to skip.
#' @param launch_max Positive integer of length 1, maximum allowed
#' consecutive launch attempts which do not complete any tasks.
#' Enforced on a worker-by-worker basis.
#' The futile launch count resets to back 0
#' for each worker that completes a task.
#' It is recommended to set `launch_max` above 0
#' because sometimes workers are unproductive under perfectly ordinary
#' circumstances. But `launch_max` should still be small enough
#' to detect errors in the underlying platform.
#' @param crashes_error Positive integer scalar. If a worker exits
#' `crashes_error` times in a row without completing all its assigned
#' tasks, then the launcher throws an informative error.
#' The reason for `crashes_error`
#' is to avoid an infinite loop where a task crashes a worker
#' (through a segfault, maxing out memory, etc) but the worker always
#' relaunches. To monitor the resources of `crew` workers,
#' please see <https://wlandau.github.io/crew/articles/logging.html>.
#' @param launch_max Deprecated on 2024-11-04 (version 0.10.1.9000).
#' Use `crashes_error` instead.
#' @param processes `NULL` or positive integer of length 1,
#' number of local processes to
#' launch to allow worker launches to happen asynchronously. If `NULL`,
Expand Down Expand Up @@ -100,7 +101,8 @@ crew_launcher <- function(
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L,
crashes_error = 5L,
launch_max = NULL,
tls = crew::crew_tls(),
processes = NULL,
r_arguments = c("--no-save", "--no-restore"),
Expand All @@ -119,6 +121,14 @@ crew_launcher <- function(
inherits(tls, "crew_class_tls"),
message = "argument tls must be an object created by crew_tls()"
)
crew_deprecate(
name = "launch_max",
date = "2024-11-04",
version = "0.10.1.9000",
alternative = "crashes_error",
condition = "warning",
value = launch_max
)
crew_class_launcher$new(
name = name,
seconds_interval = seconds_interval,
Expand All @@ -132,7 +142,7 @@ crew_launcher <- function(
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
crashes_error = crashes_error,
tls = tls,
processes = processes,
r_arguments = r_arguments,
Expand Down Expand Up @@ -175,13 +185,40 @@ crew_class_launcher <- R6::R6Class(
.reset_packages = NULL,
.reset_options = NULL,
.garbage_collection = NULL,
.launch_max = NULL,
.crashes_error = NULL,
.tls = NULL,
.processes = NULL,
.r_arguments = NULL,
.options_metrics = NULL,
.async = NULL,
.throttle = NULL
.throttle = NULL,
.check_crashes = function(index) {
workers <- .subset2(private, ".workers")
assigned <- .subset2(workers, "assigned")[index]
complete <- .subset2(workers, "complete")[index]
crashes <- .subset2(workers, "crashes")[index]
crashes <- ifelse(assigned > complete, crashes + 1L, 0L)
private$.workers$crashes[index] <- crashes
crew_assert(
crashes < private$.crashes_error,
message = paste0(
"{crew} launcher ",
private$.name,
" worker index ",
index,
" inappropriately terminated ",
private$.crashes_error,
" times in a row (it exited ",
"without completing all its assigned tasks). ",
"Either troubleshoot or raise crashes_error above ",
private$.crashes_error,
". Details: ",
"https://wlandau.github.io/crew/articles/risks.html#crashes. ",
"See https://wlandau.github.io/crew/articles/logging.html ",
"to learn how to log memory and CPU usage for a worker."
)
)
}
),
active = list(
#' @field workers Data frame of worker information.
Expand Down Expand Up @@ -236,9 +273,9 @@ crew_class_launcher <- R6::R6Class(
garbage_collection = function() {
.subset2(private, ".garbage_collection")
},
#' @field launch_max See [crew_launcher()].
launch_max = function() {
.subset2(private, ".launch_max")
#' @field crashes_error See [crew_launcher()].
crashes_error = function() {
.subset2(private, ".crashes_error")
},
#' @field tls See [crew_launcher()].
tls = function() {
Expand Down Expand Up @@ -283,7 +320,7 @@ crew_class_launcher <- R6::R6Class(
#' @param reset_packages See [crew_launcher()].
#' @param reset_options See [crew_launcher()].
#' @param garbage_collection See [crew_launcher()].
#' @param launch_max See [crew_launcher()].
#' @param crashes_error See [crew_launcher()].
#' @param tls See [crew_launcher()].
#' @param processes See [crew_launcher()].
#' @param r_arguments See [crew_launcher()].
Expand Down Expand Up @@ -314,7 +351,8 @@ crew_class_launcher <- R6::R6Class(
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
launch_max = NULL,
crashes_error = NULL,
launch_max = NULL, # TODO: remove after deprecation period
tls = NULL,
processes = NULL,
r_arguments = NULL,
Expand All @@ -332,7 +370,7 @@ crew_class_launcher <- R6::R6Class(
private$.reset_packages <- reset_packages
private$.reset_options <- reset_options
private$.garbage_collection <- garbage_collection
private$.launch_max <- launch_max
private$.crashes_error <- crashes_error
private$.tls <- tls
private$.processes <- processes
private$.r_arguments <- r_arguments
Expand Down Expand Up @@ -370,15 +408,15 @@ crew_class_launcher <- R6::R6Class(
!anyNA(.),
nzchar(.)
)
# TODO: add "crashes_error" when revdeps use this argument.
fields <- c(
"seconds_interval",
"seconds_timeout",
"seconds_launch",
"seconds_idle",
"seconds_wall",
"tasks_max",
"tasks_timers",
"launch_max"
"tasks_timers"
)
for (field in fields) {
crew_assert(
Expand Down Expand Up @@ -418,10 +456,9 @@ crew_class_launcher <- R6::R6Class(
"socket",
"start",
"launches",
"futile",
"launched",
"terminated",
"history",
"crashes",
"online",
"discovered",
"assigned",
Expand Down Expand Up @@ -556,10 +593,9 @@ crew_class_launcher <- R6::R6Class(
socket = sockets,
start = rep(NA_real_, n),
launches = rep(0L, n),
futile = rep(0L, n),
launched = rep(FALSE, n),
terminated = rep(TRUE, n),
history = rep(-1L, n),
crashes = rep(0L, n),
online = rep(FALSE, n),
discovered = rep(FALSE, n),
assigned = rep(0L, n),
Expand Down Expand Up @@ -695,7 +731,9 @@ crew_class_launcher <- R6::R6Class(
done = function() {
!self$active() & private$.workers$launched
},
#' @details Rotate websockets at all unlaunched workers.
#' @details Rotate websockets at all unlaunched workers and
#' throw an error if a worker launched at least `crashes_error`
#' times in a row without completing all its assigned tasks.
#' @return `NULL` (invisibly).
rotate = function() {
which_done <- which(self$done())
Expand All @@ -710,6 +748,7 @@ crew_class_launcher <- R6::R6Class(
private$.workers$socket[index] <- socket
private$.workers$launched[index] <- FALSE
}
private$.check_crashes(index)
}
},
#' @description Launch a worker.
Expand All @@ -731,26 +770,6 @@ crew_class_launcher <- R6::R6Class(
worker = index,
instance = instance
)
complete <- private$.workers$complete[index]
history <- private$.workers$history[index]
futile <- private$.workers$futile[index]
futile <- if_any(complete > history, 0L, futile + 1L)
crew_assert(
futile <= private$.launch_max,
message = paste0(
"{crew} worker with name ",
name,
" and index ",
index,
" launched ",
private$.launch_max,
" times in a row without completing any tasks. ",
"Either troubleshoot or raise launch_max above ",
private$.launch_max,
". Details: ",
"https://wlandau.github.io/crew/articles/risks.html#crashes"
)
)
mirai::call_mirai_(private$.workers$handle[[index]])
handle <- self$launch_worker(
call = as.character(call),
Expand All @@ -763,10 +782,8 @@ crew_class_launcher <- R6::R6Class(
private$.workers$socket[index] <- socket
private$.workers$start[index] <- nanonext::mclock() / 1000
private$.workers$launches[index] <- private$.workers$launches[index] + 1L
private$.workers$futile[index] <- futile
private$.workers$launched[index] <- TRUE
private$.workers$terminated[index] <- FALSE
private$.workers$history[index] <- complete
invisible()
},
#' @description Forward an asynchronous launch/termination error condition
Expand Down
19 changes: 14 additions & 5 deletions R/crew_launcher_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ crew_launcher_local <- function(
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L,
crashes_error = 5L,
launch_max = NULL,
tls = crew::crew_tls(),
r_arguments = c("--no-save", "--no-restore"),
options_metrics = crew::crew_options_metrics(),
Expand Down Expand Up @@ -68,6 +69,14 @@ crew_launcher_local <- function(
condition = "warning",
value = local_log_join
)
crew_deprecate(
name = "launch_max",
date = "2024-11-04",
version = "0.10.1.9000",
alternative = "crashes_error",
condition = "warning",
value = launch_max
)
options_local$log_directory <- local_log_directory %|||%
options_local$log_directory
options_local$log_join <- local_log_join %|||%
Expand All @@ -86,7 +95,7 @@ crew_launcher_local <- function(
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
crashes_error = crashes_error,
tls = tls,
r_arguments = r_arguments,
options_metrics = options_metrics,
Expand Down Expand Up @@ -168,7 +177,7 @@ crew_class_launcher_local <- R6::R6Class(
#' @param reset_packages See [crew_launcher()].
#' @param reset_options See [crew_launcher()].
#' @param garbage_collection See [crew_launcher()].
#' @param launch_max See [crew_launcher()].
#' @param crashes_error See [crew_launcher()].
#' @param tls See [crew_launcher()].
#' @param processes See [crew_launcher()].
#' @param r_arguments See [crew_launcher()].
Expand Down Expand Up @@ -200,7 +209,7 @@ crew_class_launcher_local <- R6::R6Class(
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
launch_max = NULL,
crashes_error = NULL,
tls = NULL,
processes = NULL,
r_arguments = NULL,
Expand All @@ -221,7 +230,7 @@ crew_class_launcher_local <- R6::R6Class(
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
crashes_error = crashes_error,
tls = tls,
processes = processes,
r_arguments = r_arguments,
Expand Down
10 changes: 6 additions & 4 deletions man/crew_class_launcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7ed05fd

Please sign in to comment.