Skip to content

Commit

Permalink
local metrics logging
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Oct 8, 2024
1 parent 077539d commit a4e36cf
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 135 deletions.
2 changes: 2 additions & 0 deletions R/crew_controller_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ crew_controller_local <- function(
garbage_collection = FALSE,
launch_max = 5L,
r_arguments = c("--no-save", "--no-restore"),
options_metrics = crew::crew_options_metrics(),
options_local = crew::crew_options_local(),
local_log_directory = NULL,
local_log_join = NULL
Expand Down Expand Up @@ -98,6 +99,7 @@ crew_controller_local <- function(
launch_max = launch_max,
tls = tls,
r_arguments = r_arguments,
options_metrics = options_metrics,
options_local = options_local,
local_log_directory = local_log_directory,
local_log_join = local_log_join
Expand Down
30 changes: 25 additions & 5 deletions R/crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#' `@inheritParams crew::crew_launcher` in the source code file of
#' [crew_launcher_local()].
#' @inheritParams crew_client
#' @inheritParams crew_worker
#' @param name Name of the launcher.
#' @param seconds_interval Number of seconds between
#' polling intervals waiting for certain internal
Expand Down Expand Up @@ -102,7 +103,8 @@ crew_launcher <- function(
launch_max = 5L,
tls = crew::crew_tls(),
processes = NULL,
r_arguments = c("--no-save", "--no-restore")
r_arguments = c("--no-save", "--no-restore"),
options_metrics = crew::crew_options_metrics()
) {
crew_deprecate(
name = "seconds_exit",
Expand Down Expand Up @@ -133,7 +135,8 @@ crew_launcher <- function(
launch_max = launch_max,
tls = tls,
processes = processes,
r_arguments = r_arguments
r_arguments = r_arguments,
options_metrics = options_metrics
)
}

Expand Down Expand Up @@ -176,6 +179,7 @@ crew_class_launcher <- R6::R6Class(
.tls = NULL,
.processes = NULL,
.r_arguments = NULL,
.options_metrics = NULL,
.async = NULL,
.throttle = NULL
),
Expand Down Expand Up @@ -249,6 +253,10 @@ crew_class_launcher <- R6::R6Class(
r_arguments = function() {
.subset2(private, ".r_arguments")
},
#' @field options_metrics See [crew_launcher()].
options_metrics = function() {
.subset2(private, ".options_metrics")
},
#' @field async A [crew_async()] object to run low-level launcher tasks
#' asynchronously.
async = function() {
Expand Down Expand Up @@ -279,6 +287,7 @@ crew_class_launcher <- R6::R6Class(
#' @param tls See [crew_launcher()].
#' @param processes See [crew_launcher()].
#' @param r_arguments See [crew_launcher()].
#' @param options_metrics See [crew_launcher()].
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' client <- crew_client()
Expand Down Expand Up @@ -308,7 +317,8 @@ crew_class_launcher <- R6::R6Class(
launch_max = NULL,
tls = NULL,
processes = NULL,
r_arguments = NULL
r_arguments = NULL,
options_metrics = NULL
) {
private$.name <- name
private$.seconds_interval <- seconds_interval
Expand All @@ -326,6 +336,7 @@ crew_class_launcher <- R6::R6Class(
private$.tls <- tls
private$.processes <- processes
private$.r_arguments <- r_arguments
private$.options_metrics <- options_metrics
},
#' @description Validate the launcher.
#' @return `NULL` (invisibly).
Expand Down Expand Up @@ -444,6 +455,9 @@ crew_class_launcher <- R6::R6Class(
)
private$.throttle$validate()
}
if (!is.null(private$.options_metrics)) {
crew_options_metrics_validate(private$.options_metrics)
}
invisible()
},
#' @description Set the name of the launcher.
Expand Down Expand Up @@ -500,13 +514,19 @@ crew_class_launcher <- R6::R6Class(
settings = settings,
launcher = launcher,
worker = worker,
instance = instance
instance = instance,
options_metrics = crew::crew_options_metrics(
path = path,
seconds_interval = seconds_interval
)
),
env = list(
settings = self$settings(socket),
launcher = launcher,
worker = worker,
instance = instance
instance = instance,
path = private$.options_metrics$path,
seconds_interval = private$.options_metrics$seconds_interval
)
)
out <- deparse_safe(expr = call, collapse = " ")
Expand Down
7 changes: 6 additions & 1 deletion R/crew_launcher_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ crew_launcher_local <- function(
launch_max = 5L,
tls = crew::crew_tls(),
r_arguments = c("--no-save", "--no-restore"),
options_metrics = crew::crew_options_metrics(),
options_local = crew::crew_options_local(),
local_log_directory = NULL,
local_log_join = NULL
Expand Down Expand Up @@ -88,6 +89,7 @@ crew_launcher_local <- function(
launch_max = launch_max,
tls = tls,
r_arguments = r_arguments,
options_metrics = options_metrics,
options_local = options_local
)
launcher$validate()
Expand Down Expand Up @@ -170,6 +172,7 @@ crew_class_launcher_local <- R6::R6Class(
#' @param tls See [crew_launcher()].
#' @param processes See [crew_launcher()].
#' @param r_arguments See [crew_launcher()].
#' @param options_metrics See [crew_launcher_local()].
#' @param options_local See [crew_launcher_local()].
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
Expand Down Expand Up @@ -201,6 +204,7 @@ crew_class_launcher_local <- R6::R6Class(
tls = NULL,
processes = NULL,
r_arguments = NULL,
options_metrics = NULL,
options_local = NULL
) {
super$initialize(
Expand All @@ -220,7 +224,8 @@ crew_class_launcher_local <- R6::R6Class(
launch_max = launch_max,
tls = tls,
processes = processes,
r_arguments = r_arguments
r_arguments = r_arguments,
options_metrics = options_metrics
)
private$.options_local <- options_local
},
Expand Down
114 changes: 33 additions & 81 deletions R/crew_options_metrics.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,39 @@
#' supplied to the `options_metrics` argument of a `crew`
#' controller function, then the `autometric` R package will
#' record resource usage metrics (such as CPU and memory usage)
#' as the controller and workers are running. Logging happens in
#' for each running worker. Logging happens in
#' the background (through a detached POSIX thread) so as not to disrupt
#' the R session. On Unix-like systems, [crew_options_metrics()]
#' can specify `/dev/stdout` or `/dev/stderr` as the log files, which will
#' redirect output to existing logs you are already using. Then,
#' even if those logs are mixed with other messages, you can use functions
#' [autometric::log_read()] and [autometric::log_plot()] to read and
#' visualize resource usage data.
#' @return A classed list of options for logging resource usage metrics.
#' @param path_local Where to write resource metric log entries for the
#' local R session and `mirai` dispatcher process.
#' If `NULL`, resource metric logging is turned off for these processes.
#' If `"/dev/stdout"` or `"/dev/stderr"`, resource usage metrics
#' are printed to standard output or standard error,
#' respectively. If a different character string, that string is the
#' directory path for writing logs to files on disk, and each
#' controller instance gets a different log file.
#' [autometric::log_read()] and [autometric::log_plot()] can read and
#' visualize data from logs, even if the logs files are mixed
#' with other kinds of messages.
#' @param path_workers Like `path_local`, but for the `crew` worker processes.
#' On Unix-like systems, it is recommended to set `path_workers`
#' equal to `"/dev/stdout"` or `"/dev/stderr"` to automatically write
#' resource log messages to the existing log files generated on your
#' platform (for example, the logs configured with
#' [crew_options_local()] in the case of [crew_controller_local()]).
#' redirect output to existing logs you are already using.
#' [autometric::log_read()] and [autometric::log_plot()] can read and
#' visualize data from logs, even if the logs files are mixed
#' with other kinds of messages.
#' @param seconds_local Positive number, seconds between resource metric log
#' entries written to `path_local`.
#' @param seconds_workers Positive number, seconds between resource metric log
#' entries written to `seconds_workers`.
#' visualize resource usage data from multiple log files, even
#' if those files are mixed with other messages.
#' @return A classed list of options for logging resource usage metrics.
#' @param path Where to write resource metric log entries for workers.
#' `path = NULL` disables logging. `path` equal to `"/dev/stdout"`
#' (or `"/dev/stderr"`) sends log messages to the standard output
#' (or standard error) streams, which is recommended on Unix-like systems
#' because then output will go to the existing log files already
#' configured for the
#' controller, e.g. through [crew_options_local()] in the case of
#' [crew_controller_local()]. If `path` is not `NULL`, `"/dev/stdout"`, or
#' `"/dev/stderr"`, it should be a directory path,
#' in which case each worker instance will write to a new file in that
#' directory.
#'
#' After running enough tasks in `crew`, you can call
#' `autometric::log_read(path)` to read all the data from all the log
#' files in the files or directories at `path`,
#' even if the log files are mixed with other kinds of messages.
#' Pass that data into [autometric::log_plot()] to visualize it.
#' @param seconds_interval Positive number, seconds between resource metric
#' log entries written to `path`.
#' @examples
#' crew_options_metrics()
crew_options_metrics = function(
path_local = NULL,
path_workers = NULL,
seconds_local = 5,
seconds_workers = 5
) {
#' crew_options_metrics(path = "/dev/stdout")
crew_options_metrics = function(path = NULL, seconds_interval = 5) {

Check warning on line 38 in R/crew_options_metrics.R

View workflow job for this annotation

GitHub Actions / lint

file=R/crew_options_metrics.R,line=38,col=22,[assignment_linter] Use <-, not =, for assignment.
out <- structure(
list(
path_local = path_local,
path_workers = path_workers,
seconds_local = seconds_local,
seconds_workers = seconds_workers
),
list(path = path, seconds_interval = seconds_interval),
class = c("crew_options_metrics", "crew_options")
)
crew_options_metrics_validate(out)
Expand All @@ -65,63 +50,30 @@ crew_options_metrics_validate <- function(options) {
message = "options_metrics object must come from crew_options_metrics()."
)
crew_assert(
options$path_local %|||% "x",
options$path %|||% "x",
is.character(.),
length(.) == 1L,
!anyNA(.),
nzchar(.),
message = paste(
"path_local must be NULL, \"/dev/stdout\", ",
"\"/dev/stderr\", or a valid directory path."
)
)
crew_assert(
options$path_workers %|||% "x",
is.character(.),
length(.) == 1L,
!anyNA(.),
nzchar(.),
message = paste(
"path_workers must be NULL, \"/dev/stdout\", ",
"path must be NULL, \"/dev/stdout\", ",
"\"/dev/stderr\", or a valid directory path."
)
)
on_windows <- identical(unname(tolower(Sys.info()[["sysname"]])), "windows")
streams <- file.path("/dev", c("stdout", "stderr"))
crew_assert(
!(on_windows && options$path_local %in% streams),
!(on_windows && options$path %in% streams),
message = paste(
"path_local cannot be \"/dev/stdout\" or \"/dev/stderr\" on Windows."
"path cannot be \"/dev/stdout\" or \"/dev/stderr\" on Windows."
)
)
crew_assert(
!(on_windows && options$path_workers %in% streams),
message = paste(
"path_workers cannot be \"/dev/stdout\" or \"/dev/stderr\" on Windows."
)
)
crew_assert(
options$seconds_local,
is.numeric(.),
length(.) == 1,
is.finite(.),
. > 0,
message = "seconds_local must be a positive number."
)
crew_assert(
options$seconds_workers,
options$seconds_interval,
is.numeric(.),
length(.) == 1,
is.finite(.),
. > 0,
message = "seconds_workers must be a positive number."
)
}

log_metrics_name <- function(path, name) {
if_any(
is.null(path) || path %in% c("/dev/stdout", "/dev/stderr"),
path,
file.path(path, name)
message = "seconds_interval must be a positive number."
)
}
31 changes: 29 additions & 2 deletions R/crew_worker.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,29 @@
#' of the worker exits and a new instance launches.
#' @param instance Character of length 1 to uniquely identify
#' the current instance of the worker.
crew_worker <- function(settings, launcher, worker, instance) {
#' @param options_metrics Either `NULL` to opt out of resource metric logging
#' for workers, or an object from [crew_options_metrics()] to enable
#' and configure resource metric logging for workers.
crew_worker <- function(
settings,
launcher,
worker,
instance,
options_metrics = crew::crew_options_metrics()
) {
if (!is.null(options_metrics$path)) {
pids <- Sys.getpid()
names(pids) <- sprintf("crew_worker_%s_%s_%s", launcher, worker, instance)
autometric::log_start(
path = log_metrics_path(options_metrics$path, names(pids)),
seconds = options_metrics$seconds_interval,
pids = pids
)
on.exit(autometric::log_stop())
}
envvars <- c("CREW_LAUNCHER", "CREW_WORKER", "CREW_INSTANCE")
previous <- Sys.getenv(envvars)
on.exit(do.call(what = Sys.setenv, args = as.list(previous)))
on.exit(do.call(what = Sys.setenv, args = as.list(previous)), add = TRUE)
Sys.setenv(
CREW_LAUNCHER = launcher,
CREW_WORKER = worker,
Expand All @@ -25,3 +44,11 @@ crew_worker <- function(settings, launcher, worker, instance) {
crew_message(utils::capture.output(print(utils::sessionInfo())))
do.call(what = mirai::daemon, args = settings)
}

# Resolve where a metrics log should be written.
#
# `path` is handed back untouched when it is NULL or one of the standard
# stream paths ("/dev/stdout", "/dev/stderr"). Any other value is treated
# as a directory, and the result is "<path>/<name>.log".
#
# NOTE(review): if_any() is defined elsewhere in the package; it is
# presumably a lazy if/else helper (condition, value-if-true,
# value-if-false) -- confirm against its definition.
log_metrics_path <- function(path, name) {
  streams <- c("/dev/stdout", "/dev/stderr")
  keep_as_is <- is.null(path) || path %in% streams
  if_any(keep_as_is, path, file.path(path, paste0(name, ".log")))
}
7 changes: 6 additions & 1 deletion man/crew_class_launcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a4e36cf

Please sign in to comment.