Skip to content

Commit

Permalink
[INLONG-11091][Manager] Manager supports in filter function configura…
Browse files Browse the repository at this point in the history
…tion (apache#11094)
  • Loading branch information
fuweng11 committed Sep 13, 2024
1 parent 5056e49 commit 70968a9
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@
import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.TargetValue;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.transformation.CompareOperator;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
import org.apache.inlong.sort.protocol.transformation.LogicOperator;
import org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator;
import org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator;
import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.InOperator;
import org.apache.inlong.sort.protocol.transformation.operator.IsNotNullOperator;
import org.apache.inlong.sort.protocol.transformation.operator.IsNullOperator;
import org.apache.inlong.sort.protocol.transformation.operator.LessThanOperator;
Expand All @@ -51,13 +55,17 @@
import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Util for creat filter function.
*/
@Slf4j
public class FilterFunctionUtils {

/**
Expand Down Expand Up @@ -155,12 +163,26 @@ private static FilterFunction createFilterFunction(FilterRule filterRule, String
FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
}
OperationType operationType = filterRule.getOperationType();
SingleValueCompareOperator compareOperator = parseCompareOperator(operationType);
CompareOperator compareOperator = parseCompareOperator(operationType);
TargetValue targetValue = filterRule.getTargetValue();
FunctionParam target = parseTargetValue(targetValue, transformName);
RuleRelation relationWithPost = filterRule.getRelationWithPost();
LogicOperator logicOperator = parseLogicOperator(relationWithPost);
return new SingleValueFilterFunction(logicOperator, sourceFieldInfo, compareOperator, target);
if (compareOperator instanceof SingleValueCompareOperator) {
return new SingleValueFilterFunction(logicOperator, sourceFieldInfo,
(SingleValueCompareOperator) compareOperator, target);
} else {
List<FunctionParam> targets = new ArrayList<>();
if (CollectionUtils.isNotEmpty(filterRule.getTargetValues())) {
for (TargetValue value : filterRule.getTargetValues()) {
targets.add(parseTargetValue(value, transformName));
}
} else {
targets.add(target);
}
return new MultiValueFilterFunction(sourceFieldInfo, targets, (MultiValueCompareOperator) compareOperator,
logicOperator);
}
}

private static LogicOperator parseLogicOperator(RuleRelation relation) {
Expand Down Expand Up @@ -199,7 +221,7 @@ private static FunctionParam parseTargetValue(TargetValue value, String transfor
}
}

private static SingleValueCompareOperator parseCompareOperator(OperationType operationType) {
private static CompareOperator parseCompareOperator(OperationType operationType) {
switch (operationType) {
case eq:
return EqualOperator.getInstance();
Expand All @@ -217,6 +239,8 @@ private static SingleValueCompareOperator parseCompareOperator(OperationType ope
return IsNullOperator.getInstance();
case not_null:
return IsNotNullOperator.getInstance();
case in:
return InOperator.getInstance();
default:
throw new IllegalArgumentException(String.format("Unsupported operateType=%s", operationType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public abstract class TransformDefinition {

@JsonFormat
public enum OperationType {
lt, le, eq, ne, ge, gt, is_null, not_null
lt, le, eq, ne, ge, gt, is_null, not_null, in
}

@JsonFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public static class FilterRule {

private TargetValue targetValue;

private List<TargetValue> targetValues;

private RuleRelation relationWithPost;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ private List<StreamField> createStreamFields() {
private List<FilterRule> createFilterRule() {
List<FilterRule> filterRules = Lists.newArrayList();
filterRules.add(new FilterRule(new StreamField(0, FieldType.STRING.toString(), "name", null, null),
OperationType.not_null, null, RuleRelation.OR));
OperationType.not_null, null, null, RuleRelation.OR));
filterRules.add(new FilterRule(new StreamField(1, FieldType.INT.toString(), "age", null, null),
OperationType.gt, new TargetValue(true, null, "50"), null));
OperationType.gt, new TargetValue(true, null, "50"), null, null));
return filterRules;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public Boolean update(SinkRequest request, String operator) {
this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), operator);
}

LOGGER.info("success to update sink by id: {}", request);
LOGGER.info("success to update sink by id: {}", request.getId());
return true;
}

Expand Down

0 comments on commit 70968a9

Please sign in to comment.