
GH-37655: [C++] Allow joins of large tables in Acero #37709

Closed
wants to merge 8 commits

Conversation


@oliviermeslin oliviermeslin commented Sep 13, 2023

[PR 35087](apache#35087) introduced an explicit failure in large joins with Acero when key data is larger than 4 GB (solving the problem reported by [issue 34474](apache#34474)). However, I think (though I am not sure) that this quick fix is too restrictive, because the size condition is applied to the total size of the tables to be joined rather than to the size of the keys. As a consequence, Acero fails when trying to merge large tables even when the size of the key data is well below 4 GB.

This PR modifies the source code so that the logical test only verifies whether the total size of the _key data_ is below 4 GB.

Rationale for this change

In the current situation, joins with arrow fail when the tables to be joined are jointly larger than 4 GB, even if the key data is smaller than 4 GB. This is because the test on the size of the key data is erroneously applied to the size of the tables to be joined.

What changes are included in this PR?

I slightly modify the C++ code so that the test on the size of the key data no longer applies to the total size of the tables to be joined, as sketched below.
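
For orientation, a minimal sketch of the shape of the change: is_key_data is the flag visible in the diff further down this conversation, and the error message is abbreviated from the one Acero actually raises.

// Before this PR: the 4 GB limit was enforced on all merged row data,
// key and payload alike.
if (num_bytes > std::numeric_limits<uint32_t>::max()) {
  return Status::Invalid("There are more than 2^32 bytes of key data");
}

// With this PR: the limit is only enforced when merging key data.
if (is_key_data && num_bytes > std::numeric_limits<uint32_t>::max()) {
  return Status::Invalid("There are more than 2^32 bytes of key data");
}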

Are these changes tested?

Not so far.

Are there any user-facing changes?

No.

@github-actions

Thanks for opening a pull request!

If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose

Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project.

Then could you also rename the pull request title in the following format?

GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}

In the case of PARQUET issues on JIRA the title also supports:

PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

See also:

@oliviermeslin oliviermeslin changed the title Solve issue #37655 [C++]: Solve issue #37655 to allow Acero to merge large tables Sep 13, 2023
@oliviermeslin oliviermeslin changed the title [C++]: Solve issue #37655 to allow Acero to merge large tables GH-37655: [C++] Allow joins of large tables in Acero Sep 19, 2023
@github-actions
Copy link

⚠️ GitHub issue #37655 has been automatically assigned in GitHub to PR creator.

@pitrou
Member

pitrou commented Oct 10, 2023

@oliviermeslin Thanks for this. This probably can't be tested efficiently in the test suite unfortunately, due to the test size required.

Member

@raulcd raulcd left a comment


Remove trailing whitespace
@oliviermeslin
Author

@raulcd: I think I solved the problem (I had forgotten to remove some trailing whitespace). Can you please re-launch the checks?

@westonpace
Member

@oliviermeslin I agree it's too big to test in any kind of reasonable unit test. Have you run any tests manually with payloads larger than 4 GB and confirmed the results are correct?

Member

@westonpace westonpace left a comment


I can't verify that joins with 4 GiB or more of payload data will work correctly; I never tested for that. But I agree that the issue I fixed was specific to key data, so, assuming some manual testing of large payload data has been done, it seems fine to loosen this restriction.

@raulcd
Member

raulcd commented Oct 13, 2023

@raulcd: I think I solved the problem (I had forgotten to remove some trailing whitespace). Can you please re-launch the checks?

I've pushed a minor lint change; I think this should fix it. The GitHub UI did not allow me to propose the change to remove the trailing whitespace, which is why I pushed it myself.

Member

@westonpace westonpace left a comment


The only failing test now appears to be unrelated (S3). Conditional approval, provided someone has at least done some basic manual testing to verify that this actually works.

@oliviermeslin
Author

Thanks @westonpace! I have already prepared some tests (one using artificial data, the other using real data). Unfortunately, I could not install arrow as an R package from my own fork, because installing a development version of arrow seems to be quite complicated (see this issue). I'll try my best to find a solution and let you know the result.

@amoeba
Member

amoeba commented Oct 16, 2023

Hey @oliviermeslin, I ran your example from #37655 without this patch and got the expected error after the script printed "Doing the join with 9 variables":

! Invalid: There are more than 2^32 bytes of key data. Acero cannot process a join of this magnitude

I then built libarrow and the R package off your patch and actually got a segfault on the dereference of target_row_ptr:

*target_row_ptr++ = *source_row_ptr++;

Edit: Looks like it might be an off-by-one error from this output:

Error in tryCatchOne(expr, names, parentenv, handlers[[1L]]) :
R_Reprotect: only 41 protected items, can't reprotect index 42

Full output below:

Output with segfault
❯ Rscript acero-join-test.R
Welcome to R :)
This session's PID is 2890
Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

Attaching package: ‘arrow’

The following object is masked from ‘package:utils’:

    timestamp


Attaching package: ‘dplyr’

The following objects are masked from ‘package:stats’:

    filter, lag

The following objects are masked from ‘package:base’:

    intersect, setdiff, setequal, union

[1] "Doing the join with 2 variables"
[1] "id"        "variable1"
[1] "Doing the join with 3 variables"
[1] "id"        "variable1" "variable2"
[1] "Doing the join with 4 variables"
[1] "id"        "variable1" "variable2" "variable3"
[1] "Doing the join with 5 variables"
[1] "id"        "variable1" "variable2" "variable3" "variable4"
[1] "Doing the join with 6 variables"
[1] "id"        "variable1" "variable2" "variable3" "variable4" "variable5"
[1] "Doing the join with 7 variables"
[1] "id"        "variable1" "variable2" "variable3" "variable4" "variable5"
[7] "variable6"
[1] "Doing the join with 8 variables"
[1] "id"        "variable1" "variable2" "variable3" "variable4" "variable5"
[7] "variable6" "variable7"
[1] "Doing the join with 9 variables"
[1] "id"        "variable1" "variable2" "variable3" "variable4" "variable5"
[7] "variable6" "variable7" "variable8"

 *** caught segfault ***
address 0x7ed0457b20, cause 'invalid permissions'

Traceback:
 1: Table__from_ExecPlanReader(self)
 2: x$read_table()
 3: as_arrow_table.RecordBatchReader(reader)
 4: as_arrow_table(reader)
 5: as_arrow_table.arrow_dplyr_query(x)
 6: as_arrow_table(x)
 7: doTryCatch(return(expr), name, parentenv, handler)
 8: tryCatchOne(expr, names, parentenv, handlers[[1L]])
 9: tryCatchList(expr, classes, parentenv, handlers)
10: tryCatch(as_arrow_table(x), error = function(e, call = caller_env(n = 4)) {    augment_io_error_msg(e, call, schema = schema())})
11: compute.arrow_dplyr_query(left_join(data, select(data, all_of(vars_temp)),     by = c(id = "id")))
12: compute(left_join(data, select(data, all_of(vars_temp)), by = c(id = "id")))
13: FUN(X[[i]], ...)
14: lapply(1:nb_var, function(n) {    print(paste0("Doing the join with ", n + 1, " variables"))    vars_temp <- c("id", vars[1:n])    print(vars_temp)    data_out <- compute(left_join(data, select(data, all_of(vars_temp)),         by = c(id = "id")))    return("Success!")})
An irrecoverable exception occurred. R is aborting now ...

[Identical segfaults and tracebacks from the other crashed worker threads, interleaved in the raw output, are omitted here.]
fish: Job 1, 'Rscript acero-join-test.R' terminated by signal SIGSEGV (Address boundary error)

@oliviermeslin
Author

oliviermeslin commented Oct 16, 2023

@amoeba: I could finally install arrow with my patch (on Linux/Ubuntu). I re-ran my test under the gdb debugger and got the same error as you. The debugger output is the following:

Thread 125 "R" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffcdd7fa640 (LWP 20471)]
arrow::acero::RowArrayMerge::CopyVaryingLength (target=<optimized out>, source=..., first_target_row_id=<optimized out>, first_target_row_offset=<optimized out>, source_rows_permutation=0x7ffcc8052020) at /home/onyxia/work/arrow/cpp/src/arrow/acero/swiss_join.cc:605
605             *target_row_ptr++ = *source_row_ptr++;

Line 605 of swiss_join.cc is part of the following loop, where the content of source rows is copied into target rows, one 64-bit word at a time:

// Copy the row's variable-length data in 64-bit words.
for (uint32_t word = 0; word < length / sizeof(uint64_t); ++word) {
  *target_row_ptr++ = *source_row_ptr++;
}

As I am completely new to C++, I'm not sure what this bug means. My intuition is that we are advancing target_row_ptr past the end of the buffer it points into. Would you or @westonpace have any clue about what is going wrong?

@@ -473,7 +474,7 @@ Status RowArrayMerge::PrepareForMerge(RowArray* target,
     (*first_target_row_id)[sources.size()] = num_rows;
   }

-  if (num_bytes > std::numeric_limits<uint32_t>::max()) {
+  if (is_key_data && num_bytes > std::numeric_limits<uint32_t>::max()) {
Contributor

@zanmato1984 zanmato1984 Jul 19, 2024


When non-key data larger than std::numeric_limits<uint32_t>::max() bypasses this check, num_bytes wraps around to a much smaller value in the static_cast<uint32_t> in #486. The target then does not allocate enough space, resulting in a segfault when copying data into it. The original check is necessary, and there is unfortunately nothing to loosen.
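
For illustration, a minimal standalone sketch (not Arrow code; the 5 GiB figure is an arbitrary example) of the narrowing described above:

#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical total size of non-key row data: 5 GiB, above UINT32_MAX.
  int64_t num_bytes = 5LL * 1024 * 1024 * 1024;

  // With the 4 GiB check bypassed for non-key data, the 64-bit byte count
  // is later narrowed to 32 bits and silently wraps around.
  uint32_t allocated = static_cast<uint32_t>(num_bytes);

  std::cout << "requested bytes: " << num_bytes << "\n";   // 5368709120
  std::cout << "allocated bytes: " << allocated << "\n";   // 1073741824

  // The merge then copies the full num_bytes of row data into a buffer
  // sized from the wrapped value, runs past its end, and segfaults.
  return 0;
}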

@zanmato1984
Contributor

zanmato1984 commented Jul 19, 2024

Hi @oliviermeslin @amoeba, I was trying to help with the issue you hit. After looking a bit, I'd say it's unfortunately an inherent problem of the fix: the original issue just moves to a later place and explodes in a more implicit fashion (the segfault).

Please see my comment on the changed code about what is actually happening. Thanks.

cc @westonpace

@zanmato1984
Contributor

PR #43389 fixed the issue in a more thorough way, so I'm closing this one. Thanks.


Successfully merging this pull request may close these issues.

[C++] Acero cannot join large tables because of a misspecified test