Skip to content

Commit

Permalink
new crashes() launcher method
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Nov 11, 2024
1 parent 06e7fc9 commit e9eb6b5
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 5 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Eliminate spurious `launch_max` error from underutilized workers (#189).
* Deprecate `launch_max` in favor of `crashes_error` (#189).
* Look for crashes of all workers in `rotate()` instead of looking for crashes of a specific worker in `launch()` (#189).
* Add a `crashes()` launcher method to allow plugins to detect and respond to crashes more easily.

# crew 0.10.1

Expand Down
15 changes: 14 additions & 1 deletion R/crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ crew_class_launcher <- R6::R6Class(
#' @param reset_options See [crew_launcher()].
#' @param garbage_collection See [crew_launcher()].
#' @param crashes_error See [crew_launcher()].
#' @param launch_max Deprecated.
#' @param tls See [crew_launcher()].
#' @param processes See [crew_launcher()].
#' @param r_arguments See [crew_launcher()].
Expand Down Expand Up @@ -646,7 +647,8 @@ crew_class_launcher <- R6::R6Class(
#' `mirai::daemons()` and summed over all
#' completed instances of the worker. Does not reflect the activity
#' of the currently running instance of the worker.
#' * `socket`: current websocket URL of the worker.
#' * `crashes`: number of consecutive times a worker
#' launched without completing all its assigned tasks.
summary = function() {
workers <- .subset2(self, "workers")
if (is.null(workers)) {
Expand Down Expand Up @@ -909,6 +911,17 @@ crew_class_launcher <- R6::R6Class(
)
list(abstract = TRUE)
},
#' @description Return the number of consecutive times a worker
#' launched without completing all its assigned tasks.
#' @return Non-negative integer, number of consecutive times a worker
#' launched without completing all its assigned tasks.
#' @param index Non-negative integer, index of the worker pointing
#' to a row of the data frame output of the `summary()` method
#' of the launcher.
crashes = function(index) {
workers <- .subset2(private, ".workers")
.subset(.subset2(workers, "crashes"), index)
},
#' @description Abstract worker termination method.
#' @details Launcher plugins will overwrite this method.
#' @return A handle to mock worker termination.
Expand Down
30 changes: 29 additions & 1 deletion man/crew_class_launcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/crew_class_launcher_local.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 20 additions & 3 deletions tests/local/test-crashes_error.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,24 @@ test_that("crashes_error allows idle workers to exit", {
x$launch()
Sys.sleep(5)
}
expect_equal(x$launcher$crashes(index = 1L), 0L)
expect_equal(index, 10L)
})

test_that("crashes_error detects when there are too many crashes", {
x <- crew_controller_local(name = "name", crashes_error = 2L)
x <- crew_controller_local(
name = "name",
workers = 2L,
crashes_error = 2L,
tasks_max = 1L,
seconds_idle = 1
)
on.exit(x$terminate())
x$start()
x$push(Sys.sleep(1e3), scale = FALSE)
x$push(Sys.sleep(4), scale = TRUE)
Sys.sleep(5)
x$push(Sys.sleep(1e3), scale = TRUE)
x$wait(mode = "one")
for (index in seq_len(10L)) {
Sys.sleep(5)
if (index == 3L) {
Expand All @@ -22,8 +32,15 @@ test_that("crashes_error detects when there are too many crashes", {
} else {
expect_silent(x$scale())
}
expect_equal(x$launcher$crashes(index = 1L), 0L)
expect_equal(x$launcher$crashes(index = 2L), index - 1L)
Sys.sleep(5)
x$launcher$workers$handle[[1L]]$kill()
client_summary <- x$client$summary()
expect_equal(client_summary$online, c(FALSE, TRUE))
expect_equal(client_summary$instances, c(1L, index))
expect_equal(client_summary$assigned, c(1L, 1L))
expect_equal(client_summary$complete, c(1L, 0L))
x$launcher$workers$handle[[2L]]$kill()
}
expect_equal(index, 3L)
})
Expand Down
4 changes: 4 additions & 0 deletions tests/testthat/test-crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ crew_test("launcher start()", {
expect_equal(workers$launched, rep(FALSE, 2L))
expect_equal(workers$assigned, rep(0L, 2L))
expect_equal(workers$complete, rep(0L, 2L))
expect_equal(workers$crashes, rep(0L, 2L))
for (index in seq_len(2L)) {
expect_equal(launcher$crashes(index), 0L)
}
})

crew_test("launcher done()", {
Expand Down

0 comments on commit e9eb6b5

Please sign in to comment.