Skip to content

Commit

Permalink
Merge branch 'master' into INLONG-10986
Browse files Browse the repository at this point in the history
# Conflicts:
#	inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java
  • Loading branch information
emptyOVO committed Sep 3, 2024
2 parents 3f4f462 + edf93bd commit a064277
Show file tree
Hide file tree
Showing 42 changed files with 1,724 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class TubeMQOperator {
private static final String CREATE_USER = "&createUser=";
private static final String CONF_MOD_AUTH_TOKEN = "&confModAuthToken=";
private static final String MSG_COUNT = "&msgCount=";
private static final String FILTER_CONDS = "&filterConds=";

private static final String QUERY_TOPIC_PATH = "/webapi.htm?method=admin_query_cluster_topic_view";
private static final String QUERY_BROKER_PATH = "/webapi.htm?method=admin_query_broker_run_status";
Expand Down Expand Up @@ -288,7 +289,7 @@ public List<BriefMQMessage> queryLastMessage(TubeClusterInfo tubeCluster, String
}

String url = "http://" + brokerUrl + QUERY_MESSAGE_PATH + TOPIC_NAME + topicName + MSG_COUNT
+ request.getMessageCount();
+ request.getMessageCount() + FILTER_CONDS + streamInfo.getInlongStreamId();
TubeMessageResponse response = HttpUtils.request(restTemplate, url, HttpMethod.GET,
null, new HttpHeaders(), TubeMessageResponse.class);
if (response.getErrCode() != SUCCESS_CODE && response.getErrCode() != 200) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ private DataFlowConfig getDataFlowConfig(InlongGroupInfo groupInfo, InlongStream
.dataflowId(String.valueOf(sink.getId()))
.sourceConfig(getSourceConfig(groupInfo, streamInfo, sink))
.auditTag(String.valueOf(sink.getId()))
.transformSql(sink.getTransformSql())
.sinkConfig(getSinkConfig(groupInfo, streamInfo, sink))
.inlongGroupId(groupInfo.getInlongGroupId())
.inlongStreamId(streamInfo.getInlongStreamId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
*/
public class CsvSourceData implements SourceData {

private List<Map<String, String>> rows = new ArrayList<>();
private List<Map<String, Object>> rows = new ArrayList<>();

private Map<String, String> currentRow;
private Map<String, Object> currentRow;

public CsvSourceData() {
}

public void putField(String fieldName, String fieldValue) {
public void putField(String fieldName, Object fieldValue) {
this.currentRow.put(fieldName, fieldValue);
}

Expand All @@ -50,11 +50,11 @@ public int getRowCount() {
}

@Override
public String getField(int rowNum, String fieldName) {
public Object getField(int rowNum, String fieldName) {
if (rowNum >= this.rows.size()) {
return null;
}
Map<String, String> targetRow = this.rows.get(rowNum);
Map<String, Object> targetRow = this.rows.get(rowNum);
return targetRow.get(fieldName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@ public SourceData decode(String srcString, Context context) {
int fieldIndex = 0;
for (FieldInfo field : fields) {
String fieldName = field.getName();
String fieldValue = null;
Object fieldValue = null;
if (fieldIndex < fieldValues.length) {
fieldValue = fieldValues[fieldIndex];
try {
fieldValue = field.getConverter().convert(fieldValues[fieldIndex]);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
sourceData.putField(fieldName, fieldValue);
fieldIndex++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ public interface SourceData {

int getRowCount();

String getField(int rowNum, String fieldName);
Object getField(int rowNum, String fieldName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ public String encode(SinkData sinkData, Context context) {
} else {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue);
} else {
EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
}
builder.append(delimiter);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ public String encode(SinkData sinkData, Context context) {
if (fields == null || fields.size() == 0) {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue).append(entryDelimiter);
} else {
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
}
}
} else {
for (FieldInfo field : fields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*/
public interface SinkEncoder<Output> {

public static final String ALL_SOURCE_FIELD_SIGN = "*";

Output encode(SinkData sinkData, Context context);

List<FieldInfo> getFields();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class FieldInfo {

private String name;
private TypeConverter converter;
private TypeConverter converter = TypeConverter.DefaultTypeConverter();

public FieldInfo() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@
import com.google.common.collect.ImmutableMap;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
* TransformProcessor
*
*
*/
public class TransformProcessor<I, O> {

Expand All @@ -61,7 +61,9 @@ public class TransformProcessor<I, O> {

private PlainSelect transformSelect;
private ExpressionOperator where;
private Map<String, ValueParser> selectItemMap;
private List<ValueParserNode> selectItems;

private boolean includeAllSourceFields = false;

public static <I, O> TransformProcessor<I, O> create(
TransformConfig config,
Expand Down Expand Up @@ -91,7 +93,7 @@ private void initTransformSql() throws JSQLParserException {
this.transformSelect = (PlainSelect) select.getSelectBody();
this.where = OperatorTools.buildOperator(this.transformSelect.getWhere());
List<SelectItem> items = this.transformSelect.getSelectItems();
this.selectItemMap = new HashMap<>(items.size());
this.selectItems = new ArrayList<>(items.size());
List<FieldInfo> fields = this.encoder.getFields();
for (int i = 0; i < items.size(); i++) {
SelectItem item = items.get(i);
Expand All @@ -108,8 +110,12 @@ private void initTransformSql() throws JSQLParserException {
fieldName = exprItem.getAlias().getName();
}
}
this.selectItemMap.put(fieldName,
OperatorTools.buildParser(exprItem.getExpression()));
this.selectItems
.add(new ValueParserNode(fieldName, OperatorTools.buildParser(exprItem.getExpression())));
} else if (item instanceof AllColumns) {
fieldName = item.toString();
this.encoder.getFields().clear();
this.selectItems.add(new ValueParserNode(fieldName, null));
}
}
}
Expand Down Expand Up @@ -137,10 +143,18 @@ public List<O> transform(I input, Map<String, Object> extParams) {

// parse value
SinkData sinkData = new DefaultSinkData();
for (Entry<String, ValueParser> entry : this.selectItemMap.entrySet()) {
String fieldName = entry.getKey();
for (ValueParserNode node : this.selectItems) {
String fieldName = node.getFieldName();
ValueParser parser = node.getParser();
if (parser == null && StringUtils.equals(fieldName, SinkEncoder.ALL_SOURCE_FIELD_SIGN)) {
if (input instanceof String) {
sinkData.addField(fieldName, (String) input);
} else {
sinkData.addField(fieldName, "");
}
continue;
}
try {
ValueParser parser = entry.getValue();
Object fieldValue = parser.parse(sourceData, i, context);
sinkData.addField(fieldName, String.valueOf(fieldValue));
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process;

import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
* ValueParserNode
*/
@AllArgsConstructor
@Data
public class ValueParserNode {

private String fieldName;
private ValueParser parser;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.converter;

public class DoubleConverter implements TypeConverter {

@Override
public Object convert(String value) throws Exception {
return Double.parseDouble(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.converter;

public class LongConverter implements TypeConverter {

@Override
public Object convert(String value) throws Exception {
return Long.parseLong(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.util.List;

/**
* IfFunction
* description: if(expr,r1,r2) -- expr is an expression, if it holds, return r1; otherwise, return r2
*/
@TransformFunction(names = {"if"})
public class IfFunction implements ValueParser {

private final ExpressionOperator expressionOperator;
private final ValueParser tureValueParser;
private final ValueParser falseValueParser;

public IfFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
expressionOperator = OperatorTools.buildOperator(expressions.get(0));
tureValueParser = OperatorTools.buildParser(expressions.get(1));
falseValueParser = OperatorTools.buildParser(expressions.get(2));
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
boolean condition = expressionOperator.check(sourceData, rowIndex, context);
return condition ? tureValueParser.parse(sourceData, rowIndex, context)
: falseValueParser.parse(sourceData, rowIndex, context);
}
}
Loading

0 comments on commit a064277

Please sign in to comment.