Skip to content

Commit

Permalink
[INLONG-11019][SDK] Optimize the mapping strategy of TransformSQL sel…
Browse files Browse the repository at this point in the history
…ect field list (apache#11021)
  • Loading branch information
luchunliang authored Sep 6, 2024
1 parent 50c7b5d commit 179e478
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sdk.transform.encode;

import lombok.Data;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -26,6 +28,7 @@
* DefaultSinkData
*
*/
@Data
public class DefaultSinkData implements SinkData {

private List<String> keyList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,37 @@ public class TransformConfig {
@JsonProperty("configuration")
private Map<String, Object> configuration;

@JsonProperty("strictOrder")
private boolean strictOrder = true;

@JsonProperty("ignoreConfigError")
private boolean ignoreConfigError = true;

@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql) {
this(transformSql, ImmutableMap.of());
this(transformSql, ImmutableMap.of(), true, true);
}

@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql,
@JsonProperty("configuration") Map<String, Object> configuration) {
this(transformSql, configuration, true, true);
}
@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql,
@JsonProperty("strictOrder") boolean strictOrder) {
this(transformSql, ImmutableMap.of(), strictOrder, true);
}

@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql,
@JsonProperty("configuration") Map<String, Object> configuration,
@JsonProperty("strictOrder") boolean strictOrder,
@JsonProperty("ignoreConfigError") boolean ignoreConfigError) {
this.transformSql = Preconditions.checkNotNull(transformSql, "transform sql should not be null");
this.configuration = configuration;
this.strictOrder = strictOrder;
this.ignoreConfigError = ignoreConfigError;
}

/**
Expand All @@ -61,6 +82,16 @@ public Map<String, Object> getConfiguration() {
return configuration;
}

@JsonProperty("strictOrder")
public boolean isStrictOrder() {
return strictOrder;
}

@JsonProperty("ignoreConfigError")
public boolean isIgnoreConfigError() {
return ignoreConfigError;
}

/**
* set transformSql
* @param transformSql the transformSql to set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.decode.SourceDecoder;
import org.apache.inlong.sdk.transform.encode.DefaultSinkData;
import org.apache.inlong.sdk.transform.encode.SinkData;
import org.apache.inlong.sdk.transform.encode.SinkEncoder;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
Expand Down Expand Up @@ -63,7 +62,7 @@ public class TransformProcessor<I, O> {
private ExpressionOperator where;
private List<ValueParserNode> selectItems;

private boolean includeAllSourceFields = false;
private List<String> sinkFieldList;

public static <I, O> TransformProcessor<I, O> create(
TransformConfig config,
Expand All @@ -84,6 +83,11 @@ private TransformProcessor(
}

private void init() throws JSQLParserException {
if (!config.isStrictOrder() && encoder != null && encoder.getFields() != null) {
List<FieldInfo> fields = encoder.getFields();
this.sinkFieldList = new ArrayList<>(fields.size());
fields.forEach(v -> this.sinkFieldList.add(v.getName()));
}
this.initTransformSql();
}

Expand All @@ -98,7 +102,7 @@ private void initTransformSql() throws JSQLParserException {
for (int i = 0; i < items.size(); i++) {
SelectItem item = items.get(i);
String fieldName = null;
if (i < fields.size()) {
if (config.isStrictOrder() && i < fields.size()) {
fieldName = fields.get(i).getName();
}
if (item instanceof SelectExpressionItem) {
Expand All @@ -109,6 +113,10 @@ private void initTransformSql() throws JSQLParserException {
} else {
fieldName = exprItem.getAlias().getName();
}
if (!this.checkSelectField(fieldName)) {
throw new JSQLParserException(
String.format("Field name:%s can not be found in sink field list.", fieldName));
}
}
this.selectItems
.add(new ValueParserNode(fieldName, OperatorTools.buildParser(exprItem.getExpression())));
Expand All @@ -120,6 +128,13 @@ private void initTransformSql() throws JSQLParserException {
}
}

public boolean checkSelectField(String fieldName) {
if (config.isIgnoreConfigError()) {
return true;
}
return this.sinkFieldList != null && this.sinkFieldList.contains(fieldName);
}

public List<O> transform(I input) {
return this.transform(input, EMPTY_EXT_PARAMS);
}
Expand All @@ -142,7 +157,7 @@ public List<O> transform(I input, Map<String, Object> extParams) {
}

// parse value
SinkData sinkData = new DefaultSinkData();
DefaultSinkData sinkData = new DefaultSinkData();
for (ValueParserNode node : this.selectItems) {
String fieldName = node.getFieldName();
ValueParser parser = node.getParser();
Expand All @@ -163,6 +178,9 @@ public List<O> transform(I input, Map<String, Object> extParams) {
}
}

if (this.sinkFieldList != null) {
sinkData.setKeyList(this.sinkFieldList);
}
// encode
sinkDatas.add(this.encoder.encode(sinkData, context));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.processor;

import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.TransformProcessor;

import net.sf.jsqlparser.JSQLParserException;
import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;

public class TestCsv2CsvForErrorOrderProcessor extends AbstractProcessorTestBase {

@Test
public void testCsv2CsvForErrorOrder() throws Exception {
List<FieldInfo> sourceFields = this.getTestFieldList("ftime", "extinfo", "data");
CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', sourceFields);
List<FieldInfo> sinkFields = this.getTestFieldList("field1", "field2", "field3");
CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', sinkFields);
String transformSql = "select ftime as field2,data as field3,extinfo as field4 from source where extinfo='ok'";
TransformConfig config = new TransformConfig(transformSql, false);
// case1
TransformProcessor<String, String> processor1 = TransformProcessor
.create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
SinkEncoderFactory.createCsvEncoder(csvSink));

List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok|data1", new HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals("|2024-04-28 00:00:00|data1", output1.get(0));
}

@Test
public void testKv2KvForErrorOrder() throws Exception {
List<FieldInfo> sourceFields = this.getTestFieldList("key1", "key2", "key3", "key4");
KvSourceInfo kvSource = new KvSourceInfo("UTF-8", sourceFields);
List<FieldInfo> sinkFields = this.getTestFieldList("field1", "field2", "field3");
KvSinkInfo kvSink = new KvSinkInfo("UTF-8", sinkFields);
String transformSql = "select key4 as field3, key2 as field6, key1 as field1";
TransformConfig config = new TransformConfig(transformSql, false);
// case1
TransformProcessor<String, String> processor1 = TransformProcessor
.create(config, SourceDecoderFactory.createKvDecoder(kvSource),
SinkEncoderFactory.createKvEncoder(kvSink));

List<String> output1 = processor1.transform("key1=string11&key2=string12&key3=number11&key4=number12",
new HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals("field1=string11&field2=&field3=number12", output1.get(0));
}

@Test
public void testKv2KvForConfigError() throws Exception {
List<FieldInfo> sourceFields = this.getTestFieldList("key1", "key2", "key3", "key4");
KvSourceInfo kvSource = new KvSourceInfo("UTF-8", sourceFields);
List<FieldInfo> sinkFields = this.getTestFieldList("field1", "field2", "field3");
KvSinkInfo kvSink = new KvSinkInfo("UTF-8", sinkFields);
String transformSql = "select key4 as field3, key2 as field6, key1 as field1";
TransformConfig config = new TransformConfig(transformSql, new HashMap<>(), false, false);
// case1
Assert.assertThrows(JSQLParserException.class, () -> {
TransformProcessor
.create(config, SourceDecoderFactory.createKvDecoder(kvSource),
SinkEncoderFactory.createKvEncoder(kvSink));
});
}
}

0 comments on commit 179e478

Please sign in to comment.