diff --git a/src/reporting/get_metric_info/script.R b/src/reporting/get_metric_info/script.R index 0b3aff308..a13e55de7 100644 --- a/src/reporting/get_metric_info/script.R +++ b/src/reporting/get_metric_info/script.R @@ -32,7 +32,7 @@ outputs <- map(configs, function(config) { info$task_id <- gsub("/.*", "", config$namespace) info$id <- info$name info$name <- NULL - info$component_id <- config$name + info$component_name <- config$name info$namespace <- config$namespace info$commit_sha <- build_info$git_commit %||% "missing-sha" info$code_version <- "missing-version" @@ -47,6 +47,7 @@ outputs <- map(configs, function(config) { # construct v1 format out <- list( task_id = info$task_id, + component_name = info$component_name, metric_id = info$id, metric_name = info$label, metric_summary = info$summary, diff --git a/src/reporting/get_results/script.R b/src/reporting/get_results/script.R index 9f8459cff..6b4555665 100644 --- a/src/reporting/get_results/script.R +++ b/src/reporting/get_results/script.R @@ -9,16 +9,24 @@ library(purrr, warn.conflicts = FALSE) library(rlang, warn.conflicts = FALSE) ## VIASH START +# raw_dir <- "resources_test/openproblems/task_results_v3/raw" +# processed_dir <- "resources_test/openproblems/task_results_v3/processed" +# raw_dir <- "/home/rcannood/workspace/openproblems-bio/task_perturbation_prediction/resources/results/run_2024-10-31_06-14-14" +# processed_dir <- "/home/rcannood/workspace/openproblems-bio/website/results/perturbation_prediction/data" +raw_dir <- "/home/rcannood/workspace/openproblems-bio/task_batch_integration/resources/results/run_2024-11-20_12-47-03" +processed_dir <- "/home/rcannood/workspace/openproblems-bio/website/results/batch_integration/data" + par <- list( # inputs - input_scores = "resources_test/openproblems/task_results_v3/raw/score_uns.yaml", - input_execution = "resources_test/openproblems/task_results_v3/raw/trace.txt", - input_dataset_info = 
"resources_test/openproblems/task_results_v3/processed/dataset_info.json", - input_method_info = "resources_test/openproblems/task_results_v3/processed/method_info.json", - input_metric_info = "resources_test/openproblems/task_results_v3/processed/metric_info.json", + input_scores = paste0(raw_dir, "/score_uns.yaml"), + input_execution = paste0(raw_dir, "/trace.txt"), + input_dataset_info = paste0(processed_dir, "/dataset_info.json"), + input_method_info = paste0(processed_dir, "/method_info.json"), + input_method_configs = paste0(raw_dir, "/method_configs.yaml"), + input_metric_info = paste0(processed_dir, "/metric_info.json"), # outputs - output_results = "resources_test/openproblems/task_results_v3/processed/results.json", - output_metric_execution_info = "resources_test/openproblems/task_results_v3/processed/metric_execution_info.json" + output_results = paste0(processed_dir, "/results.json"), + output_metric_execution_info = paste0(processed_dir, "/metric_execution_info.json") ) ## VIASH END @@ -49,6 +57,8 @@ parse_size <- function(x) { out <- if (is.na(x) || x == "-") { NA_integer_ + } else if (grepl("TB", x)) { + as.numeric(gsub(" *TB", "", x)) * 1024 * 1024 } else if (grepl("GB", x)) { as.numeric(gsub(" *GB", "", x)) * 1024 } else if (grepl("MB", x)) { @@ -129,51 +139,63 @@ scores <- raw_scores %>% .groups = "drop" ) -# read nxf log and process the task id -norm_methods <- "/log_cp10k|/log_cpm|/sqrt_cp10k|/sqrt_cpm|/l1_sqrt|/log_scran_pooling" -id_regex <- paste0("^.*:(.*)_process \\(([^\\.]*)(", norm_methods, ")?(.[^\\.]*)?\\.(.*)\\)$") -trace <- readr::read_tsv(par$input_execution) %>% +# read execution info +# -> only keep the last execution of each process +input_execution <- readr::read_tsv(par$input_execution) |> + group_by(name) |> + mutate(num_runs = n()) |> + slice(which.max(submit)) |> + ungroup() + +method_lookup <- map_dfr(method_info$method_id, function(method_id) { + regex <- paste0("(.*:", method_id, ":[^ ]*)") + name <- + 
input_execution$name[grepl(regex, input_execution$name)] |> + unique() + name_ <- name[!grepl(":publishStatesProc", name)] + tibble(method_id = method_id, name = name_) +}) +dataset_lookup <- map_dfr(dataset_info$dataset_id, function(dataset_id) { + regex <- paste0(".*[(.](", dataset_id, ")[)./].*") + name <- + input_execution$name[grepl(regex, input_execution$name)] |> + unique() + tibble(dataset_id = dataset_id, name = name) +}) + +# parse values +execution_info_ind <- input_execution |> + left_join(method_lookup, by = "name") |> + left_join(dataset_lookup, by = "name") |> + filter(!is.na(method_id)) %>% + rowwise() |> mutate( - id = name, - process_id = stringr::str_extract(id, id_regex, 1L), - dataset_id = stringr::str_extract(id, id_regex, 2L), - normalization_id = gsub("^/", "", stringr::str_extract(id, id_regex, 3L)), - grp4 = gsub("^\\.", "", stringr::str_extract(id, id_regex, 4L)), - grp5 = stringr::str_extract(id, id_regex, 5L), + process_id = gsub(" .*", "", name), submit = strptime(submit, "%Y-%m-%d %H:%M:%S"), - ) %>% - # detect whether entry is a metric or a method - mutate( - method_id = ifelse(is.na(grp4), grp5, grp4), - metric_id = ifelse(is.na(grp4), grp4, grp5) - ) %>% - select(-grp4, -grp5) %>% - filter(!is.na(method_id)) %>% - # take last entry for each run - arrange(desc(submit)) %>% - group_by(name) %>% - slice(1) %>% + exit_code = parse_exit(exit), + duration_sec = parse_duration(realtime), + cpu_pct = parse_cpu(`%cpu`), + peak_memory_mb = parse_size(peak_vmem), + disk_read_mb = parse_size(rchar), + disk_write_mb = parse_size(wchar) + ) |> ungroup() -# parse values -execution_info <- trace %>% - filter(process_id == method_id) %>% # only keep method entries - rowwise() %>% - transmute( - dataset_id, - normalization_id, - method_id, +execution_info <- execution_info_ind |> + group_by(dataset_id, method_id) |> + summarise( resources = list(list( - exit_code = parse_exit(exit), - duration_sec = parse_duration(realtime), - cpu_pct = 
parse_cpu(`%cpu`), - peak_memory_mb = parse_size(peak_vmem), - disk_read_mb = parse_size(rchar), - disk_write_mb = parse_size(wchar) - )) - ) %>% - ungroup() + submit = min(submit), + exit_code = max(exit_code), + duration_sec = sum(duration_sec), + cpu_pct = sum(cpu_pct * duration_sec) / sum(duration_sec), + peak_memory_mb = max(peak_memory_mb), + disk_read_mb = sum(disk_read_mb), + disk_write_mb = sum(disk_write_mb) + )), + .groups = "drop" + ) # combine scores with execution info # fill up missing entries with NAs and 0s @@ -201,25 +223,68 @@ out <- full_join( # --- process metric execution info -------------------------------------------- cat("Processing metric execution info\n") -metric_execution_info <- trace %>% - filter(process_id == metric_id) %>% # only keep metric entries - rowwise() %>% - transmute( - dataset_id, - normalization_id, - method_id, - metric_id, - resources = list(list( - exit_code = parse_exit(exit), - duration_sec = parse_duration(realtime), - cpu_pct = parse_cpu(`%cpu`), - peak_memory_mb = parse_size(peak_vmem), - disk_read_mb = parse_size(rchar), - disk_write_mb = parse_size(wchar) - )) - ) %>% + +# manually add component id to metric info +metric_info$component_name <- metric_info$component_name %||% rep(NA_character_, nrow(metric_info)) %|% + gsub(".*/([^/]*)/config\\.vsh\\.yaml", "\\1", metric_info$implementation_url) + +metric_lookup2 <- pmap_dfr(metric_info, function(metric_id, component_name, ...) 
{ + regex <- paste0("(.*:", component_name, ":[^ ]*)") + name <- + input_execution$name[grepl(regex, input_execution$name)] |> + unique() + name_ <- name[!grepl(":publishStatesProc", name)] + tibble(metric_id = metric_id, component_name = component_name, name = name_) +}) +dataset_lookup2 <- map_dfr(dataset_info$dataset_id, function(dataset_id) { + regex <- paste0(".*[(.](", dataset_id, ")[)./].*") + name <- + input_execution$name[grepl(regex, input_execution$name)] |> + unique() + tibble(dataset_id = dataset_id, name = name) +}) +method_lookup2 <- map_dfr(method_info$method_id, function(method_id) { + regex <- paste0(".*[(.](", method_id, ")[)./].*") + name <- + input_execution$name[grepl(regex, input_execution$name)] |> + unique() + tibble(method_id = method_id, name = name) +}) + +metric_execution_info_ind <- input_execution |> + left_join(metric_lookup2, by = "name") |> + left_join(dataset_lookup2, by = "name") |> + left_join(method_lookup2, by = "name") |> + filter(!is.na(metric_id)) %>% + rowwise() |> + mutate( + process_id = gsub(" .*", "", name), + submit = strptime(submit, "%Y-%m-%d %H:%M:%S"), + exit_code = parse_exit(exit), + duration_sec = parse_duration(realtime), + cpu_pct = parse_cpu(`%cpu`), + peak_memory_mb = parse_size(peak_vmem), + disk_read_mb = parse_size(rchar), + disk_write_mb = parse_size(wchar) + ) |> ungroup() +metric_execution_info <- metric_execution_info_ind |> + group_by(dataset_id, method_id, metric_component_name = component_name) |> + summarise( + resources = list(list( + submit = min(submit), + exit_code = max(exit_code), + duration_sec = sum(duration_sec), + cpu_pct = sum(cpu_pct * duration_sec) / sum(duration_sec), + peak_memory_mb = max(peak_memory_mb), + disk_read_mb = sum(disk_read_mb), + disk_write_mb = sum(disk_write_mb) + )), + .groups = "drop" + ) + + # --- write output files ------------------------------------------------------- cat("Writing output files\n") # write output files diff --git 
a/src/reporting/process_task_results/run_test.sh b/src/reporting/process_task_results/run_test.sh index c46b3627b..63ea27027 100755 --- a/src/reporting/process_task_results/run_test.sh +++ b/src/reporting/process_task_results/run_test.sh @@ -21,7 +21,7 @@ for TASK in "task_perturbation_prediction"; do # # temp sync # aws s3 sync $INPUT_DIR output/temp - echo "Processing $TASK - $DATE" + echo "Processing $TASK - $DATE -> $OUTPUT_DIR" # start the run NXF_VER=23.10.0 nextflow run . \ @@ -40,5 +40,5 @@ for TASK in "task_perturbation_prediction"; do --publish_dir "$OUTPUT_DIR" # cause quarto rerender to index page when in preview mode - touch ../website/results/$TASK/index.qmd + touch ../website/results/${TASK#task_}/index.qmd done \ No newline at end of file