Skip to content

Commit

Permalink
Merge branch 'auth' into develop (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Feb 22, 2019
2 parents 0075968 + 11be0f2 commit 016b0d3
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 16 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
* 0.8.6
* Progress bar is now shown before any workers start (#107)
* Socket connections are now authenticated using a session password (#125)
* Marked internal functions with @keywords internal
* Added vignettes for the _User Guide_ and _Technical Documentation_

Expand Down
16 changes: 15 additions & 1 deletion R/qsys.r
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,17 @@ QSys = R6::R6Class("QSys",

if (rcv[[1]]$read) { # otherwise timeout reached
msg = rzmq::receive.socket(private$socket)

if (private$auth != "" && (is.null(msg$auth) || msg$auth != private$auth))
stop("Authentication provided by worker does not match")

switch(msg$id,
"WORKER_UP" = {
if (!is.null(private$pkg_warn) && msg$pkgver != private$pkg_warn) {
warning("\nVersion mismatch: master has ", private$pkg_warn,
", worker ", msg$pkgver, immediate.=TRUE)
private$pkg_warn = NULL
}
private$pkg_warn = NULL
msg$id = "WORKER_READY"
msg$token = "not set"
private$workers_up = private$workers_up + 1
Expand Down Expand Up @@ -194,6 +198,7 @@ QSys = R6::R6Class("QSys",
defaults = list(),
is_cleaned_up = FALSE,
pkg_warn = utils::packageVersion("clustermq"),
auth = "",

send = function(..., serialize=TRUE) {
rzmq::send.socket(socket = private$socket,
Expand All @@ -211,13 +216,22 @@ QSys = R6::R6Class("QSys",
fill_options = function(...) {
values = utils::modifyList(private$defaults, list(...))
values$master = private$master
if ("auth" %in% names(infuser::variables_requested(private$template))) {
values$auth = private$auth = paste(sample(letters, 5, TRUE), collapse="")
} else {
values$auth = NULL
warning("Add 'CMQ_AUTH={{ auth }}' to template to enable socket authentication",
immediate.=TRUE)
}
if (!"job_name" %in% names(values))
values$job_name = paste0("cmq", private$port)
private$workers_total = values$n_jobs
values
},

fill_template = function(values) {
# note: auth will be obligatory in the future and this check will
# be removed (i.e., filling will fail if no field in template)
infuser::infuse(private$template, values)
},

Expand Down
24 changes: 14 additions & 10 deletions R/worker.r
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {
if (!verbose)
message = function(...) invisible(NULL)

#TODO: replace this by proper authentication
auth = Sys.getenv("CMQ_AUTH")

message("Master: ", master)
if (length(list(...)) > 0)
warning("Arguments ignored: ", paste(names(list(...)), collapse=", "))
Expand All @@ -22,7 +25,7 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {

# send the master a ready signal
rzmq::connect.socket(socket, master)
rzmq::send.socket(socket, data=list(id="WORKER_UP",
rzmq::send.socket(socket, data=list(id="WORKER_UP", auth=auth,
pkgver=utils::packageVersion("clustermq")))
message("WORKER_UP to: ", master)

Expand All @@ -46,14 +49,14 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {
"DO_CALL" = {
result = try(eval(msg$expr, envir=msg$env))
message("eval'd: ", msg$expr)
rzmq::send.socket(socket, data=list(id="WORKER_READY",
rzmq::send.socket(socket, data=list(id="WORKER_READY", auth=auth,
token=token, ref=msg$ref, result=result))
},
"DO_SETUP" = {
if (!is.null(msg$redirect)) {
data_socket = rzmq::init.socket(zmq_context, "ZMQ_REQ")
rzmq::connect.socket(data_socket, msg$redirect)
rzmq::send.socket(data_socket, data=list(id="WORKER_READY"))
rzmq::send.socket(data_socket, data=list(id="WORKER_READY", auth=auth))
message("WORKER_READY to redirect: ", msg$redirect)
msg = rzmq::receive.socket(data_socket)
}
Expand All @@ -64,18 +67,18 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {
token = msg$token
message("token from msg: ", token)
rzmq::send.socket(socket, data=list(id="WORKER_READY",
token=token))
auth=auth, token=token))
} else {
msg = paste("wrong field names for DO_SETUP:",
setdiff(names(msg), need))
rzmq::send.socket(socket, data=list(id="WORKER_ERROR", msg=msg))
rzmq::send.socket(socket, data=list(id="WORKER_ERROR", auth=auth, msg=msg))
}
},
"DO_CHUNK" = {
if (!identical(token, msg$token)) {
msg = paste("mismatch chunk & common data", token, msg$token)
rzmq::send.socket(socket, send.more=TRUE,
data=list(id="WORKER_ERROR", msg=msg))
data=list(id="WORKER_ERROR", auth=auth, msg=msg))
message("WORKER_ERROR: ", msg)
break
}
Expand All @@ -88,21 +91,21 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {

if ("error" %in% class(result)) {
rzmq::send.socket(socket, send.more=TRUE,
data=list(id="WORKER_ERROR", msg=conditionMessage(result)))
data=list(id="WORKER_ERROR", auth=auth, msg=conditionMessage(result)))
message("WORKER_ERROR: ", conditionMessage(result))
break
} else {
message("completed ", sprintf(fmt, length(result$result),
delta[1], delta[2], delta[3]))
send_data = c(list(id="WORKER_READY", token=token), result)
send_data = c(list(id="WORKER_READY", auth=auth, token=token), result)
rzmq::send.socket(socket, send_data)
counter = counter + length(result$result)
}
},
"WORKER_WAIT" = {
message(sprintf("waiting %.2fs", msg$wait))
Sys.sleep(msg$wait)
rzmq::send.socket(socket, data=list(id="WORKER_READY", token=token))
rzmq::send.socket(socket, data=list(id="WORKER_READY", auth=auth, token=token))
},
"WORKER_STOP" = {
break
Expand All @@ -117,7 +120,8 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {
id = "WORKER_DONE",
time = run_time,
mem = sum(gc()[,6]),
calls = counter
calls = counter,
auth = auth
))

message("\nTotal: ", sprintf(fmt, counter, run_time[1], run_time[2], run_time[3]))
Expand Down
2 changes: 1 addition & 1 deletion inst/LSF.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
#BSUB-R rusage[mem={{ memory | 4096 }}]

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
2 changes: 1 addition & 1 deletion inst/PBS.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
#PBS -j oe

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
2 changes: 1 addition & 1 deletion inst/SGE.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
#$ -t 1-{{ n_jobs }}

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
2 changes: 1 addition & 1 deletion inst/SLURM.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
#SBATCH --array=1-{{ n_jobs }}

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
2 changes: 1 addition & 1 deletion inst/TORQUE.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
#PBS -j oe

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

0 comments on commit 016b0d3

Please sign in to comment.