Skip to content

Commit

Permalink
[INLONG-11219][SDK] Transform support COALESCE() function
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Sep 30, 2024
1 parent e0d7f8d commit 2c3f923
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
* CoalesceFunction
*
*/
@TransformFunction(names = {"coalesce"})
public class CoalesceFunction implements ValueParser {

private List<ValueParser> parserList;

public CoalesceFunction(Function expr) {
if (expr.getParameters() == null) {
this.parserList = new ArrayList<>();
} else {
List<Expression> params = expr.getParameters().getExpressions();
parserList = new ArrayList<>(params.size());
for (Expression param : params) {
ValueParser node = OperatorTools.buildParser(param);
parserList.add(node);
}
}
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
for (ValueParser node : parserList) {
Object parseObj = node.parse(sourceData, rowIndex, context);
Object valueObj = parseValue(parseObj);
if (valueObj != null) {
return parseValue(parseObj);
}
}
return null;
}

private Object parseValue(Object value) {
Object parsedValue;
if (value instanceof BigDecimal) {
parsedValue = OperatorTools.parseBigDecimal(value);
} else if (value instanceof Timestamp) {
parsedValue = OperatorTools.parseTimestamp(value);
} else if (value instanceof Date) {
parsedValue = OperatorTools.parseDate(value);
} else if (value instanceof byte[]) {
parsedValue = OperatorTools.parseBytes(value);
} else {
parsedValue = OperatorTools.parseString(value);
}
// invalid
if (parsedValue instanceof String && ((String) parsedValue).isEmpty()) {
return null;
}

return parsedValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function.flowcontrol;

import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sdk.transform.process.function.string.AbstractFunctionStringTestBase;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;

public class TestCoalesceFunction extends AbstractFunctionStringTestBase {

@Test
public void testCoalesceFunction() throws Exception {
String transformSql1 = "select coalesce(string1, string2, string3) from source";
TransformConfig config1 = new TransformConfig(transformSql1);
TransformProcessor<String, String> processor1 = TransformProcessor
.create(config1, SourceDecoderFactory.createCsvDecoder(csvSource),
SinkEncoderFactory.createKvEncoder(kvSink));

// case1: coalesce('Transform SQL', 'SQL', 'hh')
/*
* List<String> output1 = processor1.transform("Transform SQL|SQL|hh", new HashMap<>()); Assert.assertEquals(1,
* output1.size()); Assert.assertEquals(output1.get(0), "result=Transform SQL");
*/

// case2: coalesce('', 'SQL', 'hh')
List<String> output2 = processor1.transform("|SQL|hh", new HashMap<>());
Assert.assertEquals(1, output2.size());
Assert.assertEquals(output2.get(0), "result=SQL");

// case2: coalesce('', '', 'hh')
List<String> output3 = processor1.transform("||hh", new HashMap<>());
Assert.assertEquals(1, output3.size());
Assert.assertEquals(output3.get(0), "result=hh");

String transformSql2 = "select coalesce(string1, string2) from source";
TransformConfig config2 = new TransformConfig(transformSql2);
TransformProcessor<String, String> processor2 = TransformProcessor
.create(config2, SourceDecoderFactory.createCsvDecoder(csvSource),
SinkEncoderFactory.createKvEncoder(kvSink));

// case3: coalesce('', '')
List<String> output4 = processor2.transform("|", new HashMap<>());
Assert.assertEquals(1, output4.size());
Assert.assertEquals(output4.get(0), "result=");

// case4: coalesce('Transform SQL', 'Transformer')
List<String> output5 = processor2.transform("Transform SQL|Transformer", new HashMap<>());
Assert.assertEquals(1, output5.size());
Assert.assertEquals(output5.get(0), "result=Transform SQL");

// case5: coalesce('', 'm S')
List<String> output6 = processor2.transform("|m S", new HashMap<>());
Assert.assertEquals(1, output6.size());
Assert.assertEquals(output6.get(0), "result=m S");

String transformSql3 = "select coalesce(string1) from source";
TransformConfig config3 = new TransformConfig(transformSql3);
TransformProcessor<String, String> processor3 = TransformProcessor
.create(config3, SourceDecoderFactory.createCsvDecoder(csvSource),
SinkEncoderFactory.createKvEncoder(kvSink));

// case6: coalesce('')
List<String> output7 = processor3.transform("", new HashMap<>());
Assert.assertEquals(1, output7.size());
Assert.assertEquals(output7.get(0), "result=");

// case6: coalesce('inlong')
List<String> output8 = processor3.transform("inlong", new HashMap<>());
Assert.assertEquals(1, output8.size());
Assert.assertEquals(output8.get(0), "result=inlong");

}
}

0 comments on commit 2c3f923

Please sign in to comment.