-
Notifications
You must be signed in to change notification settings - Fork 0
/
helpers.R
78 lines (76 loc) · 4.12 KB
/
helpers.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# return a list(2) of BiocParallel::BiocParallelParam for nested parallelization
# [[1]] is used to parallelize across files
# [[2]] is used to parallelize across threads (accessing a single file) -> has to be a MulticoreParam or SerialParam object
# for downwards compatibility, a parallel::cluster object can be passed that will be used to create the BiocParallel parameter objects
getListOfBiocParallelParam <- function(clObj = NULL) {
if (is.null(clObj)) { # no 'clObj' argument
bppl <- list(BiocParallel::SerialParam(), BiocParallel::SerialParam())
} else { # have 'clObj' argument
if (inherits(clObj, "SOCKcluster")) {
# get node names
tryCatch({
nodeNames <- unlist(parallel::clusterEvalQ(clObj, Sys.info()['nodename']))
}, error = function(ex) {
message("FAILED")
stop("The cluster object does not work properly on this system. Please consult the manual of the package 'parallel'\n", call. = FALSE)
})
coresPerNode <- table(nodeNames)
# subset cluster object (represent each node just a single time)
clObjSub <- clObj[!duplicated(nodeNames)]
bppl <- if (min(coresPerNode) == 1)
list(methods::as(clObj, "SnowParam"), BiocParallel::SerialParam())
else
list(methods::as(clObjSub, "SnowParam"),
BiocParallel::MulticoreParam(workers = min(coresPerNode)))
} else if (is.list(clObj) &&
all(vapply(clObj, FUN = inherits,
FUN.VALUE = TRUE, "BiocParallelParam"))) {
bppl <- clObj
}
}
if (length(bppl) == 1)
bppl[[2]] <- BiocParallel::SerialParam()
if (!inherits(bppl[[2]], c("MulticoreParam", "SerialParam")))
stop('Error configuring the parallel backend. The second registered backend (registered()[[2]] or clObj[[2]]) has to be of class "MulticoreParam" or "SerialParam"')
return(bppl[seq_len(2)])
}
# variant that uses a separate logic on windows (where MulticoreParam is not supported)
getListOfBiocParallelParam2 <- function(clObj = NULL) {
if (is.null(clObj)) { # no 'clObj' argument
bppl <- list(BiocParallel::SerialParam(), BiocParallel::SerialParam())
} else { # have 'clObj' argument
if (inherits(clObj, "SOCKcluster")) {
if (identical(.Platform$OS.type, "windows")) {
# MulticoreParam is not supported on windows -> keep the current clObj
bppl <- list(methods::as(clObj, "SnowParam"),
BiocParallel::SerialParam())
} else {
# non-windows platforms:
# get node names
tryCatch({
nodeNames <- unlist(parallel::clusterEvalQ(clObj, Sys.info()['nodename']))
}, error = function(ex) {
message("FAILED")
stop("The cluster object does not work properly on this system. Please consult the manual of the package 'parallel'\n", call. = FALSE)
})
coresPerNode <- table(nodeNames)
# subset cluster object (represent each node just a single time)
clObjSub <- clObj[!duplicated(nodeNames)]
bppl <- if (min(coresPerNode) == 1)
list(methods::as(clObj, "SnowParam"), BiocParallel::SerialParam())
else
list(methods::as(clObjSub, "SnowParam"),
BiocParallel::MulticoreParam(workers = min(coresPerNode)))
}
} else if (is.list(clObj) &&
all(vapply(clObj, FUN = inherits,
FUN.VALUE = TRUE, "BiocParallelParam"))) {
bppl <- clObj
}
}
if (length(bppl) == 1)
bppl[[2]] <- BiocParallel::SerialParam()
if (!inherits(bppl[[2]], c("MulticoreParam", "SerialParam")))
stop('Error configuring the parallel backend. The second registered backend (registered()[[2]] or clObj[[2]]) has to be of class "MulticoreParam" or "SerialParam"')
return(bppl[seq_len(2)])
}