From 99e8860f12a9e7132160b9ccd4a0f196e36f715d Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Wed, 12 Jul 2023 11:56:40 -0700 Subject: [PATCH 1/8] feat: initial epix_rbind and test --- R/methods-epi_archive.R | 233 +++++++++++++++++++++++++++++-- tests/testthat/test-epix_rbind.R | 83 +++++++++++ 2 files changed, 303 insertions(+), 13 deletions(-) create mode 100644 tests/testthat/test-epix_rbind.R diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index c110555c6..c2bbcdd1f 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -123,19 +123,11 @@ epix_fill_through_version = function(x, fill_versions_end, #' @param x,y Two `epi_archive` objects to join together. #' @param sync Optional; `"forbid"`, `"na"`, `"locf"`, or `"truncate"`; in the #' case that `x$versions_end` doesn't match `y$versions_end`, what do we do?: -#' `"forbid"`: emit an error; "na": use `max(x$versions_end, y$versions_end)` -#' as the result's `versions_end`, but ensure that, if we request a snapshot -#' as of a version after `min(x$versions_end, y$versions_end)`, the -#' observation columns from the less up-to-date archive will be all NAs (i.e., -#' imagine there was an update immediately after its `versions_end` which -#' revised all observations to be `NA`); `"locf"`: use `max(x$versions_end, -#' y$versions_end)` as the result's `versions_end`, allowing the last version -#' of each observation to be carried forward to extrapolate unavailable -#' versions for the less up-to-date input archive (i.e., imagining that in the -#' less up-to-date archive's data set remained unchanged between its actual -#' `versions_end` and the other archive's `versions_end`); or `"truncate"`: -#' use `min(x$versions_end, y$versions_end)` as the result's `versions_end`, -#' and discard any rows containing update rows for later versions. +#' +#' - `"forbid"`: emit an error; +#' - "na": use `max(x$versions_end, y$versions_end)` as the result's `versions_end`, but ensure that, if we request a snapshot as of a version after `min(x$versions_end, y$versions_end)`, the observation columns from the less up-to-date archive will be all NAs (i.e., imagine there was an update immediately after its `versions_end` which revised all observations to be `NA`); +#' - `"locf"`: use `max(x$versions_end, y$versions_end)` as the result's `versions_end`, allowing the last version of each observation to be carried forward to extrapolate unavailable versions for the less up-to-date input archive (i.e., imagining that in the less up-to-date archive's data set remained unchanged between its actual `versions_end` and the other archive's `versions_end`); or +#' - `"truncate"`: use `min(x$versions_end, y$versions_end)` as the result's `versions_end`, and discard any rows containing update rows for later versions. #' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be #' compactified? See [`as_epi_archive`] for an explanation of what this means. #' Default here is `TRUE`. @@ -360,6 +352,221 @@ epix_merge = function(x, y, )) } +#' combine epi_archives by rows +#' +#' Take a sequence of archives and combine by rows. Complications arise if +#' there are `time_value`s shared between the lists. `sync` determines how +#' any later `NA`'s are treated, with the default `"forbid"` throwing an error, +#' `"na"` treating them as intentional data (no modification), and `"locf"` +#' filling forward across versions. +#' Shared keys are another problem; by default, `force_distinct=FALSE`, meaning +#' the entry in the earlier archive overwrites later archives. Otherwise there +#' is an error on shared keys +#' this function is still under active development, so there may be remaining +#' edge cases +#' +#' @param ... list of `epi_archive` objects to append in order. +#' @param sync Optional; `"forbid"`, `"na"`, or `"locf"`; in the case that later +#' versions contain `NA`'s, what do we do? +#' \itemize{ +#' \item `"forbid"`: emit an error if there are any shared time values between +#' different archives; +#' \item `"na"`: All `NA` values are treated as actual data, and thus are +#' maintained (up to archival compression). +#' \item `"locf"`: for every shared time value, use earlier versions of +#' earlier archives to overwrite any `NA`'s found in later +#' versions of later archives. +#' } +#' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be +#' compactified? See [`as_epi_archive`] for an explanation of what this means. +#' Default here is `TRUE`. +#' @return the resulting `epi_archive` +#' +#' @details In all cases, `additional_metadata` will be an empty list, and +#' `clobberable_versions_start` will be set to the latest version that could +#' be clobbered in either input archive. +#' +#' @examples +#' # create two example epi_archive datasets where using rbind alone would +#' # work incorrectly +#' x1 <- archive_cases_dv_subset$DT %>% +#' dplyr::select(geo_value, time_value, version, percent_cli) %>% +#' filter(time_value < "2021-06-01") %>% +#' as_epi_archive(compactify = TRUE) +#' x2 <- archive_cases_dv_subset$DT %>% +#' dplyr::select(geo_value, time_value, version, percent_cli) %>% +#' filter(time_value >= "2021-06-01") %>% +#' as_epi_archive(compactify = TRUE) +#' y1 <- archive_cases_dv_subset$DT %>% +#' dplyr::select(geo_value, time_value, version, percent_cli) %>% +#' filter(time_value < "2021-06-01") %>% +#' as_epi_archive(compactify = TRUE) +#' y2 <- archive_cases_dv_subset$DT %>% +#' dplyr::select(geo_value, time_value, version, percent_cli) %>% +#' filter(time_value >= "2021-06-01") %>% +#' as_epi_archive(compactify = TRUE) +#' # the problematic examples +#' first_merge <- epix_merge(x1, y1) +#' second_merge <- epix_merge(x2, y2) +#' # rebind the results together +#' epix_rbind(first_merge, second_merge) +#' +#' @importFrom data.table key set setkeyv +#' @export +epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = FALSE, compactify = TRUE) { + # things not currently supported that may be warranted: + # 1. extra keys beyond the default ones + # 2. treating different columns differently (leaving some na's, locfing others) + # 3. there are various parameters for rbind that we haven't defined here; some of them may actually be applicable + archives <- list(...) + if (any(map_vec(archives, function(x) { + !inherits(x, "epi_archive") + }))) { + abort("all must be of class `epi_archive`.") + } + + sync <- rlang::arg_match(sync) + + geo_type <- archives[[1]]$geo_type + if (any(map_vec(archives, function(x) { + !identical(x$geo_type, geo_type) + }))) { + abort("all must have the same `$geo_type`") + } + + time_type <- archives[[1]]$time_type + if (any(map_vec(archives, function(x) { + !identical(x$time_type, time_type) + }))) { + abort("all must have the same `$time_type`") + } + + for (x in archives) { + if (length(x$additional_metadata) != 0L) { + Warn("x$additional_metadata won't appear in merge result", + class = "epiprocess__epix_rbind_ignores_additional_metadata" + ) + } + } + result_additional_metadata <- list() + + clobberable_versions_start <- map_vec(archives, function(x) { + (x$clobberable_versions_start) + }) + + versions_end <- max((map_vec(archives, "versions_end"))) + + result_clobberable_versions_start <- if (all(is.na(clobberable_versions_start))) { + NA # (any type of NA is fine here) + } else { + max(clobberable_versions_start) # unlike the case of merging, we want the later date + } + # plans: + # 1. isolate the shared from the non-shared + # 2. throw everything together, do a group_by and ffill + + DTs <- map(archives, "DT") + + # check the keys are correct as done in epix_merge + keys <- map(DTs, key) + new_key <- keys[[1]] + for (ii in seq_along(DTs)) { + if (!identical(keys[[ii]], key(archives[[ii]]$DT))) { + warn(" + `epiprocess` internal warning (please report): pre-processing for + epix_merge unexpectedly resulted in an intermediate data table (or + tables) with a different key than the corresponding input archive. + Manually setting intermediate data table keys to the expected values. + ", internal = TRUE) + setkeyv(DTs[[ii]], key(archives[[ii]]$DT)) + keys[[ii]] <- key(archives[[ii]]$DT) + } + if (!identical(keys[[ii]], new_key)) { + # Without some sort of annotations of what various columns represent, we can't + # do something that makes sense when rbinding archives with mismatched keys. + # E.g., even if we assume extra keys represent demographic breakdowns, a + # sensible default treatment of count-type and rate-type value columns would + # differ. + if (!identical(sort(key(DTs[[ii]])), sort(key(DTs[[1]])))) { + abort(" + The archives must have the same set of key column names; if the + key columns represent the same things, just with different + names, please retry after manually renaming to match; if they + represent different things (e.g., x has an age breakdown + but y does not), please retry after processing them to share + the same key (e.g., by summarizing x to remove the age breakdown, + or by applying a static age breakdown to y). + ", class = "epiprocess__epix_rbind_input_must_have_same_key_set") + } + } + } + + non_by_colnames <- reduce(map(DTs, function(DT) setdiff(names(DT), new_key)), union) + # find the shared (geo_values, time_values) which requires: + # first define a function to get the unique pairs in a given archive + unique_geo_times <- function(x) { + x %>% + select(geo_value, time_value) %>% + distinct() + } + + other_keys <- setdiff(new_key, c("geo_value", "time_value", "version")) + if (length(other_keys) != 0) { + abort("epix_rbind does not currently support additional keys", + class = "epiprocess__epxi_rbind_unsupported" + ) + } + + shared_geo_time_values <- reduce(map(DTs, unique_geo_times), intersect) + # there are no difference between the methods if there's no overlap + if (nrow(shared_geo_time_values) == 0) { + DT <- reduce(DTs, rbind) + if (force_distinct) { + DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE) + } + } else if (sync == "forbid") { + abort(paste( + "There are shared time values with different versions;", + "either deal with those separately, or specify how to", + "handle `NA` values (either `NA` or `locf`)." + ), "epiprocess__epix_rbind_unresolved_sync") + } else if (sync == "na") { + # doesn't really care if there are repeated time_values, simply: + # binds the results together + possibly_redundant <- reduce(DTs, rbind) + # remove any redundant keys + if (force_distinct) { + DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE) + } + # and return an archive (which sorts) + } else if (sync == "locf") { + # filter, creating shared and non shared or + # just do forward fill on all times + DT <- reduce(DTs, rbind) + + if (force_distinct) { + DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE) + } + DT <- DT %>% + group_by(geo_value, time_value) %>% + arrange(geo_value, time_value, version) %>% + fill(!!!non_by_colnames, .direction = "downup") %>% # everything not in the keys + ungroup() + } + + return(as_epi_archive(DT[], + geo_type = geo_type, + time_type = time_type, + other_keys = other_keys, + additional_metadata = list(), + compactify = compactify, + clobberable_versions_start = result_clobberable_versions_start, + versions_end = versions_end + )) +} + + + # Helpers for `group_by`: #' Make non-testing mock to get [`dplyr::dplyr_col_modify`] input diff --git a/tests/testthat/test-epix_rbind.R b/tests/testthat/test-epix_rbind.R new file mode 100644 index 000000000..1a745e902 --- /dev/null +++ b/tests/testthat/test-epix_rbind.R @@ -0,0 +1,83 @@ +test_that("epix_rbind merges and carries forward updates properly", { + x1 <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~x_value, + # X has initial versions defined + "g1", 1L, 1:3, paste0("XA", 1:3), + ) %>% + tidyr::unchop(c(version, x_value)) %>% + dplyr::mutate(dplyr::across(c(x_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + x2 <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~x_value, + # X has later versions defined as well + "g1", 1L, 4:6, paste0("XA", 4:6), + "g1", 2L, 4:6, paste0("XB", 4:6), + ) %>% + tidyr::unchop(c(version, x_value)) %>% + dplyr::mutate(dplyr::across(c(x_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + y1 <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~y_value, + # y also has earlier version defined + "g1", 1L, 1:3, paste0("YA", 1:3), + ) %>% + tidyr::unchop(c(version, y_value)) %>% + dplyr::mutate(dplyr::across(c(y_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + y2 <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~y_value, + # but y's data is "correct", and gets no more updates + "g1", 2L, 4:6, paste0("YB", 4:6), + ) %>% + tidyr::unchop(c(version, y_value)) %>% + dplyr::mutate(dplyr::across(c(y_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + first_merge <- epix_merge(x1, y1) + second_merge <- epix_merge(x2, y2) + # We rely on testthat edition 3 expect_identical using waldo, not identical. See + # test-epix_fill_through_version.R comments for details. + testthat::local_edition(3) + # throw an error without a setting chosen when there's conflicts + expect_error(epix_rbind(first_merge, second_merge), class = "epiprocess__epix_rbind_unresolved_sync") + + # the sync = "na" case + canonical_no_overwrite <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~x_value, ~y_value, + # X has initial versions defined + "g1", 1L, 1:3, paste0("XA", 1:3), paste0("YA", 1:3), + "g1", 1L, 4:6, paste0("XA", 4:6), NA, + "g1", 2L, 4:6, paste0("XB", 4:6), paste0("YB", 4:6), + ) %>% + tidyr::unchop(c(version, x_value, y_value)) %>% + dplyr::mutate(dplyr::across(c(x_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + # produce exactly the above result when NA's are unmodified + rbinded_no_overwrite <- epix_rbind(first_merge, second_merge, sync = "na") + expect_identical(rbinded_no_overwrite, canonical_no_overwrite) + + # filling forward is equivalent to doing columns rbinds first, and then merging + x <- as_epi_archive(rbind(x1$DT, x2$DT)) + y <- as_epi_archive(rbind(y1$DT, y2$DT)) + canonical_locf <- epix_merge(x, y, sync = "locf") + rbinded_locf <- epix_rbind(second_merge, first_merge, sync = "locf") + expect_identical(canonical_locf$DT, rbinded_locf$DT) + # y should be the same as epix_rbind(y1,y2), since each has distinct dates + expect_identical(epix_rbind(y1,y2, sync = "forbid"), y) + expect_identical(epix_rbind(y1,y2, sync = "na"), y) + expect_identical(epix_rbind(y1,y2, sync = "locf"), y) +}) From ad9105b019cfd6485ed5760eb709a12e4d619789 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Wed, 12 Jul 2023 13:06:11 -0700 Subject: [PATCH 2/8] feat: package version passes tests, doc fixes --- NAMESPACE | 1 + R/methods-epi_archive.R | 25 +++++++------ man/epix_merge.Rd | 18 ++++------ man/epix_rbind.Rd | 78 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 97 insertions(+), 25 deletions(-) create mode 100644 man/epix_rbind.Rd diff --git a/NAMESPACE b/NAMESPACE index 1d8affefb..ce712a74c 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -41,6 +41,7 @@ export(epi_cor) export(epi_slide) export(epix_as_of) export(epix_merge) +export(epix_rbind) export(epix_slide) export(epix_truncate_versions_after) export(filter) diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index c2bbcdd1f..4e94bdec7 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -368,15 +368,14 @@ epix_merge = function(x, y, #' @param ... list of `epi_archive` objects to append in order. #' @param sync Optional; `"forbid"`, `"na"`, or `"locf"`; in the case that later #' versions contain `NA`'s, what do we do? -#' \itemize{ -#' \item `"forbid"`: emit an error if there are any shared time values between +#' - `"forbid"`: emit an error if there are any shared time values between #' different archives; -#' \item `"na"`: All `NA` values are treated as actual data, and thus are +#' - `"na"`: All `NA` values are treated as actual data, and thus are #' maintained (up to archival compression). -#' \item `"locf"`: for every shared time value, use earlier versions of +#' - `"locf"`: for every shared time value, use earlier versions of #' earlier archives to overwrite any `NA`'s found in later #' versions of later archives. -#' } +#' #' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be #' compactified? See [`as_epi_archive`] for an explanation of what this means. #' Default here is `TRUE`. @@ -501,7 +500,7 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F } } - non_by_colnames <- reduce(map(DTs, function(DT) setdiff(names(DT), new_key)), union) + non_by_colnames <- reduce(map(DTs, function(DT) setdiff(names(DT), new_key)), dplyr::union) # find the shared (geo_values, time_values) which requires: # first define a function to get the unique pairs in a given archive unique_geo_times <- function(x) { @@ -510,14 +509,14 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F distinct() } - other_keys <- setdiff(new_key, c("geo_value", "time_value", "version")) + other_keys <- dplyr::setdiff(new_key, c("geo_value", "time_value", "version")) if (length(other_keys) != 0) { - abort("epix_rbind does not currently support additional keys", + Abort("epix_rbind does not currently support additional keys", class = "epiprocess__epxi_rbind_unsupported" ) } - shared_geo_time_values <- reduce(map(DTs, unique_geo_times), intersect) + shared_geo_time_values <- reduce(map(DTs, unique_geo_times), dplyr::intersect) # there are no difference between the methods if there's no overlap if (nrow(shared_geo_time_values) == 0) { DT <- reduce(DTs, rbind) @@ -525,7 +524,7 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE) } } else if (sync == "forbid") { - abort(paste( + Abort(paste( "There are shared time values with different versions;", "either deal with those separately, or specify how to", "handle `NA` values (either `NA` or `locf`)." @@ -533,10 +532,10 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F } else if (sync == "na") { # doesn't really care if there are repeated time_values, simply: # binds the results together - possibly_redundant <- reduce(DTs, rbind) + DT <- reduce(DTs, rbind) # remove any redundant keys if (force_distinct) { - DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE) + DT <- distinct(DT, geo_value, time_value, version, .keep_all = TRUE) } # and return an archive (which sorts) } else if (sync == "locf") { @@ -677,7 +676,7 @@ epix_detailed_restricted_mutate = function(.data, ...) { !rlang::is_reference(in_tbl[[key_colname]], col_modify_cols[[key_colname]]) })) if (length(invalidated_key_col_is) != 0L) { - rlang::abort(paste_lines(c( + rlang::Abort(paste_lines(c( "Key columns must not be replaced or removed.", wrap_varnames(key(.data$DT)[invalidated_key_col_is], initial="Flagged key cols: ") diff --git a/man/epix_merge.Rd b/man/epix_merge.Rd index 09f67fa2b..566aed098 100644 --- a/man/epix_merge.Rd +++ b/man/epix_merge.Rd @@ -16,18 +16,12 @@ epix_merge( \item{sync}{Optional; \code{"forbid"}, \code{"na"}, \code{"locf"}, or \code{"truncate"}; in the case that \code{x$versions_end} doesn't match \code{y$versions_end}, what do we do?: -\code{"forbid"}: emit an error; "na": use \code{max(x$versions_end, y$versions_end)} -as the result's \code{versions_end}, but ensure that, if we request a snapshot -as of a version after \code{min(x$versions_end, y$versions_end)}, the -observation columns from the less up-to-date archive will be all NAs (i.e., -imagine there was an update immediately after its \code{versions_end} which -revised all observations to be \code{NA}); \code{"locf"}: use \code{max(x$versions_end, y$versions_end)} as the result's \code{versions_end}, allowing the last version -of each observation to be carried forward to extrapolate unavailable -versions for the less up-to-date input archive (i.e., imagining that in the -less up-to-date archive's data set remained unchanged between its actual -\code{versions_end} and the other archive's \code{versions_end}); or \code{"truncate"}: -use \code{min(x$versions_end, y$versions_end)} as the result's \code{versions_end}, -and discard any rows containing update rows for later versions.} +\itemize{ +\item \code{"forbid"}: emit an error; +\item "na": use \code{max(x$versions_end, y$versions_end)} as the result's \code{versions_end}, but ensure that, if we request a snapshot as of a version after \code{min(x$versions_end, y$versions_end)}, the observation columns from the less up-to-date archive will be all NAs (i.e., imagine there was an update immediately after its \code{versions_end} which revised all observations to be \code{NA}); +\item \code{"locf"}: use \code{max(x$versions_end, y$versions_end)} as the result's \code{versions_end}, allowing the last version of each observation to be carried forward to extrapolate unavailable versions for the less up-to-date input archive (i.e., imagining that in the less up-to-date archive's data set remained unchanged between its actual \code{versions_end} and the other archive's \code{versions_end}); or +\item \code{"truncate"}: use \code{min(x$versions_end, y$versions_end)} as the result's \code{versions_end}, and discard any rows containing update rows for later versions. +}} \item{compactify}{Optional; \code{TRUE}, \code{FALSE}, or \code{NULL}; should the result be compactified? See \code{\link{as_epi_archive}} for an explanation of what this means. diff --git a/man/epix_rbind.Rd b/man/epix_rbind.Rd new file mode 100644 index 000000000..fdac4f34a --- /dev/null +++ b/man/epix_rbind.Rd @@ -0,0 +1,78 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/methods-epi_archive.R +\name{epix_rbind} +\alias{epix_rbind} +\title{combine epi_archives by rows} +\usage{ +epix_rbind( + ..., + sync = c("forbid", "na", "locf"), + force_distinct = FALSE, + compactify = TRUE +) +} +\arguments{ +\item{...}{list of \code{epi_archive} objects to append in order.} + +\item{sync}{Optional; \code{"forbid"}, \code{"na"}, or \code{"locf"}; in the case that later +versions contain \code{NA}'s, what do we do? +\itemize{ +\item \code{"forbid"}: emit an error if there are any shared time values between +different archives; +\item \code{"na"}: All \code{NA} values are treated as actual data, and thus are +maintained (up to archival compression). +\item \code{"locf"}: for every shared time value, use earlier versions of +earlier archives to overwrite any \code{NA}'s found in later +versions of later archives. +}} + +\item{compactify}{Optional; \code{TRUE}, \code{FALSE}, or \code{NULL}; should the result be +compactified? See \code{\link{as_epi_archive}} for an explanation of what this means. +Default here is \code{TRUE}.} +} +\value{ +the resulting \code{epi_archive} +} +\description{ +Take a sequence of archives and combine by rows. Complications arise if +there are \code{time_value}s shared between the lists. \code{sync} determines how +any later \code{NA}'s are treated, with the default \code{"forbid"} throwing an error, +\code{"na"} treating them as intentional data (no modification), and \code{"locf"} +filling forward across versions. +Shared keys are another problem; by default, \code{force_distinct=FALSE}, meaning +the entry in the earlier archive overwrites later archives. Otherwise there +is an error on shared keys +this function is still under active development, so there may be remaining +edge cases +} +\details{ +In all cases, \code{additional_metadata} will be an empty list, and +\code{clobberable_versions_start} will be set to the latest version that could +be clobbered in either input archive. +} +\examples{ +# create two example epi_archive datasets where using rbind alone would +# work incorrectly +x1 <- archive_cases_dv_subset$DT \%>\% + dplyr::select(geo_value, time_value, version, percent_cli) \%>\% + filter(time_value < "2021-06-01") \%>\% + as_epi_archive(compactify = TRUE) +x2 <- archive_cases_dv_subset$DT \%>\% + dplyr::select(geo_value, time_value, version, percent_cli) \%>\% + filter(time_value >= "2021-06-01") \%>\% + as_epi_archive(compactify = TRUE) +y1 <- archive_cases_dv_subset$DT \%>\% + dplyr::select(geo_value, time_value, version, percent_cli) \%>\% + filter(time_value < "2021-06-01") \%>\% + as_epi_archive(compactify = TRUE) +y2 <- archive_cases_dv_subset$DT \%>\% + dplyr::select(geo_value, time_value, version, percent_cli) \%>\% + filter(time_value >= "2021-06-01") \%>\% + as_epi_archive(compactify = TRUE) +# the problematic examples +first_merge <- epix_merge(x1, y1) +second_merge <- epix_merge(x2, y2) +# rebind the results together +epix_rbind(first_merge, second_merge) + +} From 18436b16e1024e4bc2dddf0fc4f3b19e4333bbb1 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Wed, 12 Jul 2023 13:49:15 -0700 Subject: [PATCH 3/8] fix: get remote tests functional --- NAMESPACE | 8 ++++++++ R/methods-epi_archive.R | 16 +++++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index ce712a74c..133945594 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -71,25 +71,33 @@ importFrom(data.table,set) importFrom(data.table,setkeyv) importFrom(dplyr,arrange) importFrom(dplyr,bind_rows) +importFrom(dplyr,distinct) importFrom(dplyr,dplyr_col_modify) importFrom(dplyr,dplyr_reconstruct) importFrom(dplyr,dplyr_row_slice) +importFrom(dplyr,fill) importFrom(dplyr,filter) importFrom(dplyr,group_by) importFrom(dplyr,group_by_drop_default) importFrom(dplyr,group_modify) importFrom(dplyr,group_vars) importFrom(dplyr,groups) +importFrom(dplyr,intersect) importFrom(dplyr,mutate) importFrom(dplyr,relocate) importFrom(dplyr,rename) importFrom(dplyr,select) +importFrom(dplyr,setdiff) importFrom(dplyr,slice) importFrom(dplyr,ungroup) +importFrom(dplyr,union) importFrom(lubridate,days) importFrom(lubridate,weeks) importFrom(magrittr,"%>%") +importFrom(purrr,map) importFrom(purrr,map_lgl) +importFrom(purrr,map_vec) +importFrom(purrr,reduce) importFrom(rlang,"!!!") importFrom(rlang,"!!") importFrom(rlang,.data) diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index 4e94bdec7..e1aea2f31 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -411,6 +411,8 @@ epix_merge = function(x, y, #' epix_rbind(first_merge, second_merge) #' #' @importFrom data.table key set setkeyv +#' @importFrom purrr map map_vec reduce +#' @importFrom dplyr setdiff union intersect group_by ungroup distinct arrange fill #' @export epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = FALSE, compactify = TRUE) { # things not currently supported that may be warranted: @@ -421,7 +423,7 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F if (any(map_vec(archives, function(x) { !inherits(x, "epi_archive") }))) { - abort("all must be of class `epi_archive`.") + Abort("all must be of class `epi_archive`.") } sync <- rlang::arg_match(sync) @@ -430,14 +432,14 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F if (any(map_vec(archives, function(x) { !identical(x$geo_type, geo_type) }))) { - abort("all must have the same `$geo_type`") + Abort("all must have the same `$geo_type`") } time_type <- archives[[1]]$time_type if (any(map_vec(archives, function(x) { !identical(x$time_type, time_type) }))) { - abort("all must have the same `$time_type`") + Abort("all must have the same `$time_type`") } for (x in archives) { @@ -471,7 +473,7 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F new_key <- keys[[1]] for (ii in seq_along(DTs)) { if (!identical(keys[[ii]], key(archives[[ii]]$DT))) { - warn(" + Warn(" `epiprocess` internal warning (please report): pre-processing for epix_merge unexpectedly resulted in an intermediate data table (or tables) with a different key than the corresponding input archive. @@ -487,7 +489,7 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F # sensible default treatment of count-type and rate-type value columns would # differ. if (!identical(sort(key(DTs[[ii]])), sort(key(DTs[[1]])))) { - abort(" + Abort(" The archives must have the same set of key column names; if the key columns represent the same things, just with different names, please retry after manually renaming to match; if they @@ -500,7 +502,7 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F } } - non_by_colnames <- reduce(map(DTs, function(DT) setdiff(names(DT), new_key)), dplyr::union) + non_by_colnames <- reduce(map(DTs, function(DT) setdiff(names(DT), new_key)), union) # find the shared (geo_values, time_values) which requires: # first define a function to get the unique pairs in a given archive unique_geo_times <- function(x) { @@ -516,7 +518,7 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F ) } - shared_geo_time_values <- reduce(map(DTs, unique_geo_times), dplyr::intersect) + shared_geo_time_values <- reduce(map(DTs, unique_geo_times), intersect) # there are no difference between the methods if there's no overlap if (nrow(shared_geo_time_values) == 0) { DT <- reduce(DTs, rbind) From 68b33e9ccb7d03d7bf4682ac1efa64a9b725dc09 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Wed, 12 Jul 2023 13:59:04 -0700 Subject: [PATCH 4/8] fix: but actually working remote this time --- NAMESPACE | 2 +- R/methods-epi_archive.R | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index 133945594..65a749df9 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -75,7 +75,6 @@ importFrom(dplyr,distinct) importFrom(dplyr,dplyr_col_modify) importFrom(dplyr,dplyr_reconstruct) importFrom(dplyr,dplyr_row_slice) -importFrom(dplyr,fill) importFrom(dplyr,filter) importFrom(dplyr,group_by) importFrom(dplyr,group_by_drop_default) @@ -126,6 +125,7 @@ importFrom(rlang,syms) importFrom(stats,cor) importFrom(stats,median) importFrom(tibble,as_tibble) +importFrom(tidyr,fill) importFrom(tidyr,unnest) importFrom(tidyselect,eval_select) importFrom(tidyselect,starts_with) diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index e1aea2f31..6eefb7ff1 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -412,7 +412,8 @@ epix_merge = function(x, y, #' #' @importFrom data.table key set setkeyv #' @importFrom purrr map map_vec reduce -#' @importFrom dplyr setdiff union intersect group_by ungroup distinct arrange fill +#' @importFrom dplyr setdiff union intersect group_by ungroup distinct arrange +#' @importFrom tidyr fill #' @export epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = FALSE, compactify = TRUE) { # things not currently supported that may be warranted: From 489d9247f8062b036c223750bdad536f4ebfba87 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Wed, 12 Jul 2023 15:51:51 -0700 Subject: [PATCH 5/8] fix: roxygen example was wrong --- R/methods-epi_archive.R | 4 ++-- man/epix_rbind.Rd | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index 6eefb7ff1..53ff61cf4 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -389,11 +389,11 @@ epix_merge = function(x, y, #' # create two example epi_archive datasets where using rbind alone would #' # work incorrectly #' x1 <- archive_cases_dv_subset$DT %>% -#' dplyr::select(geo_value, time_value, version, percent_cli) %>% +#' dplyr::select(geo_value,time_value,version,case_rate_7d_av) %>% #' filter(time_value < "2021-06-01") %>% #' as_epi_archive(compactify = TRUE) #' x2 <- archive_cases_dv_subset$DT %>% -#' dplyr::select(geo_value, time_value, version, percent_cli) %>% +#' dplyr::select(geo_value,time_value,version,case_rate_7d_av) %>% #' filter(time_value >= "2021-06-01") %>% #' as_epi_archive(compactify = TRUE) #' y1 <- archive_cases_dv_subset$DT %>% diff --git a/man/epix_rbind.Rd b/man/epix_rbind.Rd index fdac4f34a..fa85543c5 100644 --- a/man/epix_rbind.Rd +++ b/man/epix_rbind.Rd @@ -54,11 +54,11 @@ be clobbered in either input archive. # create two example epi_archive datasets where using rbind alone would # work incorrectly x1 <- archive_cases_dv_subset$DT \%>\% - dplyr::select(geo_value, time_value, version, percent_cli) \%>\% + dplyr::select(geo_value,time_value,version,case_rate_7d_av) \%>\% filter(time_value < "2021-06-01") \%>\% as_epi_archive(compactify = TRUE) x2 <- archive_cases_dv_subset$DT \%>\% - dplyr::select(geo_value, time_value, version, percent_cli) \%>\% + dplyr::select(geo_value,time_value,version,case_rate_7d_av) \%>\% filter(time_value >= "2021-06-01") \%>\% as_epi_archive(compactify = TRUE) y1 <- archive_cases_dv_subset$DT \%>\% From 4abc409d74f35d84b57bf5c726aff09996d0e5a0 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Wed, 12 Jul 2023 16:12:18 -0700 Subject: [PATCH 6/8] fix: document `force_distinct`, Abort->abort --- R/methods-epi_archive.R | 5 +++-- man/epix_rbind.Rd | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index 53ff61cf4..92295d541 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -375,7 +375,8 @@ epix_merge = function(x, y, #' - `"locf"`: for every shared time value, use earlier versions of #' earlier archives to overwrite any `NA`'s found in later #' versions of later archives. -#' +#' @param force_distinct Optional; `TRUE`, `FALSE`, or `NULL`; should the keys +#' be forced to be distinct? #' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be #' compactified? See [`as_epi_archive`] for an explanation of what this means. #' Default here is `TRUE`. @@ -679,7 +680,7 @@ epix_detailed_restricted_mutate = function(.data, ...) { !rlang::is_reference(in_tbl[[key_colname]], col_modify_cols[[key_colname]]) })) if (length(invalidated_key_col_is) != 0L) { - rlang::Abort(paste_lines(c( + rlang::abort(paste_lines(c( "Key columns must not be replaced or removed.", wrap_varnames(key(.data$DT)[invalidated_key_col_is], initial="Flagged key cols: ") diff --git a/man/epix_rbind.Rd b/man/epix_rbind.Rd index fa85543c5..422d57929 100644 --- a/man/epix_rbind.Rd +++ b/man/epix_rbind.Rd @@ -26,6 +26,9 @@ earlier archives to overwrite any \code{NA}'s found in later versions of later archives. }} +\item{force_distinct}{Optional; \code{TRUE}, \code{FALSE}, or \code{NULL}; should the keys +be forced to be distinct?} + \item{compactify}{Optional; \code{TRUE}, \code{FALSE}, or \code{NULL}; should the result be compactified? See \code{\link{as_epi_archive}} for an explanation of what this means. Default here is \code{TRUE}.} From fbbafe237e533b945bfd19c28732c2627fd482fc Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Thu, 13 Jul 2023 11:11:32 -0700 Subject: [PATCH 7/8] fix: partial variable rename (possibly_redundant->DT) --- R/methods-epi_archive.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index 92295d541..6d6253174 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -525,7 +525,7 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F if (nrow(shared_geo_time_values) == 0) { DT <- reduce(DTs, rbind) if (force_distinct) { - DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE) + DT <- distinct(DT, geo_value, time_value, version, .keep_all = TRUE) } } else if (sync == "forbid") { Abort(paste( From 70c76c7b1fce4cc94c8681f85fcf7031d46e30ab Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Thu, 13 Jul 2023 15:18:58 -0700 Subject: [PATCH 8/8] fix: various Logan catches --- R/methods-epi_archive.R | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index 6d6253174..77db11178 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -474,23 +474,13 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F keys <- map(DTs, key) new_key <- keys[[1]] for (ii in seq_along(DTs)) { - if (!identical(keys[[ii]], key(archives[[ii]]$DT))) { - Warn(" - `epiprocess` internal warning (please report): pre-processing for - epix_merge unexpectedly resulted in an intermediate data table (or - tables) with a different key than the corresponding input archive. - Manually setting intermediate data table keys to the expected values. - ", internal = TRUE) - setkeyv(DTs[[ii]], key(archives[[ii]]$DT)) - keys[[ii]] <- key(archives[[ii]]$DT) - } if (!identical(keys[[ii]], new_key)) { # Without some sort of annotations of what various columns represent, we can't # do something that makes sense when rbinding archives with mismatched keys. # E.g., even if we assume extra keys represent demographic breakdowns, a # sensible default treatment of count-type and rate-type value columns would # differ. - if (!identical(sort(key(DTs[[ii]])), sort(key(DTs[[1]])))) { + if (!identical(sort(key(DTs[[ii]])), sort(new_key))) { Abort(" The archives must have the same set of key column names; if the key columns represent the same things, just with different @@ -516,7 +506,7 @@ epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = F other_keys <- dplyr::setdiff(new_key, c("geo_value", "time_value", "version")) if (length(other_keys) != 0) { Abort("epix_rbind does not currently support additional keys", - class = "epiprocess__epxi_rbind_unsupported" + class = "epiprocess__epix_rbind_unsupported" ) }