Skip to content

Commit

Permalink
Fix #166
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Apr 19, 2024
1 parent 32ff5ac commit fd83c57
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 24 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Add new controller methods `autoscale()`, `descale()`, and `started()` to facilitate different kinds of Shiny apps.
* Deprecate the `scale` and `throttle` methods of `controller$promise()`. `promise()` now always calls `autoscale()` to make sure one and only one auto-scaling loop is running asynchronously. Auto-scaling thus continues even after the promise resolves.
* Add a second example vignette that simulates coin flips.
* Add a new `error` argument to `collect()` (#166).

# crew 0.9.1

Expand Down
43 changes: 32 additions & 11 deletions R/crew_controller.R
Original file line number Diff line number Diff line change
Expand Up @@ -1018,19 +1018,19 @@ crew_class_controller <- R6::R6Class(
#' within the last `seconds_interval` seconds. `FALSE` to auto-scale
#' every time `scale()` is called. Throttling avoids
#' overburdening the `mirai` dispatcher and other resources.
#' @param error Character of length 1, choice of action if
#' @param error `NULL` or character of length 1, choice of action if
#' the popped task threw an error. Possible values:
#' * `"stop"`: throw an error in the main R session instead of returning
#' a value.
#' * `"warn"`: throw a warning.
#' * `"silent"`: do nothing special.
#' * `NULL` or `"silent"`: do not react to errors.
#' @param controllers Not used. Included to ensure the signature is
#' compatible with the analogous method of controller groups.
pop = function(
scale = TRUE,
collect = NULL,
throttle = TRUE,
error = "silent",
error = NULL,
controllers = NULL
) {
crew_deprecate(
Expand All @@ -1043,6 +1043,16 @@ crew_class_controller <- R6::R6Class(
skip_cran = TRUE,
frequency = "once"
)
if (!is.null(error)) {
crew_assert(
error,
is.character(.),
!anyNA(.),
nzchar(.),
length(.) == 1L,
error %in% c("stop", "warn", "silent")
)
}
if (scale) {
.subset2(self, "scale")(throttle = throttle)
}
Expand Down Expand Up @@ -1091,43 +1101,54 @@ crew_class_controller <- R6::R6Class(
return(out)
}
# nocov end
error_message <- .subset2(out, "error")
on.exit({
index <- .subset2(out, "worker")
private$.log$tasks[index] <- .subset2(log, "tasks")[index] + 1L
private$.log$seconds[index] <- .subset2(log, "seconds")[index] +
.subset2(out, "seconds")
private$.log$errors[index] <- .subset2(log, "errors")[index] +
!anyNA(error_message)
!anyNA(.subset2(out, "error"))
private$.log$warnings[index] <- .subset2(log, "warnings")[index] +
!anyNA(.subset2(out, "warnings"))
}, add = TRUE)
if (!anyNA(error_message)) {
if (!is.null(error) && !anyNA(.subset2(out, "error"))) {
if (identical(error, "stop")) {
crew_error(message = error_message)
crew_error(message = .subset2(out, "error"))
} else if (identical(error, "warn")) {
crew_warning(message = error_message)
crew_warning(message = .subset2(out, "error"))
}
}
out
},
#' @description Pop all available task results and return them in a tidy
#' `tibble`.
#' @return A `tibble` of results and metadata of all resolved tasks,
#' with one row per task.
#' with one row per task. Returns `NULL` if there are no tasks
#' to collect.
#' @param scale Logical of length 1,
#' whether to automatically call `scale()`
#' to auto-scale workers to meet the demand of the task load.
#' @param throttle `TRUE` to skip auto-scaling if it already happened
#' within the last `seconds_interval` seconds. `FALSE` to auto-scale
#' every time `scale()` is called. Throttling avoids
#' overburdening the `mirai` dispatcher and other resources.
#' @param error `NULL` or character of length 1, choice of action if
#' the popped task threw an error. Possible values:
#' * `"stop"`: throw an error in the main R session instead of returning
#' a value.
#' * `"warn"`: throw a warning.
#' * `NULL` or `"silent"`: do not react to errors.
#' @param controllers Not used. Included to ensure the signature is
#' compatible with the analogous method of controller groups.
collect = function(scale = TRUE, throttle = TRUE, controllers = NULL) {
collect = function(
scale = TRUE,
throttle = TRUE,
error = NULL,
controllers = NULL
) {
pop <- .subset2(self, "pop")
results <- list()
while (!is.null(result <- pop(scale = FALSE))) {
while (!is.null(result <- pop(scale = FALSE, error = error))) {
results[[length(results) + 1L]] <- result
}
out <- lapply(results, monad_tibble)
Expand Down
28 changes: 23 additions & 5 deletions R/crew_controller_group.R
Original file line number Diff line number Diff line change
Expand Up @@ -711,19 +711,19 @@ crew_class_controller_group <- R6::R6Class(
#' within the last `seconds_interval` seconds. `FALSE` to auto-scale
#' every time `scale()` is called. Throttling avoids
#' overburdening the `mirai` dispatcher and other resources.
#' @param error Character of length 1, choice of action if
#' @param error `NULL` or character of length 1, choice of action if
#' the popped task threw an error. Possible values:
#' * `"stop"`: throw an error in the main R session instead of returning
#' a value.
#' * `"warn"`: throw a warning.
#' * `"silent"`: do nothing special.
#' * `NULL` or `"silent"`: do not react to errors.
#' @param controllers Character vector of controller names.
#' Set to `NULL` to select all controllers.
pop = function(
scale = TRUE,
collect = NULL,
throttle = TRUE,
error = "silent",
error = NULL,
controllers = NULL
) {
control <- private$.select_controllers(controllers)
Expand Down Expand Up @@ -751,11 +751,29 @@ crew_class_controller_group <- R6::R6Class(
#' within the last `seconds_interval` seconds. `FALSE` to auto-scale
#' every time `scale()` is called. Throttling avoids
#' overburdening the `mirai` dispatcher and other resources.
#' @param error `NULL` or character of length 1, choice of action if
#' the popped task threw an error. Possible values:
#' * `"stop"`: throw an error in the main R session instead of returning
#' a value.
#' * `"warn"`: throw a warning.
#' * `NULL` or `"silent"`: do not react to errors.
#' @param controllers Character vector of controller names.
#' Set to `NULL` to select all controllers.
collect = function(scale = TRUE, throttle = TRUE, controllers = NULL) {
collect = function(
scale = TRUE,
throttle = TRUE,
error = NULL,
controllers = NULL
) {
control <- private$.select_controllers(controllers)
out <- map(control, ~.x$collect(scale = scale, throttle = throttle))
out <- map(
control,
~.x$collect(
scale = scale,
throttle = throttle,
error = error
)
)
out <- tibble::new_tibble(data.table::rbindlist(out, use.names = FALSE))
if_any(nrow(out), out, NULL)
},
Expand Down
19 changes: 15 additions & 4 deletions man/crew_class_controller.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions man/crew_class_controller_group.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 59 additions & 1 deletion tests/testthat/test-crew_controller.R
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ crew_test("controller walk()", {
)
})

crew_test("controller collect()", {
crew_test("controller collect() success", {
skip_on_cran()
skip_on_os("windows")
on.exit({
Expand All @@ -94,6 +94,64 @@ crew_test("controller collect()", {
expect_equal(nrow(out), 2L)
expect_equal(as.character(out$result), rep("done", 2))
expect_null(x$collect())
expect_crew_error(x$collect(error = "bad"))
})

crew_test("controller collect() silent error", {
skip_on_cran()
skip_on_os("windows")
on.exit({
x$terminate()
rm(x)
gc()
crew_test_sleep()
})
x <- crew_controller_local(workers = 1L, seconds_idle = 30L)
x$start()
x$push("success")
x$push(stop("failure 1"))
x$push(stop("failure 2"))
x$wait(mode = "all")
expect_silent(out <- x$collect(error = "silent"))
expect_true("failure 1" %in% out$error)
})

crew_test("controller collect() error as warning", {
skip_on_cran()
skip_on_os("windows")
on.exit({
x$terminate()
rm(x)
gc()
crew_test_sleep()
})
x <- crew_controller_local(workers = 1L, seconds_idle = 30L)
x$start()
x$push("success")
x$push(stop("failure 1"))
x$push(stop("failure 2"))
x$wait(mode = "all")
suppressWarnings(
expect_warning(x$collect(error = "warn"), class = "crew_warning")
)
})

crew_test("controller collect() stop on error", {
skip_on_cran()
skip_on_os("windows")
on.exit({
x$terminate()
rm(x)
gc()
crew_test_sleep()
})
x <- crew_controller_local(workers = 1L, seconds_idle = 30L)
x$start()
x$push("success")
x$push(stop("failure 1"))
x$push(stop("failure 2"))
x$wait(mode = "all")
expect_crew_error(x$collect(error = "stop"))
})

crew_test("controller map() works", {
Expand Down
Loading

0 comments on commit fd83c57

Please sign in to comment.