-
Notifications
You must be signed in to change notification settings - Fork 0
/
ChAMP_DisributedScript.R
109 lines (81 loc) · 2.79 KB
/
ChAMP_DisributedScript.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Load the helper functions used by this script from utils/utils.R. The path
# is relative, so it is resolved against the current working directory.
source("utils/utils.R")
# Fix the RNG seed for the session so random operations are reproducible.
# NOTE(review): presumably this is what makes the sample shuffling in
# run_distributed_champ() repeatable -- confirm shuffle() uses R's RNG.
set.seed(101)
#' Run the ChAMP workflow over a sample sheet in fixed-size chunks
#'
#' Shuffles the sample identifiers of the sample sheet, splits them into
#' chunks of at most `chunk_size` samples and calls `run_champ()` once per
#' chunk, using a temporary sample sheet written into `path_idats`. Every
#' chunk result is stored as `<i>_temp_chunk.parquet` inside `output`.
#' Afterwards the chunks are reloaded, reduced to the CpGs returned by
#' `overlap_cpgs()`, concatenated and written to `myNorm.csv` inside `output`.
#'
#' @param path_ss Path to the sample sheet CSV. Its first column is used as
#'   row names, i.e. as the sample identifiers.
#' @param path_idats Directory holding the IDAT files. A temporary file
#'   `temp_sample_sheet.csv` is written here for each chunk.
#' @param output Directory receiving the QC/Norm sub-directories, the
#'   temporary chunk files and the final `myNorm.csv`.
#' @param method,array_type,force,norm_type,cores Forwarded unchanged to
#'   `run_champ()`.
#' @param chunk_size Maximum number of samples processed per `run_champ()`
#'   call.
#' @return Called for its side effects; returns the (invisible) result of the
#'   final `message()` call.
run_distributed_champ <-
  function(path_ss,
           path_idats,
           output,
           method = "champ",
           array_type = "EPIC",
           force = TRUE,
           norm_type = "BMIQ",
           cores = 1,
           chunk_size = 50) {
    # Input validation is delegated to helpers from utils/utils.R.
    check_if_file_exists(path_idats)
    check_if_file_exists(path_ss)
    n_csv_files <- count_files(path_idats, "SampleSheet", ".csv")
    # NOTE(review): a temporary sample sheet is written into path_idats below,
    # so even a single pre-existing CSV might already be a conflict -- confirm
    # whether this threshold should be `> 0`.
    if (n_csv_files > 1) {
      stop("Please remove SampleSheet from idat directory!")
    }
    count_files(path_idats, "IDATs", ".idat")
    check_if_file_exists(output)

    sample_sheet <- read.csv(path_ss, row.names = 1)
    n_rows <- nrow(sample_sheet)
    if (n_rows == 0) {
      stop("Sample sheet contains no samples: ", path_ss, call. = FALSE)
    }
    check_if_necessary(chunk_size, n_rows)
    randomized_samples <- shuffle(rownames(sample_sheet))

    # Hoisted: the number of chunks does not change inside the loop.
    n_chunks <- ceiling(n_rows / chunk_size)
    temp_ss_path <- file.path(path_idats, "temp_sample_sheet.csv")

    # seq_len() yields an empty sequence for zero chunks, unlike 1:0.
    for (i in seq_len(n_chunks)) {
      # Create QC path
      QC_path <- create_dir(output, "QC", i)
      # norm path
      norm_path <- create_dir(output, "Norm", i)
      cat(
        "Run",
        i,
        "/",
        n_chunks,
        "Each up to: ",
        chunk_size,
        "samples.",
        "\n"
      )

      # Index range of this chunk; the last chunk may be shorter.
      idx_start <- chunk_size * (i - 1) + 1
      idx_end <- min(chunk_size * i, n_rows)
      temp_samples <- randomized_samples[idx_start:idx_end]

      # drop = FALSE keeps a data.frame even if the sheet has a single column.
      temp_sample_sheet <- sample_sheet[temp_samples, , drop = FALSE]
      temp_sample_sheet[["Sample_Name"]] <- rownames(temp_sample_sheet)
      # write.csv() always uses "," and warns when `sep` is supplied.
      write.csv(temp_sample_sheet, temp_ss_path)

      # `finally` removes the temporary sample sheet even when run_champ()
      # fails, so a failed run does not leave a stray CSV in the IDAT folder.
      mynorm_temp <- tryCatch(
        run_champ(
          path_idats = path_idats,
          QC_path = QC_path,
          Norm_path = norm_path,
          method = method,
          array_type = array_type,
          force = force,
          norm_type = norm_type,
          cores = cores
        ),
        finally = delete_temp_sample_sheet(path_idats)
      )

      chunk_path <- file.path(output, paste0(i, "_temp_chunk", ".parquet"))
      # Store the row names as a regular column before writing the chunk.
      mynorm_temp$CpG <- rownames(mynorm_temp)
      write.parquet(mynorm_temp, chunk_path)
      message("Saved chunk no. ", i, "\n")
    }

    message("Loading chunks ...", "\n")
    mynorms <- load_chunks(output, "_temp_chunk.parquet")
    message("Looking for CpGs common across batches ...", "\n")
    common_cpg <- overlap_cpgs(mynorms)
    message("Concating batches into myNorm ...", "\n")
    myNorm <- concate_mynorms(mynorms, common_cpg)

    # file.path() inserts the separator; plain concatenation would write
    # "<output>myNorm.csv" next to the output directory instead of inside it.
    mynorm_path <- file.path(output, "myNorm.csv")
    cat("Saving myNorm ...", "\n")
    write.csv(myNorm, mynorm_path)

    cat("Removing temporary files ...", "\n")
    # Chunks are written as "*_temp_chunk.parquet", so clean up with the same
    # suffix that load_chunks() uses above.
    # NOTE(review): assumes delete_temp_files() matches names the same way as
    # load_chunks() -- confirm in utils/utils.R.
    delete_temp_files(output, "_temp_chunk.parquet")
    message("DONE!")
  }