Support clustermq as backend for drake #86
Thank you for your kind words, and please excuse my delay in getting back to you: this question is a bit harder to answer than the others. We have discussed a non-blocking version of Q() before. One option would be to implement it in clustermq directly; another option would be to use parallel futures. In either case, I think this would use persistent (and reusable) workers and not actually need non-blocking submission.
Thanks for explaining, Michael. I think futureverse/future#204 would be ideal, and I am optimistic about its eventual implementation. I will close this issue as a duplicate of #23.
I am reopening this for drake.
And I appreciate it. Really glad you're willing to help me speed up drake.
Let me expand on the previous point a bit:

(1) I'm pretty sure you do not want transient workers for drake. Many of your function calls may be short, and submitting many short-running jobs in a short period of time puts stress on the scheduler. The lower limit that is usually quoted is 2 minutes, and many workflow targets will take less than that.

(2) There are usually batches of function calls where some arguments are iterated but others are not. Often, function calls will work on the same data, but e.g. different indices are supplied. In this case, you will not want the common part to be sent over the network for each call separately.

I think the way to go here is to process the calls in groups, sending the common data to the workers only once per group. The downside of course is that each group of function calls needs to be completed before the next can start. As the calls themselves are load-balanced, I don't think this is a big deal.
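For concreteness, here is a minimal sketch of that grouped-call pattern using clustermq's high-level Q() interface: the iterated argument varies per call, while the shared object goes in const so it travels to each worker only once. The data and the function here are purely illustrative.

```r
library(clustermq)

shared <- matrix(rnorm(1e6), ncol = 100)   # common data for every call in the group

col_mean <- function(i, data) {
  mean(data[, i])                          # toy computation on one column of the shared data
}

# One group of load-balanced calls: `i` is iterated, `data` is sent once per worker.
results <- Q(col_mean, i = seq_len(ncol(shared)),
             const = list(data = shared), n_jobs = 2)
```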
You bring up a good point re transient workers. I do eventually want to get to job groups, but it is much harder to implement than persistent and transient workers. All in good time. I do like the idea of maintaining a pool of common workers and submitting jobs to them as targets become ready (i.e. when their dependencies are all checked and built). The only downside is that users may hit wall time limits fairly easily, but we can work around that later with job groups (or simply by refreshing workers periodically). Unfortunately, it is not so simple to group tasks by function call. Users define arbitrary dependency networks of commands, and those commands are arbitrary chunks of R code (not necessarily neat function calls). I suppose some targets sometimes share all the same dependencies, but I do not think such groupings would efficiently generalize to larger workflows.
At first, I did not think this was such a big deal. What you describe is essentially what I have been calling "staged scheduling", and it was a good start. However, during its first several months of use, there were several projects that lost considerable parallel efficiency. The most successful solutions so far have been based on persistent workers.
I did try to use Q() for this, but its blocking behavior does not fit drake's scheduler.
I'm beginning to see the issues you are facing. It should be possible to use the lower-level worker API directly instead of Q(). This would be to use something like:

w = workers(2) # submit 2 worker jobs for testing
on.exit(w$cleanup())
msg = w$receive_data()
if (!is.null(msg$result))
# handle result ...
# if there is work
# this still needs some adjustments on cmq for this to work
w$set_common_data(data=list(fun, const, export, seed)) # set function to call, constant args
w$send_common_data()
w$send_job_data(msg=list(id="do_chunk", df)) # df is data.frame with row=id and fun args
# if not
w$disconnect_worker(msg)

Processing the calls this way would be asynchronous, with the master free to do other work in between. Is that something you could work with?
Thanks for meeting me half way! I think I get the general idea, and I want to understand more about the details. From what I understand, the master process will loop over something like the above code to receive worker data and send jobs to workers. After the common data is sent once, can I just keep sending job data as workers become ready? And what exactly will receive_data() return in that case?
I've written about the specification here, hope that helps. Please note the proposed drake-specific extensions there.
Yes
It will be the result of a single chunk (in your case, likely a function call or expression on a single set of arguments). The fields of the message are described in the spec linked above.
This would be a single call to a single worker. You'd loop through it the following way:

switch(msg$id,
"WORKER_UP" = {
if (there are calls left that you can do right now)
w$do_call(...)
if (there is a result in msg)
# handle result here
}
"WORKER_ERROR" = {
# something went wrong, clean up and print error
}
)

Calls will be load-balanced, but you need to check the result ID to match the result to a call you sent. Otherwise, you'll have no idea which worker sent which result.
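One simple way to keep that association on the master (an illustrative sketch only, not a built-in clustermq mechanism, and written with the send_call()/receive_data() methods that appear later in this thread): have the submitted expression return an identifier of your choosing next to its value, then look that identifier up when the result message arrives.

```r
# `w` is a workers() object; the id is purely our own bookkeeping.
pending <- list(call_1 = "some target")               # outstanding call ids
w$send_call(list(id = id, value = x * 2), list(id = "call_1", x = 5))

msg <- w$receive_data()
if (!is.null(msg$result)) {
  res <- msg$result
  target <- pending[[res$id]]                          # which call this result answers
  pending[[res$id]] <- NULL
  message("call ", res$id, " (", target, ") returned ", res$value)
}
```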
Thanks for elaborating. I will experiment with this and get back to you.
Michael, thank you for your patience. I know I was slow getting back to you. I wanted to make sure I had a good quality block of time to devote to this.
The spec helped a lot, and I do see the necessity of the common data setup. Regarding your follow-up code from #86 (comment): do you plan to implement a DO_CALL message? I studied your code from #86 (comment), and the results of my experimentation are below, along with some embedded questions and comments. I am optimistic about interacting with the worker API this way.

library(clustermq)
#> * Option 'clustermq.scheduler' not set, defaulting to SGE
#> --- see: https://github.com/mschubert/clustermq/wiki#setting-up-the-scheduler
w <- workers(2)
#> Submitting 2 worker jobs (ID: 7628) ...
on.exit(w$finalize()) # See #91.
msg <- w$receive_data()
msg
#> $id
#> [1] "WORKER_UP"
#>
#> $pkgver
#> [1] 0.8.4.1
if (is.null(msg)){
# Under what circumstances is the message NULL?
# What should I check if it is?
}
# From https://github.com/mschubert/clustermq/blob/3c7fda0a7c6ddcc7f9b7ac557a6a2abfcc917e16/R/Q_rows.r#L24-L25
data <- list(
fun = function(x){
x * 2
},
const = list(),
export = list(),
rettype = "list",
common_seed = 123
)
# From https://github.com/mschubert/clustermq/blob/3c7fda0a7c6ddcc7f9b7ac557a6a2abfcc917e16/R/Q_rows.r#L41
# Do we need `id = "DO_SETUP"` somewhere?
do.call(w$set_common_data, data)
w$send_common_data()
w$send_job_data(
msg = list(
id = "DO_CHUNK", # Maybe eventually "DO_CALL"?
df = data.frame(x = 1) # drake will deploy targets 1 at a time.
)
)
#> Operation cannot be accomplished in current state
msg <- w$receive_data()
msg
#> $id
#> [1] "WORKER_ERROR"
#>
#> $token
#> [1] "agasx"
w$disconnect_worker(msg)

It seems I do not know how to set the common data properly. In a previous attempt, I tried to send the whole list as a single argument:

w$set_common_data(data)
w$send_common_data()
w$send_job_data(
msg = list(
id = "DO_CHUNK", # Maybe eventually "DO_CALL"?
df = data.frame(x = 1) # drake will deploy targets 1 at a time.
)
)
#> Operation cannot be accomplished in current state
msg <- w$receive_data()
msg
#> $id
#> [1] "WORKER_ERROR"
#>
#> $msg
#> [1] "wrong field names for DO_SETUP: data" Like you said, maybe |
@wlandau Can you try the following using the current develop version?

w = workers(1)
# worker connecting to master with id=WORKER_UP
w$receive_data()
# you should be able to send arbitrary expressions to evaluate on a worker
w$send_call(x*2, list(x=5))
# receive the result from any worker with id=WORKER_READY
w$receive_data()
w$cleanup()
It worked exactly as I guessed.

> options(clustermq.scheduler = "sge", clustermq.template = "template.tmpl")
> library(clustermq)
> w = workers(1)
Submitting 1 worker jobs (ID: 7225) ...
> # worker connecting to master with id=WORKER_UP
> w$receive_data()
$id
[1] "WORKER_UP"
$pkgver
[1] ‘0.8.4.99’
> # you should be able to send arbitrary expressions to evaluate on a worker
> w$send_call(x*2, list(x=5))
> # receive the result from any worker with id=WORKER_READY
> w$receive_data()
$id
[1] "WORKER_READY"
$token
[1] NA
$expr
x * 2
$result
[1] 10
> w$cleanup()

And with a low enough timeout:

> options(clustermq.scheduler = "sge", clustermq.template = "template.tmpl")
> library(clustermq)
>
> w = workers(2)
Submitting 2 worker jobs (ID: 7321) ...
>
> # worker connecting to master with id=WORKER_UP
> w$receive_data()
$id
[1] "WORKER_UP"
$pkgver
[1] ‘0.8.4.99’
> w$receive_data(timeout = 1e-9)
NULL
>
> # you should be able to send arbitrary expressions to evaluate on a worker
> w$send_call(x*2, list(x=5))
> w$send_call(x*2, list(x=5))
Operation cannot be accomplished in current state
>
> # receive the result from any worker with id=WORKER_READY
> w$receive_data()
$id
[1] "WORKER_READY"
$token
[1] NA
$expr
x * 2
$result
[1] 10
>
> w$cleanup()
Master: [2.3s 1.5% CPU]; Worker: [avg NA% CPU, max NA Mb]
For your 2nd example you still need an event loop ;-)

while(work remaining or worker still running) {
msg = w$receive_data() # wait until a worker is ready
if (msg$id == "WORKER_READY") {
if (work remaining)
w$send_call(next work) # send it work if it is
else
w$send_shutdown_worker()
# ...handle your result...
} else
# w$disconnect_worker() if WORKER_DONE, stop if WORKER_ERROR
}

The key here is that receive and send are called alternately, so that ZeroMQ can distribute the work in the background.
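Fleshing that pseudocode out a little (an illustrative sketch only, built from the methods that appear elsewhere in this thread; the work queue is just a list of argument sets for one toy expression):

```r
library(clustermq)

w <- workers(2)
work <- list(list(x = 1), list(x = 2), list(x = 3))   # remaining work
results <- list()

while (length(work) > 0 || w$workers_running > 0) {
  msg <- w$receive_data()                  # wait until a worker reports in
  # depending on the clustermq version, first contact may arrive as WORKER_UP
  if (msg$id %in% c("WORKER_UP", "WORKER_READY")) {
    if (!is.null(msg$result)) {
      results[[length(results) + 1]] <- msg$result     # handle a finished call
    }
    if (length(work) > 0) {
      w$send_call(x * 2, work[[1]])        # hand out the next piece of work
      work[[1]] <- NULL
    } else {
      w$send_shutdown_worker()             # nothing left: tell this worker to stop
    }
  } else if (identical(msg$id, "WORKER_DONE")) {
    w$disconnect_worker(msg)
  } else if (identical(msg$id, "WORKER_ERROR")) {
    stop("worker error")
  }
}
```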
I see. This is beginning to take shape. Also, does this look sort of right to you?

w$send_job_data(config)
while(work remains or worker still running) {
msg = w$receive_data()
if (identical(msg$id, "WORKER_READY")) {
store_result(msg)
target <- choose_next_target(msg)
if (!is.null(target)){ # There is work to be done.
# Load the dependencies that the next worker needs to have in memory
# and remove unnecessary objects
# (depending on the user-defined memory strategy)
prune_envir(config) # https://github.com/ropensci/drake/blob/16700f77700729511db103610f55c23978d7fef7/R/envir.R#L28
# Build the target.
w$send_call(
build_target,
list(target = target),
const = list(dependencies = get_dependencies(config$envir))
)
}
} else if (identical(msg$id, "WORKER_DONE") {
w$disconnect_worker()
} else if (identical(msg$id, "WORKER_ERROR") {
stop("worker error")
}
}

Eventually, I would like to be more selective about the data that gets sent to each worker. Many targets share dependencies, and depending on what the assigned worker built previously, we could be repeating ourselves if we send all the dependency data over the socket. Of course, because of load balancing, I cannot control which worker gets which target.
Looks good! Some minor pointers: if you want to send the config separately in the beginning, use set_common_data() and send_common_data(); for now you will have to fill in the unused fields as well.1
1 I know requiring dummy fields is not ideal, and this is still subject to change.
I am not sure I understand your comments about set_common_data(). Also, do we need a token? Overall, I am having trouble understanding the functions in the QSys class and when to use them. I think it would be helpful to have a specification to distinguish among them and walk through how they work together in practice.
Update: the new backend has progressed enough for a PR: ropensci/drake#501. The only thing left is an unpredictable yet frequent error:

Error in rzmq::poll.socket(list(private$socket), list("read"), timeout = timeout) :
  Interrupted system call

Any ideas about what might be happening? I am running the reprex from #86 (comment) using ropensci/drake@8f84aa7.
I can process the reprex fine on my end; this may be related to ropensci/rzmq#37. Note that we had issues with terminal size changes sending interrupts before.
ropensci/rzmq#37 does provide context, thanks. I am not sure what could be sending the interruption, but since the error message mentions rzmq::poll.socket(), that seems like the place to look.
Yes, it's no surprise that you catch the interrupts with poll.socket(): that is where the master process spends most of its time waiting. Are you running any multicore operations in the background?
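For what it's worth, one generic way to tolerate a transient interrupted system call is simply to retry the receive. This is a hedged workaround sketch, not something clustermq or drake does; receive_with_retry() is a hypothetical helper.

```r
# Hypothetical helper: retry receive_data() when the underlying poll reports
# an interrupted system call (EINTR); any other error is re-thrown unchanged.
receive_with_retry <- function(workers, tries = 5) {
  for (i in seq_len(tries)) {
    msg <- tryCatch(workers$receive_data(), error = identity)
    if (!inherits(msg, "error")) {
      return(msg)
    }
    if (!grepl("Interrupted system call", conditionMessage(msg))) {
      stop(msg)
    }
  }
  stop(msg)
}
```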
I was running some occasional multicore operations, yes. By the way, I am not sure this is relevant, but in the current loop on the master process, I never see a WORKER_DONE message.
Also, a question about your pseudocode from #86 (comment) —
Never mind: I see how you handle that there.
bb4477e seemed to help, but I am still seeing this warning:

test-clustermq.R:16: warning: clustermq parallelism
cannot wait for child 14833 as it does not exist

Then, sometimes the task hangs, and other times the infamous "Interrupted system call" error appears.
However, jobs on SGE run great, and real HPC schedulers should be the primary use case anyway.
Please note that the latest version on develop now sends a WORKER_DONE message (with a shutdown report) before a worker shuts down. This will cause problems for drake/clustermq.R#L34; you will need to check for WORKER_DONE and call disconnect_worker(). Sorry for this, but that's the downside of using the bleeding edge version. I might still simplify the shutdown procedure before a release (eliminating the requirement for this check).
Thanks for letting me know. Nothing to apologize for; I am excited to see where this is going. In the new clustermq backend for drake, the setup and master loop currently look like this:

cmq_set_common_data <- function(config){
export <- list()
if (identical(config$envir, globalenv())){
export <- as.list(config$envir, all.names = TRUE) # nocov
}
export$config <- config
config$workers$set_common_data(
export = export,
fun = identity,
const = list(),
rettype = list(),
common_seed = config$seed,
token = "set_common_data_token"
)
}
cmq_master <- function(config){
while (cmq_work_remains(config) || config$workers$workers_running > 0){
msg <- config$workers$receive_data()
cmq_conclude_build(msg = msg, config = config)
if (identical(msg$id, "WORKER_READY")) {
if (identical(msg$token, "set_common_data_token")){
config$workers$send_common_data()
} else if (cmq_work_remains(config)){
cmq_send_target(config)
} else {
config$workers$send_shutdown_worker()
}
} else if (identical(msg$id, "WORKER_DONE")) {
config$workers$disconnect_worker(msg)
} else if (identical(msg$id, "WORKER_ERROR")) {
stop("clustermq worker error") # nocov
}
}
}
Also, if/when you remove the requirement to check for WORKER_DONE, would something like this work?

cmq_master <- function(config){
on.exit(config$workers$cleanup()) # Replaces WORKER_DONE checks?
while (cmq_work_remains(config) || config$workers$workers_running > 0){
msg <- config$workers$receive_data()
cmq_conclude_build(msg = msg, config = config)
if (identical(msg$id, "WORKER_READY")) {
if (identical(msg$token, "set_common_data_token")){
config$workers$send_common_data()
} else if (cmq_work_remains(config)){
cmq_send_target(config)
} else {
config$workers$send_shutdown_worker() # Do we still need this?
}
} else if (identical(msg$id, "WORKER_ERROR")) {
stop("clustermq worker error") # nocov
}
}
}
The idea (implemented in clustermq's own main loop) is the following:

cmq_master <- function(config){
on.exit(config$workers$finalize())
while (not all results are in){
msg <- config$workers$receive_data()
cmq_conclude_build(msg = msg, config = config)
if (!identical(msg$token, "set_common_data_token")){
config$workers$send_common_data()
} else if (cmq_work_remains(config)){
cmq_send_target(config)
} else {
config$workers$send_shutdown_worker()
}
}
if (config$workers$cleanup())
on.exit()
}

The downside I can see is that if you enter the loop after all results are finished, you will wait on receive_data() for a message that never arrives.
I do like having a worker API that does not require checking message types most of the time. I have begun the adjustment in ropensci/drake@8ee84b0. Currently, I am testing with the following:

# devtools::install_github("ropensci/drake", ref = "clustermq")
library(drake)
load_mtcars_example()
clean(destroy = TRUE)
options(clustermq.scheduler = "multicore")
make(my_plan, parallelism = "clustermq", jobs = 2, verbose = 4)
You still seem to be using

while (cmq_work_remains(config) || config$workers$workers_running > 0){

and not

while (not all results are in){

The new API requires that you exit your main loop when the last result is in, not when the last worker is shut down (this is handled by finalize()). I've added (2a4c3bf) the errors:

Error in config$workers$receive_data() :
  Trying to receive data without workers

and

Error in config$workers$receive_data() :
  Trying to receive data after work finished

to tell the user when they attempt to do that.
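A minimal sketch of that exit condition (illustrative only; targets, config, and the result handling are placeholders for drake's own bookkeeping): count completed results and leave the loop once every target has one, letting finalize() handle worker shutdown afterwards.

```r
done <- 0
n_targets <- length(targets)
while (done < n_targets) {
  msg <- config$workers$receive_data()
  if (!is.null(msg$result)) {
    done <- done + 1          # one more target has a result
    # ...store the result...
  }
  # ...send common data or the next target, as in the loops above...
}
```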
Thanks. In ropensci/drake@0489ccf, the main loop now checks if all the results are in. I just tried #86 (comment) again using ropensci/drake@0489ccf and 6af6dff, and I see the following.
[1] "receiveing"
$id
[1] "WORKER_READY"
$pkgver
[1] 0.8.4.99
$token
[1] "not set"
The results are the same whether I use 1, 2, or 8 workers.
The call to cleanup should go outside the loop (see #86 (comment)). Now you shut down your workers after the first result comes in.
Wow, I can't believe I missed that! Thanks!
Initial testing is looking so much better! #99 aside, my last concern is that I am seeing some (but not all) of my SGE jobs still running even after cleanup. I submitted more workers than targets that time, so I wonder if these zombie workers never fully started up to begin with. Anyway, is the new worker API likely to change significantly going forward? If not, we are close to being ready to merge ropensci/drake#501.
Good to see that it works & happy I could contribute a bit! I still aim to solve #99 and have the new API in the next CRAN release.
Fantastic! I will keep an eye out for the next CRAN release.
First of all, I am super excited about this package! I just tried it out, and it automatically detected my company's SGE scheduler and submitted jobs in a fraction of the time I am used to. Way cool!
I am thinking about a couple of clustermq backends for drake: one for persistent workers, and another for transient workers. The former is easiest to implement, but I do not expect to see performance gains there because persistent workers need to use the file system to save targets on the go.

However, clustermq-based transient workers could really shine, especially when compared to drake's current future/batchtools-based transient workers (make(parallelism = "future")), which add noticeable overhead to every single target. But to implement transient workers with clustermq, I believe I will need a way to submit and monitor workers in a non-blocking way. With future, for example, the master process can submit a job, go check on other jobs, and come back and check if the first job finished.

I suppose one potential alternative is to take advantage of the native pipeline functionality in the schedulers themselves. Are you considering this for clustermq?
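For reference, this is the kind of non-blocking pattern the future package provides (standard future API; slow_build() is a hypothetical stand-in for building one target):

```r
library(future)
plan(multisession)                  # could also be a batchtools/HPC plan

slow_build <- function(target) {    # hypothetical stand-in for building a target
  Sys.sleep(5)
  paste("built", target)
}

f <- future(slow_build("target1"))  # submit and return immediately
# ...the master is free to check on other targets here...
while (!resolved(f)) {
  Sys.sleep(0.1)                    # poll without blocking on this one job
}
value(f)                            # collect the finished result
```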