Skip to content

Commit

Permalink
enable basic socket authentication
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Feb 21, 2019
1 parent 3e382cc commit 552f68d
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 15 deletions.
15 changes: 14 additions & 1 deletion R/qsys.r
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,17 @@ QSys = R6::R6Class("QSys",

if (rcv[[1]]$read) { # otherwise timeout reached
msg = rzmq::receive.socket(private$socket)

if (private$auth != "" && (is.null(msg$auth) || msg$auth != private$auth))
stop("Authentication provided by worker does not match")

switch(msg$id,
"WORKER_UP" = {
if (!is.null(private$pkg_warn) && msg$pkgver != private$pkg_warn) {
warning("\nVersion mismatch: master has ", private$pkg_warn,
", worker ", msg$pkgver, immediate.=TRUE)
private$pkg_warn = NULL
}
private$pkg_warn = NULL
msg$id = "WORKER_READY"
msg$token = "not set"
private$workers_up = private$workers_up + 1
Expand Down Expand Up @@ -194,6 +198,7 @@ QSys = R6::R6Class("QSys",
defaults = list(),
is_cleaned_up = FALSE,
pkg_warn = utils::packageVersion("clustermq"),
auth = "",

send = function(..., serialize=TRUE) {
rzmq::send.socket(socket = private$socket,
Expand All @@ -211,13 +216,21 @@ QSys = R6::R6Class("QSys",
fill_options = function(...) {
values = utils::modifyList(private$defaults, list(...))
values$master = private$master
values$auth = private$auth = paste(sample(letters, 5, TRUE), collapse="")
if (!"job_name" %in% names(values))
values$job_name = paste0("cmq", private$port)
private$workers_total = values$n_jobs
values
},

fill_template = function(values) {
# note: auth will be obligatory in the future and this check will
# be removed (i.e., filling will fail if no field in template)
if (! "auth" %in% names(infuser::variables_requested(private$template))) {
values$auth = NULL
warning("Add 'CMQ_AUTH={{ auth }}' to template to enable socket authentication",
immediate.=TRUE)
}
infuser::infuse(private$template, values)
},

Expand Down
21 changes: 12 additions & 9 deletions R/worker.r
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {
if (!verbose)
message = function(...) invisible(NULL)

#TODO: replace this by proper authentication
auth = Sys.getenv("CMQ_AUTH")

message("Master: ", master)
if (length(list(...)) > 0)
warning("Arguments ignored: ", paste(names(list(...)), collapse=", "))
Expand All @@ -22,7 +25,7 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {

# send the master a ready signal
rzmq::connect.socket(socket, master)
rzmq::send.socket(socket, data=list(id="WORKER_UP",
rzmq::send.socket(socket, data=list(id="WORKER_UP", auth=auth,
pkgver=utils::packageVersion("clustermq")))
message("WORKER_UP to: ", master)

Expand All @@ -46,14 +49,14 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {
"DO_CALL" = {
result = try(eval(msg$expr, envir=msg$env))
message("eval'd: ", msg$expr)
rzmq::send.socket(socket, data=list(id="WORKER_READY",
rzmq::send.socket(socket, data=list(id="WORKER_READY", auth=auth,
token=token, ref=msg$ref, result=result))
},
"DO_SETUP" = {
if (!is.null(msg$redirect)) {
data_socket = rzmq::init.socket(zmq_context, "ZMQ_REQ")
rzmq::connect.socket(data_socket, msg$redirect)
rzmq::send.socket(data_socket, data=list(id="WORKER_READY"))
rzmq::send.socket(data_socket, data=list(id="WORKER_READY", auth=auth))
message("WORKER_READY to redirect: ", msg$redirect)
msg = rzmq::receive.socket(data_socket)
}
Expand All @@ -64,18 +67,18 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {
token = msg$token
message("token from msg: ", token)
rzmq::send.socket(socket, data=list(id="WORKER_READY",
token=token))
auth=auth, token=token))
} else {
msg = paste("wrong field names for DO_SETUP:",
setdiff(names(msg), need))
rzmq::send.socket(socket, data=list(id="WORKER_ERROR", msg=msg))
rzmq::send.socket(socket, data=list(id="WORKER_ERROR", auth=auth, msg=msg))
}
},
"DO_CHUNK" = {
if (!identical(token, msg$token)) {
msg = paste("mismatch chunk & common data", token, msg$token)
rzmq::send.socket(socket, send.more=TRUE,
data=list(id="WORKER_ERROR", msg=msg))
data=list(id="WORKER_ERROR", auth=auth, msg=msg))
message("WORKER_ERROR: ", msg)
break
}
Expand All @@ -88,21 +91,21 @@ worker = function(master, timeout=600, ..., verbose=TRUE) {

if ("error" %in% class(result)) {
rzmq::send.socket(socket, send.more=TRUE,
data=list(id="WORKER_ERROR", msg=conditionMessage(result)))
data=list(id="WORKER_ERROR", auth=auth, msg=conditionMessage(result)))
message("WORKER_ERROR: ", conditionMessage(result))
break
} else {
message("completed ", sprintf(fmt, length(result$result),
delta[1], delta[2], delta[3]))
send_data = c(list(id="WORKER_READY", token=token), result)
send_data = c(list(id="WORKER_READY", auth=auth, token=token), result)
rzmq::send.socket(socket, send_data)
counter = counter + length(result$result)
}
},
"WORKER_WAIT" = {
message(sprintf("waiting %.2fs", msg$wait))
Sys.sleep(msg$wait)
rzmq::send.socket(socket, data=list(id="WORKER_READY", token=token))
rzmq::send.socket(socket, data=list(id="WORKER_READY", auth=auth, token=token))
},
"WORKER_STOP" = {
break
Expand Down
2 changes: 1 addition & 1 deletion inst/LSF.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
#BSUB-R rusage[mem={{ memory | 4096 }}]

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
2 changes: 1 addition & 1 deletion inst/PBS.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
#PBS -j oe

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
2 changes: 1 addition & 1 deletion inst/SGE.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
#$ -t 1-{{ n_jobs }}

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
2 changes: 1 addition & 1 deletion inst/SLURM.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
#SBATCH --array=1-{{ n_jobs }}

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
2 changes: 1 addition & 1 deletion inst/TORQUE.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
#PBS -j oe

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

0 comments on commit 552f68d

Please sign in to comment.