From fe481bd5d0d39deffa06fed79af2c9e86647c894 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Fri, 13 Sep 2024 11:00:22 +0800 Subject: [PATCH] [INLONG-11091][Manager] Manager supports in filter function configuration --- .../pojo/sort/util/FilterFunctionUtils.java | 27 ++++++++++++++++--- .../pojo/transform/TransformDefinition.java | 2 +- .../transform/filter/FilterDefinition.java | 2 ++ .../transform/TransformDefinitionTest.java | 4 +-- .../service/sink/StreamSinkServiceImpl.java | 2 +- 5 files changed, 30 insertions(+), 7 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java index 1b3eadc7f22..59ef0d842c1 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java @@ -31,12 +31,15 @@ import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.TargetValue; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.enums.FilterStrategy; +import org.apache.inlong.sort.protocol.transformation.CompareOperator; import org.apache.inlong.sort.protocol.transformation.ConstantParam; import org.apache.inlong.sort.protocol.transformation.FilterFunction; import org.apache.inlong.sort.protocol.transformation.FunctionParam; import org.apache.inlong.sort.protocol.transformation.LogicOperator; +import org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator; import org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator; import org.apache.inlong.sort.protocol.transformation.function.CustomFunction; +import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.operator.AndOperator; import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; @@ -51,13 +54,17 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator; import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /** * Util for creat filter function. */ +@Slf4j public class FilterFunctionUtils { /** @@ -155,12 +162,26 @@ private static FilterFunction createFilterFunction(FilterRule filterRule, String FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat)); } OperationType operationType = filterRule.getOperationType(); - SingleValueCompareOperator compareOperator = parseCompareOperator(operationType); + CompareOperator compareOperator = parseCompareOperator(operationType); TargetValue targetValue = filterRule.getTargetValue(); FunctionParam target = parseTargetValue(targetValue, transformName); RuleRelation relationWithPost = filterRule.getRelationWithPost(); LogicOperator logicOperator = parseLogicOperator(relationWithPost); - return new SingleValueFilterFunction(logicOperator, sourceFieldInfo, compareOperator, target); + if (compareOperator instanceof SingleValueCompareOperator) { + return new SingleValueFilterFunction(logicOperator, sourceFieldInfo, + (SingleValueCompareOperator) compareOperator, target); + } else { + List targets = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(filterRule.getTargetValues())) { + for (TargetValue value : filterRule.getTargetValues()) { + targets.add(parseTargetValue(value, transformName)); + } + } else { + targets.add(target); + } + return new MultiValueFilterFunction(sourceFieldInfo, targets, (MultiValueCompareOperator) compareOperator, + logicOperator); + } } private static LogicOperator parseLogicOperator(RuleRelation relation) { @@ -199,7 +220,7 @@ private static FunctionParam parseTargetValue(TargetValue value, String transfor } } - private static SingleValueCompareOperator parseCompareOperator(OperationType operationType) { + private static CompareOperator parseCompareOperator(OperationType operationType) { switch (operationType) { case eq: return EqualOperator.getInstance(); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java index 95c119ef25f..f1aafdab011 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java @@ -36,7 +36,7 @@ public abstract class TransformDefinition { @JsonFormat public enum OperationType { - lt, le, eq, ne, ge, gt, is_null, not_null + lt, le, eq, ne, ge, gt, is_null, not_null, in } @JsonFormat diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java index 9cec3550de6..bdd642cadf7 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java @@ -105,6 +105,8 @@ public static class FilterRule { private TargetValue targetValue; + private List targetValues; + private RuleRelation relationWithPost; } diff --git a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java index f68bf36f62d..4d5cecbad3e 100644 --- a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java +++ b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java @@ -95,9 +95,9 @@ private List createStreamFields() { private List createFilterRule() { List filterRules = Lists.newArrayList(); filterRules.add(new FilterRule(new StreamField(0, FieldType.STRING.toString(), "name", null, null), - OperationType.not_null, null, RuleRelation.OR)); + OperationType.not_null, null, null, RuleRelation.OR)); filterRules.add(new FilterRule(new StreamField(1, FieldType.INT.toString(), "age", null, null), - OperationType.gt, new TargetValue(true, null, "50"), null)); + OperationType.gt, new TargetValue(true, null, "50"), null, null)); return filterRules; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 9177253d903..4ce1d1c76c2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -481,7 +481,7 @@ public Boolean update(SinkRequest request, String operator) { this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), operator); } - LOGGER.info("success to update sink by id: {}", request); + LOGGER.info("success to update sink by id: {}", request.getId()); return true; }