Skip to content

Commit

Permalink
[INLONG-10618][SDK] Transform SQL support common functions(Including …
Browse files Browse the repository at this point in the history
…substring, locate, to_date and date_format) (apache#10744)

* [INLONG-10618][SDK] Transform SQL support common functions(Including substring, locate, to_date and date_format)

* [INLONG-10618][SDK] Fix unit test failure, set time zone to Shanghai

* [INLONG-10618][SDK] Make the DateTimeFormatter object reusable to avoid creating multiple identical DateTimeFormatter objects.

* [INLONG-10618][SDK] Make the SimpleDateFormat object reusable to avoid creating multiple identical SimpleDateFormat objects.

---------

Co-authored-by: jameswyli <[email protected]>
  • Loading branch information
yfsn666 and jameswyli authored Aug 6, 2024
1 parent 30ce46c commit 8a27664
Show file tree
Hide file tree
Showing 7 changed files with 603 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* DateFormatFunction
* description: date_format(timestamp,format)--converts timestamp(in seconds) to a value of string in the format
* specified by the date format string. The format string is compatible with Java’s SimpleDateFormat
*/
public class DateFormatFunction implements ValueParser {

private ValueParser timestampParser;
private ValueParser formatParser;
private static final Map<String, SimpleDateFormat> SIMPLE_DATE_FORMATS = new ConcurrentHashMap<>();

/**
* Constructor
*
* @param expr
*/
public DateFormatFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
timestampParser = OperatorTools.buildParser(expressions.get(0));
formatParser = OperatorTools.buildParser(expressions.get(1));
}

/**
* parse
*
* @param sourceData
* @param rowIndex
* @return
*/
@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object timestampObj = timestampParser.parse(sourceData, rowIndex, context);
Object formatObj = formatParser.parse(sourceData, rowIndex, context);
BigDecimal timestamp = OperatorTools.parseBigDecimal(timestampObj);
String format = OperatorTools.parseString(formatObj);
SimpleDateFormat sdf = getSimpleDateFormat(format);
// the timestamp is in seconds, multiply 1000 to get milliseconds
Date date = new Date(timestamp.longValue() * 1000);
return sdf.format(date);
}

/**
* getSimpleDateFormat
*
* @param pattern
* @return
*/
private SimpleDateFormat getSimpleDateFormat(String pattern) {
SimpleDateFormat sdf = SIMPLE_DATE_FORMATS.get(pattern);
if (sdf == null) {
sdf = new SimpleDateFormat(pattern);
SIMPLE_DATE_FORMATS.put(pattern, sdf);
}
return sdf;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.util.List;

/**
* LocateFunction
* description: locate(string1, string2[, integer])
* - returns the position of the first occurrence of string1 in string2 after position integer
* - returns 0 if not found
* - returns NULL if any of arguments is NULL
*/
public class LocateFunction implements ValueParser {

private ValueParser stringParser1;
private ValueParser stringParser2;
private ValueParser startPositionParser;

/**
* Constructor
*
* @param expr
*/
public LocateFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
// Determine the number of arguments and build parser
stringParser1 = OperatorTools.buildParser(expressions.get(0));
stringParser2 = OperatorTools.buildParser(expressions.get(1));
if (expressions.size() == 3) {
startPositionParser = OperatorTools.buildParser(expressions.get(2));
}
}

/**
* parse
*
* @param sourceData
* @param rowIndex
* @return
*/
@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object stringObj1 = stringParser1.parse(sourceData, rowIndex, context);
Object stringObj2 = stringParser2.parse(sourceData, rowIndex, context);
// If any of arguments is null, return null
if (stringObj1 == null || stringObj2 == null) {
return null;
}
String str1 = OperatorTools.parseString(stringObj1);
String str2 = OperatorTools.parseString(stringObj2);
if (startPositionParser != null) {
Object startPositionObj = startPositionParser.parse(sourceData, rowIndex, context);
// if startPositionObj is null, return null
if (startPositionObj == null) {
return null;
}
int startPosition = OperatorTools.parseBigDecimal(startPositionObj).intValue();
return str2.indexOf(str1, startPosition - 1) + 1;
} else {
return str2.indexOf(str1) + 1;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.util.List;

/**
* SubstringFunction
* description: substring(string FROM INT1 [ FOR INT2 ])--returns a substring of STRING starting from position INT1 with
* length INT2 (to the end by default)
*/
public class SubstringFunction implements ValueParser {

private ValueParser stringParser;
private ValueParser startPositionParser;
private ValueParser lengthParser;

/**
* Constructor
* @param expr
*/
public SubstringFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
// Determine the number of arguments and build parser
stringParser = OperatorTools.buildParser(expressions.get(0));
startPositionParser = OperatorTools.buildParser(expressions.get(1));
if (expressions.size() == 3) {
lengthParser = OperatorTools.buildParser(expressions.get(2));
}
}

/**
* parse
* @param sourceData
* @param rowIndex
* @return
*/
@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object stringObj = stringParser.parse(sourceData, rowIndex, context);
Object startPositionObj = startPositionParser.parse(sourceData, rowIndex, context);
String str = OperatorTools.parseString(stringObj);
int start = OperatorTools.parseBigDecimal(startPositionObj).intValue();
if (start > str.length()) {
return "";
}
if (lengthParser != null) {
Object lengthObj = lengthParser.parse(sourceData, rowIndex, context);
int len = OperatorTools.parseBigDecimal(lengthObj).intValue();
if (len <= 0) {
return "";
}
return str.substring(Math.max(start - 1, 0), Math.min(start - 1 + len, str.length()));
} else {
return str.substring(Math.max(start - 1, 0));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* ToDateFunction
* description: to_date(string1[, string2])--converts a date string string1 with format string2 (by default ‘yyyy-MM-dd’) to a date
*/
public class ToDateFunction implements ValueParser {

private ValueParser stringParser1;
private ValueParser stringParser2;
private static final Map<String, DateTimeFormatter> INPUT_FORMATTERS = new ConcurrentHashMap<>();
private static final DateTimeFormatter OUTPUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

/**
* Constructor
*
* @param expr
*/
public ToDateFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
// Determine the number of arguments and build parser
stringParser1 = OperatorTools.buildParser(expressions.get(0));
if (expressions.size() == 2) {
stringParser2 = OperatorTools.buildParser(expressions.get(1));
}
}

/**
* parse
*
* @param sourceData
* @param rowIndex
* @return
*/
@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object stringObj1 = stringParser1.parse(sourceData, rowIndex, context);
String str1 = OperatorTools.parseString(stringObj1);
String str2 = "yyyy-MM-dd";
if (stringParser2 != null) {
Object stringObj2 = stringParser2.parse(sourceData, rowIndex, context);
str2 = OperatorTools.parseString(stringObj2);
}
LocalDate date = LocalDate.parse(str1, getDateTimeFormatter(str2));
return date.format(OUTPUT_FORMATTER);
}

/**
* getDateTimeFormatter
*
* @param pattern
* @return
*/
private DateTimeFormatter getDateTimeFormatter(String pattern) {
DateTimeFormatter formatter = INPUT_FORMATTERS.get(pattern);
if (formatter == null) {
formatter = DateTimeFormatter.ofPattern(pattern);
INPUT_FORMATTERS.put(pattern, formatter);
}
return formatter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

import org.apache.inlong.sdk.transform.process.function.AbsFunction;
import org.apache.inlong.sdk.transform.process.function.ConcatFunction;
import org.apache.inlong.sdk.transform.process.function.DateFormatFunction;
import org.apache.inlong.sdk.transform.process.function.ExpFunction;
import org.apache.inlong.sdk.transform.process.function.LnFunction;
import org.apache.inlong.sdk.transform.process.function.LocateFunction;
import org.apache.inlong.sdk.transform.process.function.Log10Function;
import org.apache.inlong.sdk.transform.process.function.Log2Function;
import org.apache.inlong.sdk.transform.process.function.LogFunction;
import org.apache.inlong.sdk.transform.process.function.NowFunction;
import org.apache.inlong.sdk.transform.process.function.PowerFunction;
import org.apache.inlong.sdk.transform.process.function.SqrtFunction;
import org.apache.inlong.sdk.transform.process.function.SubstringFunction;
import org.apache.inlong.sdk.transform.process.function.ToDateFunction;
import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
import org.apache.inlong.sdk.transform.process.parser.DivisionParser;
Expand Down Expand Up @@ -85,6 +89,10 @@ public class OperatorTools {
functionMap.put("log2", Log2Function::new);
functionMap.put("log", LogFunction::new);
functionMap.put("exp", ExpFunction::new);
functionMap.put("substring", SubstringFunction::new);
functionMap.put("locate", LocateFunction::new);
functionMap.put("to_date", ToDateFunction::new);
functionMap.put("date_format", DateFormatFunction::new);
}

public static ExpressionOperator buildOperator(Expression expr) {
Expand Down
Loading

0 comments on commit 8a27664

Please sign in to comment.