Skip to content

Commit

Permalink
Add FILLNULL command in PPL (#3032) (#3075)
Browse files Browse the repository at this point in the history
Signed-off-by: Norman Jordan <[email protected]>
Signed-off-by: normanj-bitquill <[email protected]>
Co-authored-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
normanj-bitquill and acarbonetto authored Dec 5, 2024
1 parent 6712526 commit b6846ce
Show file tree
Hide file tree
Showing 17 changed files with 635 additions and 0 deletions.
24 changes: 24 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
Expand Down Expand Up @@ -558,6 +559,29 @@ public LogicalPlan visitAD(AD node, AnalysisContext context) {
return new LogicalAD(child, options);
}

/** Build {@link LogicalEval} for fillnull command. */
@Override
public LogicalPlan visitFillNull(final FillNull node, final AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);

ImmutableList.Builder<Pair<ReferenceExpression, Expression>> expressionsBuilder =
new Builder<>();
for (FillNull.NullableFieldFill fieldFill : node.getNullableFieldFills()) {
Expression fieldExpr =
expressionAnalyzer.analyze(fieldFill.getNullableFieldReference(), context);
ReferenceExpression ref =
DSL.ref(fieldFill.getNullableFieldReference().getField().toString(), fieldExpr.type());
FunctionExpression ifNullFunction =
DSL.ifnull(ref, expressionAnalyzer.analyze(fieldFill.getReplaceNullWithMe(), context));
expressionsBuilder.add(new ImmutablePair<>(ref, ifNullFunction));
TypeEnvironment typeEnvironment = context.peek();
// define the new reference in type env.
typeEnvironment.define(ref);
}

return new LogicalEval(child, expressionsBuilder.build());
}

/** Build {@link LogicalML} for ml command. */
@Override
public LogicalPlan visitML(ML node, AnalysisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
Expand Down Expand Up @@ -312,4 +313,8 @@ public T visitFetchCursor(FetchCursor cursor, C context) {
public T visitCloseCursor(CloseCursor closeCursor, C context) {
return visitChildren(closeCursor, context);
}

public T visitFillNull(FillNull fillNull, C context) {
return visitChildren(fillNull, context);
}
}
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.sql.ast.dsl;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
Expand Down Expand Up @@ -46,6 +48,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Limit;
Expand Down Expand Up @@ -471,4 +474,22 @@ public static Parse parse(
java.util.Map<String, Literal> arguments) {
return new Parse(parseMethod, sourceField, pattern, arguments, input);
}

public static FillNull fillNull(UnresolvedExpression replaceNullWithMe, Field... fields) {
return new FillNull(
FillNull.ContainNullableFieldFill.ofSameValue(
replaceNullWithMe, ImmutableList.copyOf(fields)));
}

public static FillNull fillNull(
List<ImmutablePair<Field, UnresolvedExpression>> fieldAndReplacements) {
ImmutableList.Builder<FillNull.NullableFieldFill> replacementsBuilder = ImmutableList.builder();
for (ImmutablePair<Field, UnresolvedExpression> fieldAndReplacement : fieldAndReplacements) {
replacementsBuilder.add(
new FillNull.NullableFieldFill(
fieldAndReplacement.getLeft(), fieldAndReplacement.getRight()));
}
return new FillNull(
FillNull.ContainNullableFieldFill.ofVariousValue(replacementsBuilder.build()));
}
}
89 changes: 89 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/FillNull.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import java.util.Objects;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

/** AST node represent FillNull operation. */
@RequiredArgsConstructor
@AllArgsConstructor
public class FillNull extends UnresolvedPlan {

@Getter
@RequiredArgsConstructor
public static class NullableFieldFill {
@NonNull private final Field nullableFieldReference;
@NonNull private final UnresolvedExpression replaceNullWithMe;
}

public interface ContainNullableFieldFill {
List<NullableFieldFill> getNullFieldFill();

static ContainNullableFieldFill ofVariousValue(List<NullableFieldFill> replacements) {
return new VariousValueNullFill(replacements);
}

static ContainNullableFieldFill ofSameValue(
UnresolvedExpression replaceNullWithMe, List<Field> nullableFieldReferences) {
return new SameValueNullFill(replaceNullWithMe, nullableFieldReferences);
}
}

private static class SameValueNullFill implements ContainNullableFieldFill {
@Getter(onMethod_ = @Override)
private final List<NullableFieldFill> nullFieldFill;

public SameValueNullFill(
UnresolvedExpression replaceNullWithMe, List<Field> nullableFieldReferences) {
Objects.requireNonNull(replaceNullWithMe, "Null replacement is required");
this.nullFieldFill =
Objects.requireNonNull(nullableFieldReferences, "Nullable field reference is required")
.stream()
.map(nullableReference -> new NullableFieldFill(nullableReference, replaceNullWithMe))
.toList();
}
}

@RequiredArgsConstructor
private static class VariousValueNullFill implements ContainNullableFieldFill {
@NonNull
@Getter(onMethod_ = @Override)
private final List<NullableFieldFill> nullFieldFill;
}

private UnresolvedPlan child;

@NonNull private final ContainNullableFieldFill containNullableFieldFill;

public List<NullableFieldFill> getNullableFieldFills() {
return containNullableFieldFill.getNullFieldFill();
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<? extends Node> getChild() {
return child == null ? List.of() : List.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitFillNull(this, context);
}
}
44 changes: 44 additions & 0 deletions core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.sql.ast.dsl.AstDSL;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.DataType;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.HighlightFunction;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.ParseMethod;
Expand All @@ -81,6 +82,7 @@
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
Expand Down Expand Up @@ -1437,6 +1439,48 @@ public void kmeanns_relation() {
new Kmeans(AstDSL.relation("schema"), argumentMap));
}

@Test
public void fillnull_same_value() {
assertAnalyzeEqual(
LogicalPlanDSL.eval(
LogicalPlanDSL.relation("schema", table),
ImmutablePair.of(
DSL.ref("integer_value", INTEGER),
DSL.ifnull(DSL.ref("integer_value", INTEGER), DSL.literal(0))),
ImmutablePair.of(
DSL.ref("int_null_value", INTEGER),
DSL.ifnull(DSL.ref("int_null_value", INTEGER), DSL.literal(0)))),
new FillNull(
AstDSL.relation("schema"),
FillNull.ContainNullableFieldFill.ofSameValue(
AstDSL.intLiteral(0),
ImmutableList.<Field>builder()
.add(AstDSL.field("integer_value"))
.add(AstDSL.field("int_null_value"))
.build())));
}

@Test
public void fillnull_various_values() {
assertAnalyzeEqual(
LogicalPlanDSL.eval(
LogicalPlanDSL.relation("schema", table),
ImmutablePair.of(
DSL.ref("integer_value", INTEGER),
DSL.ifnull(DSL.ref("integer_value", INTEGER), DSL.literal(0))),
ImmutablePair.of(
DSL.ref("int_null_value", INTEGER),
DSL.ifnull(DSL.ref("int_null_value", INTEGER), DSL.literal(1)))),
new FillNull(
AstDSL.relation("schema"),
FillNull.ContainNullableFieldFill.ofVariousValue(
ImmutableList.of(
new FillNull.NullableFieldFill(
AstDSL.field("integer_value"), AstDSL.intLiteral(0)),
new FillNull.NullableFieldFill(
AstDSL.field("int_null_value"), AstDSL.intLiteral(1))))));
}

@Test
public void ad_batchRCF_relation() {
Map<String, Literal> argumentMap =
Expand Down
1 change: 1 addition & 0 deletions docs/category.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"user/ppl/cmd/information_schema.rst",
"user/ppl/cmd/eval.rst",
"user/ppl/cmd/fields.rst",
"user/ppl/cmd/fillnull.rst",
"user/ppl/cmd/grok.rst",
"user/ppl/cmd/head.rst",
"user/ppl/cmd/parse.rst",
Expand Down
62 changes: 62 additions & 0 deletions docs/user/ppl/cmd/fillnull.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
=============
fillnull
=============

.. rubric:: Table of contents

.. contents::
:local:
:depth: 2


Description
============
Using ``fillnull`` command to fill null with provided value in one or more fields in the search result.


Syntax
============
`fillnull [with <null-replacement> in <nullable-field>["," <nullable-field>]] | [using <source-field> = <null-replacement> [","<source-field> = <null-replacement>]]`

* null-replacement: mandatory. The value used to replace `null`s.
* nullable-field: mandatory. Field reference. The `null` values in the field referred to by the property will be replaced with the values from the null-replacement.

Example 1: fillnull one field
======================================================================

The example show fillnull one field.

PPL query::

os> source=accounts | fields email, employer | fillnull with '<not found>' in email ;
fetched rows / total rows = 4/4
+-----------------------+----------+
| email | employer |
|-----------------------+----------|
| [email protected] | Pyrami |
| [email protected] | Netagy |
| <not found> | Quility |
| [email protected] | null |
+-----------------------+----------+

Example 2: fillnull applied to multiple fields
========================================================================

The example show fillnull applied to multiple fields.

PPL query::

os> source=accounts | fields email, employer | fillnull using email = '<not found>', employer = '<no employer>' ;
fetched rows / total rows = 4/4
+-----------------------+---------------+
| email | employer |
|-----------------------+---------------|
| [email protected] | Pyrami |
| [email protected] | Netagy |
| <not found> | Quility |
| [email protected] | <no employer> |
+-----------------------+---------------+

Limitation
==========
The ``fillnull`` command is not rewritten to OpenSearch DSL, it is only executed on the coordination node.
11 changes: 11 additions & 0 deletions integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ public void testLimitPushDownExplain() throws Exception {
+ "| fields ageMinus"));
}

@Test
public void testFillNullPushDownExplain() throws Exception {
String expected = loadFromFile("expectedOutput/ppl/explain_fillnull_push.json");

assertJsonEquals(
expected,
explainQueryToString(
"source=opensearch-sql_test_index_account"
+ " | fillnull with -1 in age,balance | fields age, balance"));
}

String loadFromFile(String filename) throws Exception {
URI uri = Resources.getResource(filename).toURI();
return new String(Files.readAllBytes(Paths.get(uri)));
Expand Down
Loading

0 comments on commit b6846ce

Please sign in to comment.