Skip to content

Commit

Permalink
CRAN patch
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Jun 24, 2024
1 parent c02449f commit 6fc2665
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 70 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Description: In computationally demanding analysis projects,
'clustermq' by Schubert (2019) <doi:10.1093/bioinformatics/btz284>),
and 'batchtools' by Lang, Bischel, and Surmann (2017)
<doi:10.21105/joss.00135>.
Version: 0.9.4
Version: 0.9.5
License: MIT + file LICENSE
URL: https://wlandau.github.io/crew/, https://github.com/wlandau/crew
BugReports: https://github.com/wlandau/crew/issues
Expand Down
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# crew 0.9.5

* CRAN patch.

# crew 0.9.4

* Do not use extended tasks in Shiny vignette.
Expand Down
26 changes: 8 additions & 18 deletions vignettes/groups.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,11 @@ vignette: >
%\VignetteEncoding{UTF-8}
---

```{r setup, include = FALSE}
set.seed(0)
knitr::opts_chunk$set(
collapse = TRUE,
comment = "#>",
paged.print = FALSE,
eval = FALSE
)
```

Each controller object only supports only one type of worker configuration which you set in advance. However, different controllers may have different types of workers, and `crew` supports controller groups to coordinate among these different worker types. With third-party launcher subclasses from other packages, this mechanism will allow you to e.g. send some tasks to GPU-capable or high-memory workers while other tasks go to low-spec workers.

We demonstrate with a controller of fully persistent workers which always stay running and a controller of semi-persistent workers which terminate after completing four tasks. We create controller objects with names.

```{r}
```r
library(crew)
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(name = "semi-persistent", tasks_max = 4L)
Expand All @@ -31,25 +21,25 @@ transient <- crew_controller_local(name = "semi-persistent", tasks_max = 4L)

We put these controller objects into a new controller group object.

```{r}
```r
group <- crew_controller_group(persistent, transient)
```

This controller group has a global `connect()` method to initialize both controllers.

```{r}
```r
group$start()
```

You can choose which worker pool to receive tasks.

```{r}
```r
group$push(name = "my task", command = sqrt(4), controller = "semi-persistent")
```

The controller group also supports global methods for `wait()`, `pop()`, and `terminate()`. These methods operate on all controllers at once by default, but the `controllers` argument allows you to select a subset of controllers to act on. Below in `pop()` the `launcher` column of the output indicates which controller ran the task.

```{r}
```r
group$wait(controllers = "semi-persistent")
group$pop()
#> # A tibble: 1 × 12
Expand All @@ -61,7 +51,7 @@ group$pop()

The [`map()`](https://wlandau.github.io/crew/reference/crew_class_controller.html#method-crew_class_controller-group-map) method provides functional programming, and the `controller` argument lets you choose the controller to submit the tasks.

```{r}
```r
group$map(
command = a + b + c + d,
iterate = list(
Expand All @@ -82,7 +72,7 @@ group$map(

The controller group has a `summary()` method which aggregates the summaries of one or more controllers.

```{r}
```r
group$summary()
#> # A tibble: 2 × 6
#> controller worker tasks seconds errors warnings
Expand All @@ -93,6 +83,6 @@ group$summary()

When you are finished, please call `terminate()` with no arguments to terminate all controllers in the controller group.

```{r}
```r
group$terminate()
```
39 changes: 15 additions & 24 deletions vignettes/introduction.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,6 @@ vignette: >
%\VignetteEncoding{UTF-8}
---

```{r, include = FALSE}
knitr::opts_chunk$set(
collapse = TRUE,
comment = "#>",
eval = FALSE
)
library(crew)
```

`crew` is a distributed computing framework with a centralized interface and auto-scaling. A `crew` controller is an object in R which accepts tasks, returns results, and launches workers. Workers can be local processes, jobs on traditional clusters such as SLURM, or jobs on cloud services such as AWS Batch, depending on the [launcher plugin](https://wlandau.github.io/crew/articles/plugins.html) of the controller.

# Tasks vs workers
Expand All @@ -26,7 +17,7 @@ A *task* is a piece of R code, such as an expression or a function call. A *work

First, create a controller object to manage tasks and workers.

```{r}
```r
library(crew)
controller <- crew_controller_local(
name = "example",
Expand All @@ -37,19 +28,19 @@ controller <- crew_controller_local(

Next, start the controller to create the [`mirai`](https://github.com/shikokuchuo/mirai) client. Later, when you are done with the controller, call `controller$terminate()` to clean up the workers and dispatcher.

```{r}
```r
controller$start()
```

Use `push()` to submit a new task and `pop()` to return a completed task.

```{r}
```r
controller$push(name = "get pid", command = ps::ps_pid())
```

As a side effect, methods `push()`, `pop()`, and `scale()` also launch workers to run the tasks. If your controller uses transient workers and has a backlog of tasks, you may need to loop over `pop()` or `scale()` multiple times to make sure enough workers are always available.

```{r}
```r
controller$pop() # No workers started yet and the task is not done.
#> NULL

Expand All @@ -64,13 +55,13 @@ task

Alternatively, `wait()` is a loop that repeatedly checks tasks and launches workers until all tasks complete.

```{r}
```r
controller$wait(mode = "all")
```

The return value of the task is in the `result` column.

```{r}
```r
task$result[[1]] # return value of the task
#> [1] 69631
```
Expand Down Expand Up @@ -111,7 +102,7 @@ The [`map()`](https://wlandau.github.io/crew/reference/crew_class_controller.htm

Below, [`map()`](https://wlandau.github.io/crew/reference/crew_class_controller.html#method-crew_class_controller-map) submits one task to compute `1 + 2 + 5 + 6` and another task to compute `3 + 4 + 5 + 6`. The lists and vectors inside `iterate` vary from task to task, while the elements of `data` and `globals` stay constant across tasks.

```{r}
```r
results <- controller$map(
command = a + b + c + d,
iterate = list(
Expand Down Expand Up @@ -140,7 +131,7 @@ If at least one task in [`map()`](https://wlandau.github.io/crew/reference/crew_

The [`walk()`](https://wlandau.github.io/crew/reference/crew_class_controller.html#method-crew_class_controller-walk) method is just like `map()`, but it does not wait for any tasks to complete. Instead, it returns control to the local R session immediately and lets you do other things while the tasks run in the background.

```{r}
```r
controller$walk(
command = a + b + c + d,
iterate = list(
Expand All @@ -154,7 +145,7 @@ controller$walk(

The [`collect()`](https://wlandau.github.io/crew/reference/crew_class_controller.html#method-crew_class_controller-collect) pops all completed tasks. Put together, `walk()`, `wait(mode = "all")`, and `collect()` have the same overall effect as `map()`.

```{r}
```r
controller$wait(mode = "all")

controller$collect()
Expand All @@ -175,7 +166,7 @@ However, there are subtle differences between the synchronous and asynchronous f

The controller summary shows how many tasks each worker ran, how many total seconds it spent running tasks, and how many tasks threw warnings and errors.

```{r}
```r
controller$summary()
#> # A tibble: 2 × 6
#> controller worker tasks seconds errors warnings
Expand All @@ -186,7 +177,7 @@ controller$summary()

The launcher summary counts the number of times each worker was launched, and it shows the total number of assigned and completed tasks from all past terminated instances of each worker. In addition, it shows whether the current worker instance was actively connected ("online") or had connected at some point during its life cycle ("discovered") as of the last call to `controller$launcher$tally()`.

```{r}
```r
controller$launcher$summary()
#> # A tibble: 2 × 6
#> worker launches online discovered assigned complete
Expand All @@ -197,7 +188,7 @@ controller$launcher$summary()

Finally, the client summary shows up-to-date worker status from `mirai::daemons()`.

```{r}
```r
controller$client$summary()
#> # A tibble: 2 × 6
#> worker online instances assigned complete socket
Expand All @@ -210,13 +201,13 @@ controller$client$summary()

Call `terminate()` on the controller after you finish using it. `terminate()` tries to close the the [`mirai`](https://github.com/shikokuchuo/mirai) dispatcher and any workers that may still be running. It is important to free up these resources.

```{r}
```r
controller$terminate()
```

The `mirai` dispatcher process should exit on its own, but if not, you can manually terminate the process ID at `controller$client$dispatcher` or call `crew_clean()` to terminate any dispatchers from current or previous R sessions.

```{r}
```r
crew_clean()
#> nothing to clean up
```
Expand All @@ -231,7 +222,7 @@ A `crew` controller creates different types of local processes. These include:

Usually these processes terminate themselves when the parent R session exits or the controller terminates, but under rare circumstances they may continue running. The "local monitor" in `crew` makes it easy to list and terminate any of these processes which may be running on your local computer. Example:

```{r, eval = FALSE}
```r
monitor <- crew_monitor_local()
monitor$dispatchers() # List PIDs of all local {mirai} dispatcher processes.
#> [1] 31215
Expand Down
36 changes: 17 additions & 19 deletions vignettes/plugins.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ vignette: >
---

```{r, include = FALSE}
library(crew)
knitr::opts_chunk$set(
collapse = TRUE,
comment = "#>",
eval = FALSE
comment = "#>"
)
library(crew)
```

# About
Expand Down Expand Up @@ -55,7 +54,7 @@ If you implement a custom [`terminate_worker()`](https://wlandau.github.io/crew/

The following is a custom custom launcher class whose workers are local R processes on Unix-like systems.

```{r}
```r
custom_launcher_class <- R6::R6Class(
classname = "custom_launcher_class",
inherit = crew::crew_class_launcher,
Expand Down Expand Up @@ -88,7 +87,7 @@ Every `launch_worker()` method must accept arguments `call`, `launcher`, `worker

To see what the `call` object looks like, create a new launcher and run the `call()` method.

```{r, eval = TRUE, message = FALSE}
```{r}
library(crew)
launcher <- crew_launcher_local()
launcher$call(
Expand All @@ -111,7 +110,7 @@ It is useful to have a helper function that creates controllers with your custom

Feel free to borrow from the [`crew_controller_local()` source code](https://github.com/wlandau/crew/blob/main/R/crew_controller_local.R). For packages, you can use the `@inheritParams` [`roxygen2`](https://roxygen2.r-lib.org/) tag to inherit the documentation of all the arguments instead of writing it by hand. You may want to adjust the default arguments based on the specifics of your platform, especially `seconds_launch` if workers take a long time to launch.

```{r}
```r
#' @title Create a controller with the custom launcher.
#' @export
#' @description Create an `R6` object to submit tasks and
Expand Down Expand Up @@ -177,15 +176,15 @@ When you are ready to begin testing, try out the example in the [README](https:/

First, create and start a controller. You may wish to monitor local processes on your computer to make sure the `mirai` dispatcher starts.

```{r}
```r
library(crew)
controller <- crew_controller_custom(workers = 2)
controller$start()
```

Try pushing a task that gets the local IP address and process ID of the worker instance.

```{r}
```r
controller$push(
name = "get worker IP address and process ID",
command = paste(getip::getip(type = "local"), ps::ps_pid())
Expand All @@ -194,7 +193,7 @@ controller$push(

Wait for the task to complete and look at the result.

```{r}
```r
controller$wait()
result <- controller$pop()
result$result[[1]]
Expand All @@ -203,7 +202,7 @@ result$result[[1]]

Please use the result to verify that the task really ran on a worker as intended. The process ID above should agree with the one from the handle ([except on Windows](https://github.com/r-lib/processx/issues/364) because the actual R process may be different from the `R.exe` process created first). In addition, if the worker is running on a different computer, the worker IP address should be different than the local IP address. Since our custom launcher creates local processes, the IP addresses are the same in this case, but they should be different for a [SLURM](https://slurm.schedmd.com/) or [AWS Batch](https://aws.amazon.com/batch/) launcher.

```{r}
```r
getip::getip(type = "local")
#> "192.168.0.2"
controller$launcher$workers$handle[[1]]$get_pid()
Expand All @@ -212,7 +211,7 @@ controller$launcher$workers$handle[[1]]$get_pid()

If you did not set any timeouts or task limits, the worker that ran the task should still be online. The other worker had no tasks, so it did not need to launch.

```{r}
```r
controller$client$summary()
#> # A tibble: 2 × 6
#> worker online instances assigned complete socket
Expand All @@ -223,7 +222,7 @@ controller$client$summary()

When you are done, terminate the controller. This terminates the `mirai` dispatcher process and the `crew` workers.

```{r}
```r
controller$terminate()
```

Expand All @@ -233,7 +232,7 @@ Finally, use the process monitoring interface of your computing platform or oper

If the informal testing succeeded, we recommend you scale up testing to more ambitious scenarios. As one example, you can test that your workers can auto-scale and quickly churn through a large number of tasks.

```{r}
```r
library(crew)
controller <- crew_controller_custom(
seconds_idle = 2L,
Expand Down Expand Up @@ -278,9 +277,8 @@ all(sort(unlist(results$result)) == seq_len(200L))
#> [1] TRUE
# View worker and task summaries.
controller$summary()
controller$schedule$summary()
controller$client$summary()
controller$launcher$summary()
controller$schedule$summary()
# Terminate the controller.
controller$terminate()
# Now outside crew, verify that the mirai dispatcher
Expand All @@ -298,7 +296,7 @@ Let's demonstrate on the simple `processx` example. The use case itself may sill

Here is what the launcher class looks like. We work with `processx` PIDs directly because they are light and easy to send to local async `mirai` daemons. The `self$async$eval()` function accepts R code, data, and packages to run a quick local asynchronous task, and it returns a `mirai::mirai()` task object as the handle. `handle$data` returns the results if available, and `crew` uses `mirai::call_mirai()` to make sure any tasks submitted by `launch_worker()` have resolved before they are used by `terminate_worker()`.

```{r}
```r
async_launcher_class <- R6::R6Class(
classname = "custom_launcher_class",
inherit = crew::crew_class_launcher,
Expand All @@ -322,7 +320,7 @@ async_launcher_class <- R6::R6Class(

The controller helper includes a `processes` argument which sets how many asynchronous `mirai` daemons to create. Set `processes` to `NULL` to disable async and use it like an ordinary synchronous controller.

```{r}
```r
crew_controller_async <- function(
name = "async controller name",
workers = 1L,
Expand Down Expand Up @@ -379,7 +377,7 @@ crew_controller_async <- function(

Creating a controller is the same as before, except the user sets both the `workers` and `processes` arguments. Remember, these are two different things: `workers` is the number of serious workers that run serious tasks from `push()`, whereas `processes` is the number of `mirai` daemons that asynchronously launch and terminate those serious workers. Workers may or may not be local, but `processes` are always local.

```{r}
```r
async_controller <- crew_controller_async(workers = 12, processes = 4)
```

Expand Down Expand Up @@ -428,7 +426,7 @@ crew_monitor_pids <- function(pattern) {

Example usage:

```{r, eval = FALSE}
```r
monitor <- crew_monitor_local()
monitor$dispatchers() # List PIDs of all local {mirai} dispatcher processes.
#> [1] 31215
Expand Down
Loading

0 comments on commit 6fc2665

Please sign in to comment.