From 70968a9d4b26a2d95acb845ae68ce4d0d80706a0 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Fri, 13 Sep 2024 13:10:54 +0800 Subject: [PATCH] [INLONG-11091][Manager] Manager supports in filter function configuration (#11094) --- .../pojo/sort/util/FilterFunctionUtils.java | 30 +++++++++++++++++-- .../pojo/transform/TransformDefinition.java | 2 +- .../transform/filter/FilterDefinition.java | 2 ++ .../transform/TransformDefinitionTest.java | 4 +-- .../service/sink/StreamSinkServiceImpl.java | 2 +- 5 files changed, 33 insertions(+), 7 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java index 1b3eadc7f22..c66dccd5113 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java @@ -31,16 +31,20 @@ import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.TargetValue; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.enums.FilterStrategy; +import org.apache.inlong.sort.protocol.transformation.CompareOperator; import org.apache.inlong.sort.protocol.transformation.ConstantParam; import org.apache.inlong.sort.protocol.transformation.FilterFunction; import org.apache.inlong.sort.protocol.transformation.FunctionParam; import org.apache.inlong.sort.protocol.transformation.LogicOperator; +import org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator; import org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator; import org.apache.inlong.sort.protocol.transformation.function.CustomFunction; +import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.operator.AndOperator; import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; +import org.apache.inlong.sort.protocol.transformation.operator.InOperator; import org.apache.inlong.sort.protocol.transformation.operator.IsNotNullOperator; import org.apache.inlong.sort.protocol.transformation.operator.IsNullOperator; import org.apache.inlong.sort.protocol.transformation.operator.LessThanOperator; @@ -51,13 +55,17 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator; import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /** * Util for creat filter function. */ +@Slf4j public class FilterFunctionUtils { /** @@ -155,12 +163,26 @@ private static FilterFunction createFilterFunction(FilterRule filterRule, String FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat)); } OperationType operationType = filterRule.getOperationType(); - SingleValueCompareOperator compareOperator = parseCompareOperator(operationType); + CompareOperator compareOperator = parseCompareOperator(operationType); TargetValue targetValue = filterRule.getTargetValue(); FunctionParam target = parseTargetValue(targetValue, transformName); RuleRelation relationWithPost = filterRule.getRelationWithPost(); LogicOperator logicOperator = parseLogicOperator(relationWithPost); - return new SingleValueFilterFunction(logicOperator, sourceFieldInfo, compareOperator, target); + if (compareOperator instanceof SingleValueCompareOperator) { + return new SingleValueFilterFunction(logicOperator, sourceFieldInfo, + (SingleValueCompareOperator) compareOperator, target); + } else { + List targets = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(filterRule.getTargetValues())) { + for (TargetValue value : filterRule.getTargetValues()) { + targets.add(parseTargetValue(value, transformName)); + } + } else { + targets.add(target); + } + return new MultiValueFilterFunction(sourceFieldInfo, targets, (MultiValueCompareOperator) compareOperator, + logicOperator); + } } private static LogicOperator parseLogicOperator(RuleRelation relation) { @@ -199,7 +221,7 @@ private static FunctionParam parseTargetValue(TargetValue value, String transfor } } - private static SingleValueCompareOperator parseCompareOperator(OperationType operationType) { + private static CompareOperator parseCompareOperator(OperationType operationType) { switch (operationType) { case eq: return EqualOperator.getInstance(); @@ -217,6 +239,8 @@ private static SingleValueCompareOperator parseCompareOperator(OperationType ope return IsNullOperator.getInstance(); case not_null: return IsNotNullOperator.getInstance(); + case in: + return InOperator.getInstance(); default: throw new IllegalArgumentException(String.format("Unsupported operateType=%s", operationType)); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java index 95c119ef25f..f1aafdab011 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java @@ -36,7 +36,7 @@ public abstract class TransformDefinition { @JsonFormat public enum OperationType { - lt, le, eq, ne, ge, gt, is_null, not_null + lt, le, eq, ne, ge, gt, is_null, not_null, in } @JsonFormat diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java index 9cec3550de6..bdd642cadf7 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java @@ -105,6 +105,8 @@ public static class FilterRule { private TargetValue targetValue; + private List targetValues; + private RuleRelation relationWithPost; } diff --git a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java index f68bf36f62d..4d5cecbad3e 100644 --- a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java +++ b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java @@ -95,9 +95,9 @@ private List createStreamFields() { private List createFilterRule() { List filterRules = Lists.newArrayList(); filterRules.add(new FilterRule(new StreamField(0, FieldType.STRING.toString(), "name", null, null), - OperationType.not_null, null, RuleRelation.OR)); + OperationType.not_null, null, null, RuleRelation.OR)); filterRules.add(new FilterRule(new StreamField(1, FieldType.INT.toString(), "age", null, null), - OperationType.gt, new TargetValue(true, null, "50"), null)); + OperationType.gt, new TargetValue(true, null, "50"), null, null)); return filterRules; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 9177253d903..4ce1d1c76c2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -481,7 +481,7 @@ public Boolean update(SinkRequest request, String operator) { this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), operator); } - LOGGER.info("success to update sink by id: {}", request); + LOGGER.info("success to update sink by id: {}", request.getId()); return true; }