Skip to content

Commit

Permalink
[INLONG-10999][SDK] Support to return raw data by star sign in transf…
Browse files Browse the repository at this point in the history
…ormer SQL (apache#11004)

* [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL

* add more UT Case

* fix code format problems

* fix pom.xml problem
  • Loading branch information
luchunliang authored Sep 3, 2024
1 parent e188410 commit edf93bd
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ public String encode(SinkData sinkData, Context context) {
} else {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue);
} else {
EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
}
builder.append(delimiter);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ public String encode(SinkData sinkData, Context context) {
if (fields == null || fields.size() == 0) {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue).append(entryDelimiter);
} else {
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
}
}
} else {
for (FieldInfo field : fields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*/
public interface SinkEncoder<Output> {

public static final String ALL_SOURCE_FIELD_SIGN = "*";

Output encode(SinkData sinkData, Context context);

List<FieldInfo> getFields();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@
import com.google.common.collect.ImmutableMap;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
* TransformProcessor
*
*
*/
public class TransformProcessor<I, O> {

Expand All @@ -61,7 +61,9 @@ public class TransformProcessor<I, O> {

private PlainSelect transformSelect;
private ExpressionOperator where;
private Map<String, ValueParser> selectItemMap;
private List<ValueParserNode> selectItems;

private boolean includeAllSourceFields = false;

public static <I, O> TransformProcessor<I, O> create(
TransformConfig config,
Expand Down Expand Up @@ -91,7 +93,7 @@ private void initTransformSql() throws JSQLParserException {
this.transformSelect = (PlainSelect) select.getSelectBody();
this.where = OperatorTools.buildOperator(this.transformSelect.getWhere());
List<SelectItem> items = this.transformSelect.getSelectItems();
this.selectItemMap = new HashMap<>(items.size());
this.selectItems = new ArrayList<>(items.size());
List<FieldInfo> fields = this.encoder.getFields();
for (int i = 0; i < items.size(); i++) {
SelectItem item = items.get(i);
Expand All @@ -108,8 +110,12 @@ private void initTransformSql() throws JSQLParserException {
fieldName = exprItem.getAlias().getName();
}
}
this.selectItemMap.put(fieldName,
OperatorTools.buildParser(exprItem.getExpression()));
this.selectItems
.add(new ValueParserNode(fieldName, OperatorTools.buildParser(exprItem.getExpression())));
} else if (item instanceof AllColumns) {
fieldName = item.toString();
this.encoder.getFields().clear();
this.selectItems.add(new ValueParserNode(fieldName, null));
}
}
}
Expand Down Expand Up @@ -137,10 +143,18 @@ public List<O> transform(I input, Map<String, Object> extParams) {

// parse value
SinkData sinkData = new DefaultSinkData();
for (Entry<String, ValueParser> entry : this.selectItemMap.entrySet()) {
String fieldName = entry.getKey();
for (ValueParserNode node : this.selectItems) {
String fieldName = node.getFieldName();
ValueParser parser = node.getParser();
if (parser == null && StringUtils.equals(fieldName, SinkEncoder.ALL_SOURCE_FIELD_SIGN)) {
if (input instanceof String) {
sinkData.addField(fieldName, (String) input);
} else {
sinkData.addField(fieldName, "");
}
continue;
}
try {
ValueParser parser = entry.getValue();
Object fieldValue = parser.parse(sourceData, i, context);
sinkData.addField(fieldName, String.valueOf(fieldValue));
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process;

import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * ValueParserNode
 * <p>
 * Pairs the output field name of a single select item with the {@link ValueParser}
 * that computes its value. Keeping the nodes in a list preserves the order in which
 * the select items were declared in the transform SQL.
 * <p>
 * Lombok generates the all-args constructor ({@code @AllArgsConstructor}) and the
 * getters, setters, equals/hashCode and toString ({@code @Data}).
 */
@AllArgsConstructor
@Data
public class ValueParserNode {

// Name under which the computed value is added to the sink data.
private String fieldName;
// Parser for the select expression; TransformProcessor stores null here for the "*" (all columns) item.
private ValueParser parser;
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,80 @@ public void testPb2CsvForNow() throws Exception {
List<String> output = processor.transform(srcBytes, new HashMap<>());
Assert.assertEquals(2, output.size());
}
@Test
/**
 * Verifies that a star sign in the transform SQL returns the raw CSV input line,
 * alone, combined with a where clause, and combined with extra select items.
 */
public void testCsv2Star() throws Exception {
    List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
    CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', fields);
    CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', new ArrayList<>());
    String transformSql = "select *";
    TransformConfig config = new TransformConfig(transformSql);

    // case1: a bare star returns the input line unchanged
    TransformProcessor<String, String> processor1 = TransformProcessor
            .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
                    SinkEncoderFactory.createCsvEncoder(csvSink));
    List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>());
    Assert.assertEquals(1, output1.size());
    // JUnit expects (expected, actual); keeping that order makes failure messages accurate
    Assert.assertEquals("2024-04-28 00:00:00|ok", output1.get(0));

    // case2: the where clause rejects the record, so nothing is emitted
    config.setTransformSql("select * from source where extinfo!='ok'");
    TransformProcessor<String, String> processor2 = TransformProcessor
            .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
                    SinkEncoderFactory.createCsvEncoder(csvSink));
    List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>());
    Assert.assertEquals(0, output2.size());

    // case3: star followed by named fields; the raw line comes first, then the selected fields
    config.setTransformSql("select *,extinfo,ftime from source where extinfo!='ok'");
    TransformProcessor<String, String> processor3 = TransformProcessor
            .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
                    SinkEncoderFactory.createCsvEncoder(csvSink));
    List<String> output3 = processor3.transform("2024-04-28 00:00:00|nok", new HashMap<>());
    Assert.assertEquals(1, output3.size());
    Assert.assertEquals("2024-04-28 00:00:00|nok|nok|2024-04-28 00:00:00", output3.get(0));

    // case4: no source/sink field definitions; fields are referenced by position ($1, $2)
    CsvSourceInfo csvSourceNoField = new CsvSourceInfo("UTF-8", '|', '\\', new ArrayList<>());
    CsvSinkInfo csvSinkNoField = new CsvSinkInfo("UTF-8", '|', '\\', new ArrayList<>());
    config.setTransformSql("select *,$2,$1 from source where $2='nok'");
    TransformProcessor<String, String> processor4 = TransformProcessor
            .create(config, SourceDecoderFactory.createCsvDecoder(csvSourceNoField),
                    SinkEncoderFactory.createCsvEncoder(csvSinkNoField));
    List<String> output4 = processor4.transform("2024-04-28 00:00:00|nok", new HashMap<>());
    Assert.assertEquals(1, output4.size());
    Assert.assertEquals("2024-04-28 00:00:00|nok|nok|2024-04-28 00:00:00", output4.get(0));
}

@Test
/**
 * Verifies that a star sign in the transform SQL returns the raw key-value input line,
 * alone, combined with a where clause, and combined with aliased select items.
 */
public void testKv2Star() throws Exception {
    List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
    KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
    KvSinkInfo kvSink = new KvSinkInfo("UTF-8", new ArrayList<>());
    String transformSql = "select *";
    TransformConfig config = new TransformConfig(transformSql);

    // case1: a bare star returns the input line unchanged
    TransformProcessor<String, String> processor1 = TransformProcessor
            .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
                    SinkEncoderFactory.createKvEncoder(kvSink));
    List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>());
    Assert.assertEquals(1, output1.size());
    // JUnit expects (expected, actual); keeping that order makes failure messages accurate
    Assert.assertEquals("ftime=2024-04-28 00:00:00&extinfo=ok", output1.get(0));

    // case2: the where clause rejects the record, so nothing is emitted
    config.setTransformSql("select * from source where extinfo!='ok'");
    TransformProcessor<String, String> processor2 = TransformProcessor
            .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
                    SinkEncoderFactory.createKvEncoder(kvSink));
    List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>());
    Assert.assertEquals(0, output2.size());

    // case3: star followed by aliased fields; the raw line comes first, then alias=value entries
    config.setTransformSql("select *,extinfo e1,ftime f1 from source where extinfo!='ok'");
    TransformProcessor<String, String> processor3 = TransformProcessor
            .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
                    SinkEncoderFactory.createKvEncoder(kvSink));
    List<String> output3 = processor3.transform("ftime=2024-04-28 00:00:00&extinfo=nok", new HashMap<>());
    Assert.assertEquals(1, output3.size());
    Assert.assertEquals("ftime=2024-04-28 00:00:00&extinfo=nok&e1=nok&f1=2024-04-28 00:00:00", output3.get(0));
}
}

0 comments on commit edf93bd

Please sign in to comment.