Skip to content

ESQL: Fix mv_expand inconsistent column order #129745

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/129745.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 129745
summary: "ESQL: Fix `mv_expand` inconsistent column order"
area: ES|QL
type: bug
issues:
- 129000
Original file line number Diff line number Diff line change
Expand Up @@ -261,5 +261,9 @@ public boolean isEmpty() {
public AttributeSet build() {
return new AttributeSet(mapBuilder.build());
}

public void clear() {
mapBuilder.keySet().clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,37 @@ emp_no:integer | job_positions:keyword
10001 | Accountant
10001 | Senior Python Developer
;

testMvExpandInconsistentColumnOrder1
required_capability: fix_mv_expand_inconsistent_column_order
from languages
| eval foo_1 = 1, foo_2 = 1
| sort language_code
| mv_expand foo_1
;

language_code:integer | language_name:keyword | foo_1:integer | foo_2:integer
1 | English | 1 | 1
2 | French | 1 | 1
3 | Spanish | 1 | 1
4 | German | 1 | 1
;

testMvExpandInconsistentColumnOrder2
required_capability: fix_mv_expand_inconsistent_column_order
from message_types,languages_lookup_non_unique_key
| sort type
| eval language_code = 1, `language_name` = false, message = true, foo_3 = 1, foo_2 = null
| eval foo_3 = "1", `foo_3` = -1, foo_1 = 1, `language_code` = null, `foo_2` = "1"
| mv_expand foo_1
| limit 5
;

country:text | country.keyword:keyword | type:keyword | language_name:boolean | message:boolean | foo_3:integer | foo_1:integer | language_code:null | foo_2:keyword
null | null | Development | false | true | -1 | 1 | null | 1
null | null | Disconnected | false | true | -1 | 1 | null | 1
null | null | Error | false | true | -1 | 1 | null | 1
null | null | Production | false | true | -1 | 1 | null | 1
null | null | Success | false | true | -1 | 1 | null | 1
;

Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,13 @@ public enum Cap {
*
* https://github.com/elastic/elasticsearch/issues/129322
*/
NO_PLAIN_STRINGS_IN_LITERALS;
NO_PLAIN_STRINGS_IN_LITERALS,

/**
* Support for the mv_expand target attribute should be retained in its original position.
* see <a href="https://github.com/elastic/elasticsearch/issues/129000"> ES|QL: inconsistent column order #129000 </a>
*/
FIX_MV_EXPAND_INCONSISTENT_COLUMN_ORDER;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.rule.Rule;

Expand Down Expand Up @@ -49,6 +50,17 @@ public PhysicalPlan apply(PhysicalPlan plan) {
return currentPlanNode;
}

// for mv_expand, the target attribute should be retained in its original position
if (currentPlanNode instanceof MvExpandExec mvExpand) {
List<Attribute> updatedAttrs = new ArrayList<>(requiredAttrBuilder.build());
int idx = updatedAttrs.indexOf(mvExpand.expanded());
if (idx != -1) {
updatedAttrs.set(idx, (Attribute) mvExpand.target());
requiredAttrBuilder.clear();
requiredAttrBuilder.addAll(updatedAttrs);
}
}

// for non-unary execution plans, we apply the rule for each child
if (currentPlanNode instanceof MergeExec mergeExec) {
keepTraversing.set(FALSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
Expand Down Expand Up @@ -192,6 +193,7 @@
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -3165,6 +3167,54 @@ public void testProjectAwayAllColumnsWhenOnlyTheCountMattersInStats() {
assertThat(Expressions.names(esQuery.attrs()), contains("_doc"));
}

/**
* LimitExec[1000[INTEGER],336]
* \_MvExpandExec[foo_1{r}#3,foo_1{r}#20]
* \_TopNExec[[Order[emp_no{f}#9,ASC,LAST]],1000[INTEGER],336]
* \_ExchangeExec[[_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, gender{f}#11, hire_date{f}#16, job{f}#17, job.raw{f}#18, la
* nguages{f}#12, last_name{f}#13, long_noidx{f}#19, salary{f}#14, foo_1{r}#3, foo_2{r}#5],false]
* \_ProjectExec[[_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, gender{f}#11, hire_date{f}#16, job{f}#17, job.raw{f}#18, la
* nguages{f}#12, last_name{f}#13, long_noidx{f}#19, salary{f}#14, foo_1{r}#3, foo_2{r}#5]]
* \_FieldExtractExec[_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]&gt;[],[]&lt;
* \_EvalExec[[1[INTEGER] AS foo_1#3, 1[INTEGER] AS foo_2#5]]
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#35], limit[1000], sort[[FieldSort[field=emp_no{f}#9, direction=ASC, nulls=LAST]]] estimatedRowSize[352]
*/
public void testProjectAwayMvExpandColumnOrder() {
var plan = optimizedPlan(physicalPlan("""
from test
| eval foo_1 = 1, foo_2 = 1
| sort emp_no
| mv_expand foo_1
"""));
var limit = as(plan, LimitExec.class);
var mvExpand = as(limit.child(), MvExpandExec.class);
var topN = as(mvExpand.child(), TopNExec.class);
var exchange = as(topN.child(), ExchangeExec.class);
var project = as(exchange.child(), ProjectExec.class);

assertThat(
Expressions.names(project.projections()),
containsInRelativeOrder(
"_meta_field",
"emp_no",
"first_name",
"gender",
"hire_date",
"job",
"job.raw",
"languages",
"last_name",
"long_noidx",
"salary",
"foo_1",
"foo_2"
)
);
var fieldExtract = as(project.child(), FieldExtractExec.class);
var eval = as(fieldExtract.child(), EvalExec.class);
EsQueryExec esQuery = as(eval.child(), EsQueryExec.class);
}

/**
* ProjectExec[[a{r}#5]]
* \_EvalExec[[__a_SUM@81823521{r}#15 / __a_COUNT@31645621{r}#16 AS a]]
Expand Down
Loading