From ada870cec794cc899d6b50f37f617a7663e8540d Mon Sep 17 00:00:00 2001 From: William Dearden Date: Thu, 11 Jan 2018 19:56:03 -0600 Subject: [PATCH 1/3] speed up unpack_nested_data #42 handle mixed atomic/data frame column and better explanations in comments --- .Rbuildignore | 1 + R/elasticsearch_parsers.R | 71 +++++++++++++-------- tests/testthat/test-elasticsearch_parsers.R | 29 +++++++++ 3 files changed, 75 insertions(+), 26 deletions(-) diff --git a/.Rbuildignore b/.Rbuildignore index 443eb31..70fc9e7 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -45,3 +45,4 @@ vignettes/*\.pdf # system files .*\.DS_Store +^.*\.Rproj$ diff --git a/R/elasticsearch_parsers.R b/R/elasticsearch_parsers.R index 8afd8e8..61204ad 100644 --- a/R/elasticsearch_parsers.R +++ b/R/elasticsearch_parsers.R @@ -370,7 +370,7 @@ chomp_aggs <- function(aggs_json = NULL) { #' unpackedDT <- unpack_nested_data(chomped_df = sampleChompedDT #' , col_to_unpack = "details.pastPurchases") #' print(unpackedDT) -unpack_nested_data <- function(chomped_df, col_to_unpack) { +unpack_nested_data <- function(chomped_df, col_to_unpack) { # Input checks if (!("data.table" %in% class(chomped_df))) { @@ -390,45 +390,64 @@ unpack_nested_data <- function(chomped_df, col_to_unpack) { log_fatal(msg) } - # Avoid side effects outDT <- data.table::copy(chomped_df) - - # Get the column to unpack listDT <- outDT[[col_to_unpack]] - # Make each row a data.table - listDT <- lapply(listDT, data.table::as.data.table) - - # Remove the empty ones... important, due to data.table 1.10.4 bug - oldIDs <- which(sapply(listDT, nrow) != 0) - listDT <- listDT[oldIDs] - - # Bind them together with an ID to match to the other data - newDT <- data.table::rbindlist(listDT, fill = TRUE, idcol = TRUE) - - # If we tried to unpack an empty column, fail - if (nrow(newDT) == 0) { + # Check for empty column + if (all(lengths(listDT) == 0)) { msg <- "The column given to unpack_nested_data had no data in it." log_fatal(msg) } - # Fix the ID because we may have removed some empty elements due to that bug - newDT[, .id := oldIDs[.id]] + listDT[lengths(listDT) == 0] <- NA - # Merge - outDT[, .id := .I] - outDT <- newDT[outDT, on = ".id"] + is_df <- purrr::map_lgl(listDT, is.data.frame) + is_atomic <- purrr::map_lgl(listDT, is.atomic) + is_na <- is.na(listDT) + + # Bind packed column into one data.table + if (all(is_atomic)) { + newDT <- data.table::as.data.table(unlist(listDT)) + } else if (all(is_df | is_atomic)) { + # If the packed column contains a mixture of data tables, we need to + # to convert the atomic vectors to data.tables + + # Find column name to use for NA vectors + first_df <- min(which(is_df)) + col_name <- names(listDT[[first_df]])[1] + + # Convert non data.frame rows to data.table and assign name to rows + # with no name + prep_row <- function(x) { + if (is.atomic(x)) { + x <- data.table::as.data.table(x) + if (is.na(x)) names(x) <- col_name + else names(x) <- col_to_unpack + } + x + } + newDT <- purrr::map(listDT, prep_row) + + newDT <- data.table::rbindlist(newDT, fill = TRUE) + } else { + msg <- paste0("Each row in column ", col_to_unpack, " must be a data frame or a vector.") + futile.logger::flog.fatal(msg) + stop(msg) + } - # Remove the id column and the original column - outDT <- outDT[, !c(".id", col_to_unpack), with = FALSE] + # Create the unpacked data.table by replicating the originally unpacked + # columns by the number of rows in each entry in the original unpacked column + times_to_replicate <- pmax(purrr::map_int(listDT, NROW), 1) + # Replicate the rows of the data.table by entries of times_to_replicate but drop col_to_unpack + replicatedDT <- chomped_df[rep(1:nrow(chomped_df), times_to_replicate)] + replicatedDT[, col_to_unpack] <- NULL - # Rename unpacked column if it didn't get a name + # Then bind the replicated columns with the unpacked column + outDT <- data.table::data.table(newDT, replicatedDT) if ("V1" %in% names(outDT)) { data.table::setnames(outDT, "V1", col_to_unpack) } - return(outDT) - } #' @title Hits to data.tables diff --git a/tests/testthat/test-elasticsearch_parsers.R b/tests/testthat/test-elasticsearch_parsers.R index 5512034..2a3a5fe 100644 --- a/tests/testthat/test-elasticsearch_parsers.R +++ b/tests/testthat/test-elasticsearch_parsers.R @@ -916,6 +916,35 @@ futile.logger::flog.threshold(0) regexp = "The column given to unpack_nested_data had no data in it")} ) + test_that("unpack_nested_data should break if the column contains a non data frame/vector", { + DT <- data.table::data.table(x = 1:2, y = list(list(2), 3)) + expect_error(unpack_nested_data(chomped_df = DT, col_to_unpack = "y") + , regexp = "must be a data frame or a vector") + }) + + test_that("unpack_nested_data should handle NA and empty rows", { + DT <- data.table::data.table(x = 1:2, y = list(z = NA, data.table(w = 5:6, z = 7:8))) + DT2 <- data.table::data.table(x = 1:2, y = list(z = list(), data.table(w = 5:6, z = 7:8))) + unpackedDT <- data.table::data.table( + w = c(NA, 5, 6) + , z = c(NA, 7, 8) + , x = c(1, 2, 2) + ) + expect_equal(unpack_nested_data(DT, col_to_unpack = "y"), unpackedDT) + expect_equal(unpack_nested_data(DT2, col_to_unpack = "y"), unpackedDT) + }) + + test_that("unpack_nested_data should handle mixed atomic/data frame column", { + DT <- data.table::data.table(x = 1:2, y = list(1, data.table(w = 5:6, z = 7:8))) + unpackedDT <- data.table::data.table( + y = c(1, NA, NA) + , w = c(NA, 5, 6) + , z = c(NA, 7, 8) + , x = c(1, 2, 2) + ) + expect_equal(unpack_nested_data(DT, col_to_unpack = "y"), unpackedDT) + }) + #---- 5. .ConvertToSec # .ConvertToSec should work for seconds From f9b4516eeb67596fb7da93af3a796608f0a2c0fb Mon Sep 17 00:00:00 2001 From: William Dearden Date: Mon, 12 Mar 2018 20:20:14 -0400 Subject: [PATCH 2/3] Remove handling of mixed atomic/data.table columns --- NAMESPACE | 3 ++ R/elasticsearch_parsers.R | 40 +++++++++------------ tests/testthat/test-elasticsearch_parsers.R | 11 ------ 3 files changed, 20 insertions(+), 34 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index bc62c12..6902192 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -30,6 +30,9 @@ importFrom(parallel,makeForkCluster) importFrom(parallel,makePSOCKcluster) importFrom(parallel,stopCluster) importFrom(purrr,map2) +importFrom(purrr,map_if) +importFrom(purrr,map_int) +importFrom(purrr,map_lgl) importFrom(purrr,simplify) importFrom(purrr,transpose) importFrom(stringr,str_detect) diff --git a/R/elasticsearch_parsers.R b/R/elasticsearch_parsers.R index 61204ad..c7d9530 100644 --- a/R/elasticsearch_parsers.R +++ b/R/elasticsearch_parsers.R @@ -342,6 +342,7 @@ chomp_aggs <- function(aggs_json = NULL) { #' This is a side-effect-free function: it returns a new data.table and the #' input data.table is unmodified. #' @importFrom data.table copy as.data.table rbindlist setnames +#' @importFrom purrr map_if map_lgl map_int #' @export #' @param chomped_df a data.table #' @param col_to_unpack a character vector of length one: the column name to @@ -390,11 +391,10 @@ unpack_nested_data <- function(chomped_df, col_to_unpack) { log_fatal(msg) } - outDT <- data.table::copy(chomped_df) - listDT <- outDT[[col_to_unpack]] + listDT <- chomped_df[[col_to_unpack]] # Check for empty column - if (all(lengths(listDT) == 0)) { + if (all(purrr::map_int(listDT, NROW) == 0)) { msg <- "The column given to unpack_nested_data had no data in it." log_fatal(msg) } @@ -402,48 +402,42 @@ unpack_nested_data <- function(chomped_df, col_to_unpack) { listDT[lengths(listDT) == 0] <- NA is_df <- purrr::map_lgl(listDT, is.data.frame) + is_list <- purrr::map_lgl(listDT, is.list) is_atomic <- purrr::map_lgl(listDT, is.atomic) is_na <- is.na(listDT) # Bind packed column into one data.table if (all(is_atomic)) { newDT <- data.table::as.data.table(unlist(listDT)) - } else if (all(is_df | is_atomic)) { - # If the packed column contains a mixture of data tables, we need to - # to convert the atomic vectors to data.tables - - # Find column name to use for NA vectors + } else if (all(is_df | is_list | is_na)) { + # Find name to use for NA columns first_df <- min(which(is_df)) col_name <- names(listDT[[first_df]])[1] - - # Convert non data.frame rows to data.table and assign name to rows - # with no name - prep_row <- function(x) { - if (is.atomic(x)) { - x <- data.table::as.data.table(x) - if (is.na(x)) names(x) <- col_name - else names(x) <- col_to_unpack - } + + .prep_na_row <- function(x, col_name) { + x <- data.table::as.data.table(x) + names(x) <- col_name x } - newDT <- purrr::map(listDT, prep_row) - + + # If the packed column contains data.tables, we use rbindlist + newDT <- purrr::map_if(listDT, is_na, .prep_na_row, col_name = col_name) newDT <- data.table::rbindlist(newDT, fill = TRUE) } else { msg <- paste0("Each row in column ", col_to_unpack, " must be a data frame or a vector.") - futile.logger::flog.fatal(msg) - stop(msg) + log_fatal(msg) } - + # Create the unpacked data.table by replicating the originally unpacked # columns by the number of rows in each entry in the original unpacked column + # We don't use newDT because it doesn't have the original row lengths times_to_replicate <- pmax(purrr::map_int(listDT, NROW), 1) # Replicate the rows of the data.table by entries of times_to_replicate but drop col_to_unpack replicatedDT <- chomped_df[rep(1:nrow(chomped_df), times_to_replicate)] replicatedDT[, col_to_unpack] <- NULL - # Then bind the replicated columns with the unpacked column outDT <- data.table::data.table(newDT, replicatedDT) + if ("V1" %in% names(outDT)) { data.table::setnames(outDT, "V1", col_to_unpack) } diff --git a/tests/testthat/test-elasticsearch_parsers.R b/tests/testthat/test-elasticsearch_parsers.R index 2a3a5fe..42458ef 100644 --- a/tests/testthat/test-elasticsearch_parsers.R +++ b/tests/testthat/test-elasticsearch_parsers.R @@ -934,17 +934,6 @@ futile.logger::flog.threshold(0) expect_equal(unpack_nested_data(DT2, col_to_unpack = "y"), unpackedDT) }) - test_that("unpack_nested_data should handle mixed atomic/data frame column", { - DT <- data.table::data.table(x = 1:2, y = list(1, data.table(w = 5:6, z = 7:8))) - unpackedDT <- data.table::data.table( - y = c(1, NA, NA) - , w = c(NA, 5, 6) - , z = c(NA, 7, 8) - , x = c(1, 2, 2) - ) - expect_equal(unpack_nested_data(DT, col_to_unpack = "y"), unpackedDT) - }) - #---- 5. .ConvertToSec # .ConvertToSec should work for seconds From 721cd9c96141f22a5b53f0b25adc3583a3606e5c Mon Sep 17 00:00:00 2001 From: Austin Dickey Date: Thu, 12 Apr 2018 10:44:18 -0500 Subject: [PATCH 3/3] Made it a merge instead of rep --- R/elasticsearch_parsers.R | 31 +++++++++++---------- tests/testthat/test-elasticsearch_parsers.R | 23 ++++++--------- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/R/elasticsearch_parsers.R b/R/elasticsearch_parsers.R index c7d9530..f8a0b01 100644 --- a/R/elasticsearch_parsers.R +++ b/R/elasticsearch_parsers.R @@ -378,10 +378,6 @@ unpack_nested_data <- function(chomped_df, col_to_unpack) { msg <- "For unpack_nested_data, chomped_df must be a data.table" log_fatal(msg) } - if (".id" %in% names(chomped_df)) { - msg <- "For unpack_nested_data, chomped_df cannot have a column named '.id'" - log_fatal(msg) - } if (!("character" %in% class(col_to_unpack)) || length(col_to_unpack) != 1) { msg <- "For unpack_nested_data, col_to_unpack must be a character of length 1" log_fatal(msg) @@ -391,7 +387,15 @@ unpack_nested_data <- function(chomped_df, col_to_unpack) { log_fatal(msg) } - listDT <- chomped_df[[col_to_unpack]] + inDT <- data.table::copy(chomped_df) + + # Define a column name to store original row ID + joinCol <- uuid::UUIDgenerate() + inDT[, (joinCol) := .I] + + # Take out the packed column + listDT <- inDT[[col_to_unpack]] + inDT[, (col_to_unpack) := NULL] # Check for empty column if (all(purrr::map_int(listDT, NROW) == 0)) { @@ -409,6 +413,7 @@ unpack_nested_data <- function(chomped_df, col_to_unpack) { # Bind packed column into one data.table if (all(is_atomic)) { newDT <- data.table::as.data.table(unlist(listDT)) + newDT[, (joinCol) := rep(seq_along(listDT), lengths(listDT))] } else if (all(is_df | is_list | is_na)) { # Find name to use for NA columns first_df <- min(which(is_df)) @@ -422,25 +427,21 @@ unpack_nested_data <- function(chomped_df, col_to_unpack) { # If the packed column contains data.tables, we use rbindlist newDT <- purrr::map_if(listDT, is_na, .prep_na_row, col_name = col_name) - newDT <- data.table::rbindlist(newDT, fill = TRUE) + newDT <- data.table::rbindlist(newDT, fill = TRUE, idcol = joinCol) } else { msg <- paste0("Each row in column ", col_to_unpack, " must be a data frame or a vector.") log_fatal(msg) } - # Create the unpacked data.table by replicating the originally unpacked - # columns by the number of rows in each entry in the original unpacked column - # We don't use newDT because it doesn't have the original row lengths - times_to_replicate <- pmax(purrr::map_int(listDT, NROW), 1) - # Replicate the rows of the data.table by entries of times_to_replicate but drop col_to_unpack - replicatedDT <- chomped_df[rep(1:nrow(chomped_df), times_to_replicate)] - replicatedDT[, col_to_unpack] <- NULL - # Then bind the replicated columns with the unpacked column - outDT <- data.table::data.table(newDT, replicatedDT) + # Join it back in + outDT <- inDT[newDT, on = joinCol] + outDT[, (joinCol) := NULL] + # In the case of all atomic... if ("V1" %in% names(outDT)) { data.table::setnames(outDT, "V1", col_to_unpack) } + return(outDT) } diff --git a/tests/testthat/test-elasticsearch_parsers.R b/tests/testthat/test-elasticsearch_parsers.R index 42458ef..3c10c1d 100644 --- a/tests/testthat/test-elasticsearch_parsers.R +++ b/tests/testthat/test-elasticsearch_parsers.R @@ -845,8 +845,8 @@ futile.logger::flog.threshold(0) , col_to_unpack = "details.appData") expect_true("data.table" %in% class(unpackedDT)) expect_equivalent(dim(unpackedDT), c(7, 8)) - expect_named(unpackedDT, c('appName', 'minutes', 'value', 'typovalue', 'dateTime', 'username', - 'details.interactions', 'details.userType')) + expect_named(unpackedDT, c('dateTime', 'username', 'details.interactions', + 'details.userType', 'appName', 'minutes', 'value', 'typovalue')) expect_identical(unpackedDT$appName, c('farmville', 'candy_crush', 'angry_birds', 'minesweeper', 'pokemon_go', 'pokemon_stay', 'block_dude')) @@ -868,8 +868,8 @@ futile.logger::flog.threshold(0) , col_to_unpack = "details.minutes") expect_true("data.table" %in% class(unpackedDT)) expect_equivalent(dim(unpackedDT), c(8, 5)) - expect_named(unpackedDT, c('details.minutes', 'dateTime', 'username', - 'details.interactions', 'details.userType')) + expect_named(unpackedDT, c('dateTime', 'username', 'details.interactions', + 'details.userType', 'details.minutes')) expect_equivalent(unpackedDT$details.minutes, c(500, 350, 422, NA, 28, 190, 1, 796)) expect_identical(unpackedDT$username, c(rep("Austin1", 3), "Austin2", rep("Austin3", 4))) }) @@ -881,13 +881,6 @@ futile.logger::flog.threshold(0) regexp = "chomped_df must be a data.table")} ) - # Should break if chomped_df already has a column named ".id" - test_that("unpack_nested_data should break if you pass a data.table with column '.id'", - {expect_error(unpack_nested_data(chomped_df = data.table::data.table(wow = 7, .id = 8) - , col_to_unpack = "blah"), - regexp = "chomped_df cannot have a column named '.id'")} - ) - # Should break if col_to_unpack is not a string test_that("unpack_nested_data should break if col_to_unpack is not a string", {expect_error(unpack_nested_data(chomped_df = data.table::data.table(wow = 7) @@ -923,12 +916,12 @@ futile.logger::flog.threshold(0) }) test_that("unpack_nested_data should handle NA and empty rows", { - DT <- data.table::data.table(x = 1:2, y = list(z = NA, data.table(w = 5:6, z = 7:8))) - DT2 <- data.table::data.table(x = 1:2, y = list(z = list(), data.table(w = 5:6, z = 7:8))) + DT <- data.table::data.table(x = 1:2, y = list(z = NA, data.table::data.table(w = 5:6, z = 7:8))) + DT2 <- data.table::data.table(x = 1:2, y = list(z = list(), data.table::data.table(w = 5:6, z = 7:8))) unpackedDT <- data.table::data.table( - w = c(NA, 5, 6) + x = c(1, 2, 2) + , w = c(NA, 5, 6) , z = c(NA, 7, 8) - , x = c(1, 2, 2) ) expect_equal(unpack_nested_data(DT, col_to_unpack = "y"), unpackedDT) expect_equal(unpack_nested_data(DT2, col_to_unpack = "y"), unpackedDT)