Skip to content

Commit

Permalink
sketch solution to #189
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Nov 4, 2024
1 parent 74ee238 commit 7352501
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 91 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Description: In computationally demanding analysis projects,
'clustermq' by Schubert (2019) <doi:10.1093/bioinformatics/btz284>),
and 'batchtools' by Lang, Bischel, and Surmann (2017)
<doi:10.21105/joss.00135>.
Version: 0.10.1
Version: 0.10.2
License: MIT + file LICENSE
URL: https://wlandau.github.io/crew/, https://github.com/wlandau/crew
BugReports: https://github.com/wlandau/crew/issues
Expand Down
6 changes: 5 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# crew 0.10.1 (development)
# crew 0.10.2

* Eliminate spurious `launch_max` error from underutilized workers and deprecate `launch_max` in favor of `crashes_error` (#189).

# crew 0.10.1

* Instrument `crew_eval()` with `autometric::log_phase_set()` and `autometric::log_phase_reset()`.

Expand Down
13 changes: 11 additions & 2 deletions R/crew_controller_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ crew_controller_local <- function(
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L,
crashes_error = 5L,
launch_max = NULL,
r_arguments = c("--no-save", "--no-restore"),
options_metrics = crew::crew_options_metrics(),
options_local = crew::crew_options_local(),
Expand Down Expand Up @@ -67,6 +68,14 @@ crew_controller_local <- function(
condition = "warning",
value = local_log_join
)
crew_deprecate(
name = "launch_max",
date = "2024-11-04",
version = "0.10.1.9000",
alternative = "crashes_error",
condition = "warning",
value = launch_max
)
options_local$log_directory <- local_log_directory %|||%
options_local$log_directory
options_local$log_join <- local_log_join %|||%
Expand Down Expand Up @@ -96,7 +105,7 @@ crew_controller_local <- function(
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
crashes_error = crashes_error,
tls = tls,
r_arguments = r_arguments,
options_metrics = options_metrics,
Expand Down
77 changes: 44 additions & 33 deletions R/crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@
#' because packages sometimes rely on options they set at loading time.
#' @param garbage_collection `TRUE` to run garbage collection between
#' tasks, `FALSE` to skip.
#' @param launch_max Positive integer of length 1, maximum allowed
#' consecutive launch attempts which do not complete any tasks.
#' Enforced on a worker-by-worker basis.
#' The futile launch count resets to back 0
#' for each worker that completes a task.
#' It is recommended to set `launch_max` above 0
#' because sometimes workers are unproductive under perfectly ordinary
#' circumstances. But `launch_max` should still be small enough
#' to detect errors in the underlying platform.
#' @param crashes_error Positive integer scalar. If a worker exits
#' `crashes_error` times in a row without completing all its assigned
#' tasks, then the launcher throws an informative error next time
#' the worker tries to relaunch. The reason for `crashes_error`
#' is to avoid an infinite loop where a task crashes a worker
#' (through a segfault, maxing out memory, etc) but the worker always
#' relaunches. To monitor the resources of `crew` workers,
#' please see <https://wlandau.github.io/crew/articles/logging.html>.
#' @param launch_max Deprecated on 2024-11-04 (version 0.10.1.9000).
#' Use `crashes_error` instead.
#' @param processes `NULL` or positive integer of length 1,
#' number of local processes to
#' launch to allow worker launches to happen asynchronously. If `NULL`,
Expand Down Expand Up @@ -100,7 +101,8 @@ crew_launcher <- function(
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L,
crashes_error = 5L,
launch_max = NULL,
tls = crew::crew_tls(),
processes = NULL,
r_arguments = c("--no-save", "--no-restore"),
Expand All @@ -119,6 +121,14 @@ crew_launcher <- function(
inherits(tls, "crew_class_tls"),
message = "argument tls must be an object created by crew_tls()"
)
crew_deprecate(
name = "launch_max",
date = "2024-11-04",
version = "0.10.1.9000",
alternative = "crashes_error",
condition = "warning",
value = launch_max
)
crew_class_launcher$new(
name = name,
seconds_interval = seconds_interval,
Expand All @@ -132,7 +142,7 @@ crew_launcher <- function(
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
crashes_error = crashes_error,
tls = tls,
processes = processes,
r_arguments = r_arguments,
Expand Down Expand Up @@ -175,7 +185,7 @@ crew_class_launcher <- R6::R6Class(
.reset_packages = NULL,
.reset_options = NULL,
.garbage_collection = NULL,
.launch_max = NULL,
.crashes_error = NULL,
.tls = NULL,
.processes = NULL,
.r_arguments = NULL,
Expand Down Expand Up @@ -236,9 +246,9 @@ crew_class_launcher <- R6::R6Class(
garbage_collection = function() {
.subset2(private, ".garbage_collection")
},
#' @field launch_max See [crew_launcher()].
launch_max = function() {
.subset2(private, ".launch_max")
#' @field crashes_error See [crew_launcher()].
crashes_error = function() {
.subset2(private, ".crashes_error")
},
#' @field tls See [crew_launcher()].
tls = function() {
Expand Down Expand Up @@ -283,7 +293,7 @@ crew_class_launcher <- R6::R6Class(
#' @param reset_packages See [crew_launcher()].
#' @param reset_options See [crew_launcher()].
#' @param garbage_collection See [crew_launcher()].
#' @param launch_max See [crew_launcher()].
#' @param crashes_error See [crew_launcher()].
#' @param tls See [crew_launcher()].
#' @param processes See [crew_launcher()].
#' @param r_arguments See [crew_launcher()].
Expand Down Expand Up @@ -314,7 +324,7 @@ crew_class_launcher <- R6::R6Class(
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
launch_max = NULL,
crashes_error = NULL,
tls = NULL,
processes = NULL,
r_arguments = NULL,
Expand All @@ -332,7 +342,7 @@ crew_class_launcher <- R6::R6Class(
private$.reset_packages <- reset_packages
private$.reset_options <- reset_options
private$.garbage_collection <- garbage_collection
private$.launch_max <- launch_max
private$.crashes_error <- crashes_error
private$.tls <- tls
private$.processes <- processes
private$.r_arguments <- r_arguments
Expand Down Expand Up @@ -378,7 +388,7 @@ crew_class_launcher <- R6::R6Class(
"seconds_wall",
"tasks_max",
"tasks_timers",
"launch_max"
"crashes_error"
)
for (field in fields) {
crew_assert(
Expand Down Expand Up @@ -418,10 +428,9 @@ crew_class_launcher <- R6::R6Class(
"socket",
"start",
"launches",
"futile",
"launched",
"terminated",
"history",
"crashes",
"online",
"discovered",
"assigned",
Expand Down Expand Up @@ -556,10 +565,9 @@ crew_class_launcher <- R6::R6Class(
socket = sockets,
start = rep(NA_real_, n),
launches = rep(0L, n),
futile = rep(0L, n),
launched = rep(FALSE, n),
terminated = rep(TRUE, n),
history = rep(-1L, n),
crashes = rep(0L, n),
online = rep(FALSE, n),
discovered = rep(FALSE, n),
assigned = rep(0L, n),
Expand Down Expand Up @@ -731,24 +739,27 @@ crew_class_launcher <- R6::R6Class(
worker = index,
instance = instance
)
assigned <- private$.workers$assigned[index]
complete <- private$.workers$complete[index]
history <- private$.workers$history[index]
futile <- private$.workers$futile[index]
futile <- if_any(complete > history, 0L, futile + 1L)
crashes <- private$.workers$crashes[index]
crashes <- if_any(assigned > complete, crashes + 1L, 0L)
crew_assert(
futile <= private$.launch_max,
crashes < private$.crashes_error,
message = paste0(
"{crew} worker with name ",
name,
" and index ",
index,
" launched ",
private$.launch_max,
" times in a row without completing any tasks. ",
"Either troubleshoot or raise launch_max above ",
private$.launch_max,
" inappropriately terminated ",
private$.crashes_error,
" times in a row (it exited ",
"without completing all its assigned tasks). ",
"Either troubleshoot or raise crashes_error above ",
private$.crashes_error,
". Details: ",
"https://wlandau.github.io/crew/articles/risks.html#crashes"
"https://wlandau.github.io/crew/articles/risks.html#crashes. ",
"See https://wlandau.github.io/crew/articles/logging.html ",
"to learn how to log memory and CPU usage for a worker."
)
)
mirai::call_mirai_(private$.workers$handle[[index]])
Expand Down
19 changes: 14 additions & 5 deletions R/crew_launcher_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ crew_launcher_local <- function(
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L,
crashes_error = 5L,
launch_max = NULL,
tls = crew::crew_tls(),
r_arguments = c("--no-save", "--no-restore"),
options_metrics = crew::crew_options_metrics(),
Expand Down Expand Up @@ -68,6 +69,14 @@ crew_launcher_local <- function(
condition = "warning",
value = local_log_join
)
crew_deprecate(
name = "launch_max",
date = "2024-11-04",
version = "0.10.1.9000",
alternative = "crashes_error",
condition = "warning",
value = launch_max
)
options_local$log_directory <- local_log_directory %|||%
options_local$log_directory
options_local$log_join <- local_log_join %|||%
Expand All @@ -86,7 +95,7 @@ crew_launcher_local <- function(
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
crashes_error = crashes_error,
tls = tls,
r_arguments = r_arguments,
options_metrics = options_metrics,
Expand Down Expand Up @@ -168,7 +177,7 @@ crew_class_launcher_local <- R6::R6Class(
#' @param reset_packages See [crew_launcher()].
#' @param reset_options See [crew_launcher()].
#' @param garbage_collection See [crew_launcher()].
#' @param launch_max See [crew_launcher()].
#' @param crashes_error See [crew_launcher()].
#' @param tls See [crew_launcher()].
#' @param processes See [crew_launcher()].
#' @param r_arguments See [crew_launcher()].
Expand Down Expand Up @@ -200,7 +209,7 @@ crew_class_launcher_local <- R6::R6Class(
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
launch_max = NULL,
crashes_error = NULL,
tls = NULL,
processes = NULL,
r_arguments = NULL,
Expand All @@ -221,7 +230,7 @@ crew_class_launcher_local <- R6::R6Class(
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
crashes_error = crashes_error,
tls = tls,
processes = processes,
r_arguments = r_arguments,
Expand Down
6 changes: 3 additions & 3 deletions man/crew_class_launcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions man/crew_class_launcher_local.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 13 additions & 10 deletions man/crew_controller_local.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7352501

Please sign in to comment.