Skip to content

Commit

Permalink
Sketch #124
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Sep 20, 2023
1 parent 0e0c169 commit 96d32e2
Showing 1 changed file with 26 additions and 32 deletions.
58 changes: 26 additions & 32 deletions R/crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ crew_class_launcher <- R6::R6Class(
"futile",
"launched",
"history",
"online",
"discovered",
"assigned",
"complete"
)
Expand Down Expand Up @@ -378,6 +380,8 @@ crew_class_launcher <- R6::R6Class(
futile = rep(0L, n),
launched = rep(FALSE, n),
history = rep(0L, n),
online = rep(FALSE, n),
discovered = rep(FALSE, n),
assigned = rep(0L, n),
complete = rep(0L, n)
)
Expand All @@ -390,6 +394,13 @@ crew_class_launcher <- R6::R6Class(
#' * `launches`: number of times the worker was launched. Each launch
#' occurs at a different websocket because the token at the end of the
#' URL is rotated before each new launch.
#' * `online`: logical vector, whether the current instance of each
#' worker was actively connected to its NNG socket during the time of
#' the last call to `tally()`.
#' * `discovered`: logical vector, whether the current instance of each
#' worker had connected to its NNG socket at some point
#' (and then possibly disconnected) during the time of
#' the last call to `tally()`.
#' * `assigned`: cumulative number of tasks assigned, reported by
#' `mirai::daemons()` and summed over all
#' completed instances of the worker. Does not reflect the activity
Expand All @@ -407,6 +418,8 @@ crew_class_launcher <- R6::R6Class(
tibble::tibble(
worker = seq_len(nrow(workers)),
launches = .subset2(workers, "launches"),
online = .subset2(workers, "online"),
discovered = .subset2(workers, "discovered"),
assigned = .subset2(workers, "assigned"),
complete = .subset2(workers, "complete")
)
Expand All @@ -421,16 +434,13 @@ crew_class_launcher <- R6::R6Class(
#' websocket at some point in the past,
#' or (2) `seconds_launch` seconds elapsed since launch.
#' @return Integer index of inactive workers.
#' @param daemons `mirai` daemons matrix. For testing only. Users
#' should not set this.
done = function(daemons = NULL) {
done = function() {
bound <- self$seconds_launch
start <- self$workers$start
now <- nanonext::mclock() / 1000
launching <- !is.na(start) & ((now - start) < bound)
daemons <- daemons %|||% daemons_info(name = self$name)
online <- as.logical(daemons[, "online"])
discovered <- as.logical(daemons[, "instance"] > 0L)
online <- self$workers$online
discovered <- self$workers$discovered
inactive <- (!online) & (discovered | (!launching))
launched <- self$workers$launched
which(inactive & launched)
Expand All @@ -449,41 +459,26 @@ crew_class_launcher <- R6::R6Class(
self$workers$launched[index] <- FALSE
}
},
#' @description Update the cumulative assigned and complete statistics.
#' @description Used to detect backlogged workers with more assigned
#' than complete tasks. If terminated, these workers need to be
#' relaunched until the backlog of assigned tasks is complete.
#' @description Update the `daemons`-related columns of the internal
#' `workers` data frame.
#' @return `NULL` (invisibly).
#' @param daemons `mirai` daemons matrix. For testing only. Users
#' should not set this.
tally = function(daemons = NULL) {
daemons <- daemons %|||% daemons_info(name = self$name)
index <- !(self$workers$launched)
self$workers$online <- as.logical(daemons[, "online"])
self$workers$discovered <- as.logical(daemons[, "instance"] > 0L)
self$workers$assigned[index] <- as.integer(daemons[index, "assigned"])
self$workers$complete[index] <- as.integer(daemons[index, "complete"])
invisible()
},
#' @description Get workers available for launch.
#' @return Integer index of workers available for launch.
#' @param n Maximum number of worker indexes to return.
unlaunched = function(n = Inf) {
unlaunched = function(n = Inf) {
head(x = which(!self$workers$launched), n = n)
},
#' @description List non-launched backlogged workers.
#' @return Integer vector of worker indexes.
backlogged = function() {
workers <- self$workers
index <- !(workers$launched) & (workers$assigned > workers$complete)
which(index)
},
#' @description List non-launched non-backlogged workers.
#' @return Integer vector of worker indexes.
#' @param n Maximum number of worker indexes to return.
resolved = function() {
workers <- self$workers
index <- !(workers$launched) & !(workers$assigned > workers$complete)
which(index)
},
#' @description Launch a worker.
#' @return `NULL` (invisibly).
#' @param index Positive integer of length 1, index of the worker
Expand Down Expand Up @@ -561,13 +556,12 @@ crew_class_launcher <- R6::R6Class(
if (throttle && self$throttle()) {
return(invisible())
}
walk(x = self$done(), f = self$rotate)
self$tally()
walk(x = self$backlogged(), f = self$launch)
resolved <- self$resolved()
active <- nrow(self$workers) - length(resolved)
deficit <- min(length(resolved), max(0L, demand - active))
walk(x = head(x = resolved, n = deficit), f = self$launch)
walk(x = self$done(), f = self$rotate)
unlaunched <- self$unlaunched(n = Inf)
active <- nrow(self$workers) - length(unlaunched)
deficit <- min(length(unlaunched), max(0L, demand - active))
walk(x = head(x = unlaunched, n = deficit), f = self$launch)
invisible()
},
#' @description Terminate one or more workers.
Expand Down

0 comments on commit 96d32e2

Please sign in to comment.