Skip to content

Commit

Permalink
Merge branch 'master' into INLONG-11005
Browse files Browse the repository at this point in the history
# Conflicts:
#	inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
  • Loading branch information
emptyOVO committed Sep 14, 2024
2 parents 3a25892 + 807717a commit 3d5818f
Show file tree
Hide file tree
Showing 54 changed files with 2,203 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@
import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.TargetValue;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.transformation.CompareOperator;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
import org.apache.inlong.sort.protocol.transformation.LogicOperator;
import org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator;
import org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator;
import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.InOperator;
import org.apache.inlong.sort.protocol.transformation.operator.IsNotNullOperator;
import org.apache.inlong.sort.protocol.transformation.operator.IsNullOperator;
import org.apache.inlong.sort.protocol.transformation.operator.LessThanOperator;
Expand All @@ -51,13 +55,17 @@
import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Util for creat filter function.
*/
@Slf4j
public class FilterFunctionUtils {

/**
Expand Down Expand Up @@ -98,11 +106,11 @@ public static List<FilterFunction> createFilterFunctions(FilterDefinition filter
.map(filterRule -> createFilterFunction(filterRule, transformName)).collect(Collectors.toList());
// Move logicOperator to preFunction
for (int index = filterFunctions.size() - 1; index > 0; index--) {
SingleValueFilterFunction function = (SingleValueFilterFunction) filterFunctions.get(index);
SingleValueFilterFunction preFunction = (SingleValueFilterFunction) filterFunctions.get(index - 1);
FilterFunction function = filterFunctions.get(index);
FilterFunction preFunction = filterFunctions.get(index - 1);
function.setLogicOperator(preFunction.getLogicOperator());
}
((SingleValueFilterFunction) filterFunctions.get(0)).setLogicOperator(EmptyOperator.getInstance());
(filterFunctions.get(0)).setLogicOperator(EmptyOperator.getInstance());
return filterFunctions;
}

Expand Down Expand Up @@ -155,12 +163,26 @@ private static FilterFunction createFilterFunction(FilterRule filterRule, String
FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
}
OperationType operationType = filterRule.getOperationType();
SingleValueCompareOperator compareOperator = parseCompareOperator(operationType);
CompareOperator compareOperator = parseCompareOperator(operationType);
TargetValue targetValue = filterRule.getTargetValue();
FunctionParam target = parseTargetValue(targetValue, transformName);
RuleRelation relationWithPost = filterRule.getRelationWithPost();
LogicOperator logicOperator = parseLogicOperator(relationWithPost);
return new SingleValueFilterFunction(logicOperator, sourceFieldInfo, compareOperator, target);
if (compareOperator instanceof SingleValueCompareOperator) {
return new SingleValueFilterFunction(logicOperator, sourceFieldInfo,
(SingleValueCompareOperator) compareOperator, target);
} else {
List<FunctionParam> targets = new ArrayList<>();
if (CollectionUtils.isNotEmpty(filterRule.getTargetValues())) {
for (TargetValue value : filterRule.getTargetValues()) {
targets.add(parseTargetValue(value, transformName));
}
} else {
targets.add(target);
}
return new MultiValueFilterFunction(sourceFieldInfo, targets, (MultiValueCompareOperator) compareOperator,
logicOperator);
}
}

private static LogicOperator parseLogicOperator(RuleRelation relation) {
Expand Down Expand Up @@ -199,7 +221,7 @@ private static FunctionParam parseTargetValue(TargetValue value, String transfor
}
}

private static SingleValueCompareOperator parseCompareOperator(OperationType operationType) {
private static CompareOperator parseCompareOperator(OperationType operationType) {
switch (operationType) {
case eq:
return EqualOperator.getInstance();
Expand All @@ -217,6 +239,8 @@ private static SingleValueCompareOperator parseCompareOperator(OperationType ope
return IsNullOperator.getInstance();
case not_null:
return IsNotNullOperator.getInstance();
case in:
return InOperator.getInstance();
default:
throw new IllegalArgumentException(String.format("Unsupported operateType=%s", operationType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class QueryMessageRequest {
private String streamId;

@ApiModelProperty(value = "Message count")
private Integer messageCount = 100;
private Integer messageCount = 10;

@ApiModelProperty(value = "Field name")
private String fieldName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public abstract class TransformDefinition {

@JsonFormat
public enum OperationType {
lt, le, eq, ne, ge, gt, is_null, not_null
lt, le, eq, ne, ge, gt, is_null, not_null, in
}

@JsonFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public static class FilterRule {

private TargetValue targetValue;

private List<TargetValue> targetValues;

private RuleRelation relationWithPost;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ private List<StreamField> createStreamFields() {
private List<FilterRule> createFilterRule() {
List<FilterRule> filterRules = Lists.newArrayList();
filterRules.add(new FilterRule(new StreamField(0, FieldType.STRING.toString(), "name", null, null),
OperationType.not_null, null, RuleRelation.OR));
OperationType.not_null, null, null, RuleRelation.OR));
filterRules.add(new FilterRule(new StreamField(1, FieldType.INT.toString(), "age", null, null),
OperationType.gt, new TargetValue(true, null, "50"), null));
OperationType.gt, new TargetValue(true, null, "50"), null, null));
return filterRules;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,11 @@ public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolTy
LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", groupId, protocolType);

InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
if (groupEntity == null) {
String errMsg = String.format("group not found by groupId=%s", groupId);
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}
GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus());
if (!Objects.equals(groupStatus, GroupStatus.CONFIG_SUCCESSFUL)) {
String errMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ public List<FieldInfo> parseFields(String str, InlongStreamInfo streamInfo) thro
if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) {
escapeChar = streamInfo.getDataEscapeChar().charAt(0);
}
String[][] rowValues = SplitUtils.splitCsv(str, separator, escapeChar, '\"', '\n', true);
String[][] rowValues = SplitUtils.splitCsv(str, separator, escapeChar, null, '\n', true);
int fieldIndex = 0;
for (int i = 0; i < rowValues.length; i++) {
String[] fieldValues = rowValues[i];
for (int j = 0; j < fieldValues.length; j++) {
if (i + j < fields.size()) {
fields.get(i + j).setFieldValue(fieldValues[j]);
fields.get(fieldIndex++).setFieldValue(fieldValues[j]);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public List<FieldInfo> parseFields(String str, InlongStreamInfo streamInfo) thro
lineSeparator = (char) Integer.parseInt(streamInfo.getLineSeparator());
}
List<Map<String, String>> rowValues =
KvUtils.splitKv(str, separator, kvSeparator, escapeChar, '\"', lineSeparator);
KvUtils.splitKv(str, separator, kvSeparator, escapeChar, null, lineSeparator);
for (Map<String, String> row : rowValues) {
for (FieldInfo fieldInfo : fields) {
fieldInfo.setFieldValue(row.get(fieldInfo.getFieldName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo pulsarClusterIn
LOGGER.info("begin to query message for topic {}, adminUrl={}", topicFullName, pulsarClusterInfo.getAdminUrl());
List<BriefMQMessage> messageList = new ArrayList<>();
int partitionCount = getPartitionCount(pulsarClusterInfo, topicFullName);
for (int messageIndex = 0; messageIndex < 100; messageIndex++) {
for (int messageIndex = 0; messageIndex < request.getMessageCount(); messageIndex++) {
int currentPartitionNum = messageIndex % partitionCount;
int messagePosition = messageIndex / partitionCount + 1;
String topicNameOfPartition = buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public Boolean update(SinkRequest request, String operator) {
this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), operator);
}

LOGGER.info("success to update sink by id: {}", request);
LOGGER.info("success to update sink by id: {}", request.getId());
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import lombok.Data;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;

/**
* AvroNode
*/
@Data
public class AvroNode {

private String name;
private boolean isArray = false;
private List<Integer> arrayIndices = new ArrayList<>();

public AvroNode(String nodeString) {
int beginIndex = nodeString.indexOf('(');
if (beginIndex < 0) {
this.name = nodeString;
} else {
this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
int endIndex = nodeString.lastIndexOf(')');
if (endIndex >= 0) {
this.isArray = true;
String indicesString = nodeString.substring(beginIndex + 1, endIndex);
String[] indices = indicesString.split(",");
for (String index : indices) {
int arrayIndex = NumberUtils.toInt(StringUtils.trim(index), -1);
if (arrayIndex < 0) {
arrayIndex = 0;
}
this.arrayIndices.add(arrayIndex);
}
}
}
}
}
Loading

0 comments on commit 3d5818f

Please sign in to comment.