From 96d32e24d703fa336b7a71f4a4902756c4e1fba6 Mon Sep 17 00:00:00 2001 From: wlandau-lilly Date: Wed, 20 Sep 2023 11:25:49 -0400 Subject: [PATCH] Sketch #124 --- R/crew_launcher.R | 58 +++++++++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/R/crew_launcher.R b/R/crew_launcher.R index 61f47d3a..5e5b82e9 100644 --- a/R/crew_launcher.R +++ b/R/crew_launcher.R @@ -291,6 +291,8 @@ crew_class_launcher <- R6::R6Class( "futile", "launched", "history", + "online", + "discovered", "assigned", "complete" ) @@ -378,6 +380,8 @@ crew_class_launcher <- R6::R6Class( futile = rep(0L, n), launched = rep(FALSE, n), history = rep(0L, n), + online = rep(FALSE, n), + discovered = rep(FALSE, n), assigned = rep(0L, n), complete = rep(0L, n) ) @@ -390,6 +394,13 @@ crew_class_launcher <- R6::R6Class( #' * `launches`: number of times the worker was launched. Each launch #' occurs at a different websocket because the token at the end of the #' URL is rotated before each new launch. + #' * `online`: logical vector, whether the current instance of each + #' worker was actively connected to its NNG socket during the time of + #' the last call to `tally()`. + #' * `discovered`: logical vector, whether the current instance of each + #' worker had connected to its NNG socket at some point + #' (and then possibly disconnected) during the time of + #' the last call to `tally()`. #' * `assigned`: cumulative number of tasks assigned, reported by #' `mirai::daemons()` and summed over all #' completed instances of the worker. Does not reflect the activity @@ -407,6 +418,8 @@ crew_class_launcher <- R6::R6Class( tibble::tibble( worker = seq_len(nrow(workers)), launches = .subset2(workers, "launches"), + online = .subset2(workers, "online"), + discovered = .subset2(workers, "discovered"), assigned = .subset2(workers, "assigned"), complete = .subset2(workers, "complete") ) @@ -421,16 +434,13 @@ crew_class_launcher <- R6::R6Class( #' websocket at some point in the past, #' or (2) `seconds_launch` seconds elapsed since launch. #' @return Integer index of inactive workers. - #' @param daemons `mirai` daemons matrix. For testing only. Users - #' should not set this. - done = function(daemons = NULL) { + done = function() { bound <- self$seconds_launch start <- self$workers$start now <- nanonext::mclock() / 1000 launching <- !is.na(start) & ((now - start) < bound) - daemons <- daemons %|||% daemons_info(name = self$name) - online <- as.logical(daemons[, "online"]) - discovered <- as.logical(daemons[, "instance"] > 0L) + online <- self$workers$online + discovered <- self$workers$discovered inactive <- (!online) & (discovered | (!launching)) launched <- self$workers$launched which(inactive & launched) @@ -449,16 +459,16 @@ crew_class_launcher <- R6::R6Class( self$workers$launched[index] <- FALSE } }, - #' @description Update the cumulative assigned and complete statistics. - #' @description Used to detect backlogged workers with more assigned - #' than complete tasks. If terminated, these workers need to be - #' relaunched until the backlog of assigned tasks is complete. + #' @description Update the `daemons`-related columns of the internal + #' `workers` data frame. #' @return `NULL` (invisibly). #' @param daemons `mirai` daemons matrix. For testing only. Users #' should not set this. tally = function(daemons = NULL) { daemons <- daemons %|||% daemons_info(name = self$name) index <- !(self$workers$launched) + self$workers$online <- as.logical(daemons[, "online"]) + self$workers$discovered <- as.logical(daemons[, "instance"] > 0L) self$workers$assigned[index] <- as.integer(daemons[index, "assigned"]) self$workers$complete[index] <- as.integer(daemons[index, "complete"]) invisible() @@ -466,24 +476,9 @@ crew_class_launcher <- R6::R6Class( #' @description Get workers available for launch. #' @return Integer index of workers available for launch. #' @param n Maximum number of worker indexes to return. - unlaunched = function(n = Inf) { + unlaunched = function(n = Inf) { head(x = which(!self$workers$launched), n = n) }, - #' @description List non-launched backlogged workers. - #' @return Integer vector of worker indexes. - backlogged = function() { - workers <- self$workers - index <- !(workers$launched) & (workers$assigned > workers$complete) - which(index) - }, - #' @description List non-launched non-backlogged workers. - #' @return Integer vector of worker indexes. - #' @param n Maximum number of worker indexes to return. - resolved = function() { - workers <- self$workers - index <- !(workers$launched) & !(workers$assigned > workers$complete) - which(index) - }, #' @description Launch a worker. #' @return `NULL` (invisibly). #' @param index Positive integer of length 1, index of the worker @@ -561,13 +556,12 @@ crew_class_launcher <- R6::R6Class( if (throttle && self$throttle()) { return(invisible()) } - walk(x = self$done(), f = self$rotate) self$tally() - walk(x = self$backlogged(), f = self$launch) - resolved <- self$resolved() - active <- nrow(self$workers) - length(resolved) - deficit <- min(length(resolved), max(0L, demand - active)) - walk(x = head(x = resolved, n = deficit), f = self$launch) + walk(x = self$done(), f = self$rotate) + unlaunched <- self$unlaunched(n = Inf) + active <- nrow(self$workers) - length(unlaunched) + deficit <- min(length(unlaunched), max(0L, demand - active)) + walk(x = head(x = unlaunched, n = deficit), f = self$launch) invisible() }, #' @description Terminate one or more workers.