diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml index 5c433872bf8..0d67d46320b 100644 --- a/.github/workflows/ci_ut.yml +++ b/.github/workflows/ci_ut.yml @@ -101,7 +101,7 @@ jobs: CI: false - name: Unit test with Maven - run: mvn --batch-mode --update-snapshots -e -V test -pl !:sort-end-to-end-tests-v1.15,!:sort-end-to-end-tests-v1.13 + run: mvn --batch-mode --update-snapshots -e -V test -pl !:sort-end-to-end-tests-v1.15,!:sort-end-to-end-tests-v1.13,!:sort-end-to-end-tests-v1.18 env: CI: false diff --git a/.github/workflows/ci_ut_flink18.yml b/.github/workflows/ci_ut_flink18.yml new file mode 100644 index 00000000000..5c2f2709f27 --- /dev/null +++ b/.github/workflows/ci_ut_flink18.yml @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +name: + InLong Unit Test For Flink 1.18 + +on: + push: + paths: + - '.github/workflows/ci_ut_flink18.yml' + - 'inlong-sort/**' + - '!**.md' + + pull_request: + paths: + - '.github/workflows/ci_ut_flink18.yml' + - 'inlong-sort/**' + - '!**.md' + +jobs: + unit-test: + name: Unit Test + runs-on: ubuntu-22.04 + steps: + - name: Checkout + uses: actions/checkout@v4 + + # Release space size + - name: Remove unnecessary packages + run: | + echo "=== Before pruning ===" + df -h + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + sudo rm -rf /opt/hostedtoolcache + echo "=== After pruning ===" + df -h + + - name: Set up JDK + uses: actions/setup-java@v4 + with: + java-version: 8 + distribution: adopt + + - name: Cache Maven packages + uses: actions/cache@v4 + with: + path: | + ~/.m2/repository/*/*/* + !~/.m2/repository/org/apache/inlong + key: ${{ runner.os }}-inlong-flink18-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-inlong-flink18 + + - name: Build for Flink 1.18 with Maven + run: mvn --update-snapshots -e -V clean install -U -pl :sort-end-to-end-tests-v1.18 -am -Pv1.18 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000 + env: + CI: false + + - name: Unit test for Flink 1.18 with Maven + run: mvn --update-snapshots -e -V verify -pl :sort-end-to-end-tests-v1.18 -am -Pv1.18 + env: + CI: false + + - name: Upload unit test results + if: ${{ failure() }} + uses: actions/upload-artifact@v4 + with: + name: surefire-reports + path: ./**/target/surefire-reports/ + if-no-files-found: ignore + + - name: Upload integration test results + if: ${{ failure() }} + uses: actions/upload-artifact@v4 + with: + name: failsafe-reports + path: ./**/target/failsafe-reports/ + if-no-files-found: ignore + + - name: Clean up build packages + run: mvn clean \ No newline at end of file 
diff --git a/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx b/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx index fa9edff445c..6593c59e6c3 100644 --- a/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx +++ b/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx @@ -69,9 +69,9 @@ const Comp: React.FC = ({ inlongGroupId, inlongStreamId, ...modalProps }) exitsId = true; } const width = - (cur['fieldName'].length > cur['fieldValue'].length - ? cur['fieldName'].length - : cur['fieldValue'].length) * 10; + (cur['fieldName']?.length > cur['fieldValue']?.length + ? cur['fieldName']?.length + : cur['fieldValue']?.length) * 10; acc.push({ title: cur['fieldName'], key: cur['fieldName'], diff --git a/inlong-dashboard/src/ui/pages/ModuleAudit/AuditModule/config.tsx b/inlong-dashboard/src/ui/pages/ModuleAudit/AuditModule/config.tsx index 81d05bd15d0..2ef04c5bd57 100644 --- a/inlong-dashboard/src/ui/pages/ModuleAudit/AuditModule/config.tsx +++ b/inlong-dashboard/src/ui/pages/ModuleAudit/AuditModule/config.tsx @@ -350,13 +350,12 @@ export const getTableColumns = (source, dim) => { title: item.auditName, dataIndex: item.auditId, render: text => { - let color = 'black'; if (text?.includes('+')) { - color = 'red'; + return {text}; } else if (text?.includes('-')) { - color = 'green'; + return {text}; } - return {text}; + return {text}; }, })); return [ diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java index 1eb73888cd3..8b0a40af379 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java @@ -23,12 +23,14 @@ import 
org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.sdk.transform.decode.KvUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import java.util.List; +import java.util.Map; @Slf4j @Service @@ -51,18 +53,20 @@ public List parseFields(String str, InlongStreamInfo streamInfo) thro if (StringUtils.isNotBlank(streamInfo.getKvSeparator())) { kvSeparator = (char) Integer.parseInt(streamInfo.getKvSeparator()); } - String[] bodys = StringUtils.split(str, separator); - if (bodys.length != fields.size()) { - log.warn( - "The number of reported fields does not match the number of stream fields for groupId={}, streamId={}, reported field size ={}, stream field size ={}", - streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId(), bodys.length, fields.size()); - return fields; + Character escapeChar = null; + if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) { + escapeChar = streamInfo.getDataEscapeChar().charAt(0); } - for (int i = 0; i < bodys.length; i++) { - String body = bodys[i]; - String[] values = StringUtils.split(body, kvSeparator); - fields.get(i).setFieldName(values[0]); - fields.get(i).setFieldValue(values[1]); + Character lineSeparator = null; + if (StringUtils.isNotBlank(streamInfo.getLineSeparator())) { + lineSeparator = (char) Integer.parseInt(streamInfo.getLineSeparator()); + } + List> rowValues = + KvUtils.splitKv(str, separator, kvSeparator, escapeChar, '\"', lineSeparator); + for (Map row : rowValues) { + for (FieldInfo fieldInfo : fields) { + fieldInfo.setFieldValue(row.get(fieldInfo.getFieldName())); + } } } catch (Exception e) { log.warn("parse fields failed for groupId = {}, streamId = {}", streamInfo.getInlongGroupId(), diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index 54407681917..3ee94a23ce0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -205,7 +205,7 @@ public String save(InlongGroupRequest request, String operator) { Preconditions.expectNotNull(request, "inlong group request cannot be empty"); String groupId = request.getInlongGroupId(); - InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); + InlongGroupEntity entity = groupMapper.selectByGroupIdWithoutTenant(groupId); if (entity != null) { LOGGER.error("groupId={} has already exists", groupId); throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE); @@ -278,7 +278,7 @@ public List batchSave(List groupRequestList, St @Override public Boolean exist(String groupId) { Preconditions.expectNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); - InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); + InlongGroupEntity entity = groupMapper.selectByGroupIdWithoutTenant(groupId); LOGGER.debug("success to check inlong group {}, exist? 
{}", groupId, entity != null); return entity != null; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java index 5f92c34965c..539f4b06dc5 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java @@ -111,20 +111,22 @@ public String getField(int rowNum, String fieldName) { // error data return ""; } + // node is not array if (!node.isArray()) { current = newElement; - } else { - if (!newElement.isJsonArray()) { - // error data - return ""; - } - JsonArray newArray = newElement.getAsJsonArray(); - if (node.getArrayIndex() >= newArray.size()) { - // error data - return ""; - } - current = newArray.get(node.getArrayIndex()); + continue; + } + // node is an array + if (!newElement.isJsonArray()) { + // error data + return ""; + } + JsonArray newArray = newElement.getAsJsonArray(); + if (node.getArrayIndex() >= newArray.size()) { + // error data + return ""; } + current = newArray.get(node.getArrayIndex()); } return current.getAsString(); } catch (Exception e) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java index 2d16d92bc1a..426ae167a28 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java @@ -85,39 +85,40 @@ public SourceData decode(byte[] srcBytes, Context context) { public SourceData decode(String srcString, Context context) { JsonObject root = gson.fromJson(srcString, JsonObject.class); JsonArray childRoot = null; - if 
(CollectionUtils.isNotEmpty(childNodes)) { - JsonElement current = root; - for (JsonNode node : childNodes) { - if (!current.isJsonObject()) { + if (CollectionUtils.isEmpty(childNodes)) { + return new JsonSourceData(root, null); + } + JsonElement current = root; + for (JsonNode node : childNodes) { + if (!current.isJsonObject()) { + // error data + return new JsonSourceData(root, null); + } + JsonElement newElement = current.getAsJsonObject().get(node.getName()); + if (newElement == null) { + // error data + return new JsonSourceData(root, null); + } + if (!node.isArray()) { + current = newElement; + } else { + if (!newElement.isJsonArray()) { // error data - return new JsonSourceData(root, childRoot); + return new JsonSourceData(root, null); } - JsonElement newElement = current.getAsJsonObject().get(node.getName()); - if (newElement == null) { + JsonArray newArray = newElement.getAsJsonArray(); + if (node.getArrayIndex() >= newArray.size()) { // error data - return new JsonSourceData(root, childRoot); + return new JsonSourceData(root, null); } - if (!node.isArray()) { - current = newElement; - } else { - if (!newElement.isJsonArray()) { - // error data - return new JsonSourceData(root, childRoot); - } - JsonArray newArray = newElement.getAsJsonArray(); - if (node.getArrayIndex() >= newArray.size()) { - // error data - return new JsonSourceData(root, childRoot); - } - current = newArray.get(node.getArrayIndex()); - } - } - if (!current.isJsonArray()) { - // error data - return new JsonSourceData(root, childRoot); + current = newArray.get(node.getArrayIndex()); } - childRoot = current.getAsJsonArray(); } + if (!current.isJsonArray()) { + // error data + return new JsonSourceData(root, null); + } + childRoot = current.getAsJsonArray(); return new JsonSourceData(root, childRoot); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java new file mode 100644 index 00000000000..77b2c93dfc6 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.List; + +/** + * DateDiffFunction + * description: DATEDIFF(d1, d2) + * - return null if one of the two parameters is null or "" + * - return null if one of the two parameters has an incorrect date format + * - return the number of days between the dates d1->d2. 
+ */ +@TransformFunction(names = {"datediff", "date_diff"}) +public class DateDiffFunction implements ValueParser { + + private final ValueParser leftDateParser; + private final ValueParser rightDateParser; + private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + public DateDiffFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + leftDateParser = OperatorTools.buildParser(expressions.get(0)); + rightDateParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftDateObj = leftDateParser.parse(sourceData, rowIndex, context); + Object rightDateObj = rightDateParser.parse(sourceData, rowIndex, context); + if (leftDateObj == null || rightDateObj == null) { + return null; + } + String leftDate = OperatorTools.parseString(leftDateObj); + String rightDate = OperatorTools.parseString(rightDateObj); + if (leftDate.isEmpty() || rightDate.isEmpty()) { + return null; + } + try { + LocalDate left = getLocalDate(leftDate); + LocalDate right = getLocalDate(rightDate); + return ChronoUnit.DAYS.between(right, left); + } catch (Exception e) { + return null; + } + } + + public LocalDate getLocalDate(String dateString) { + DateTimeFormatter formatter = null; + LocalDate dateTime = null; + if (dateString.indexOf(' ') != -1) { + formatter = DEFAULT_FORMAT_DATE_TIME; + dateTime = LocalDateTime.parse(dateString, formatter).toLocalDate(); + } else { + formatter = DEFAULT_FORMAT_DATE; + dateTime = LocalDate.parse(dateString, formatter); + } + return dateTime; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateExtractFunction.java 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateExtractFunction.java index ff0eb6b01eb..a1ab7e2dfa6 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateExtractFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateExtractFunction.java @@ -42,6 +42,7 @@ * - dayofyear(date)--returns the day of a year (an integer between 1 and 366) from SQL date * - dayofmonth(date)--returns the day of a month (an integer between 1 and 31) from SQL date * - dayofweek(date)--returns the day of a week (an integer between 1(Sunday) and 7(Saturday)) from SQL date + * - dayname(date)--returns the name of the day of the week from SQL date */ public abstract class DateExtractFunction implements ValueParser { @@ -50,7 +51,7 @@ public abstract class DateExtractFunction implements ValueParser { private static final TemporalField weekOfYearField = WeekFields.of(Locale.getDefault()).weekOfWeekBasedYear(); public enum DateExtractFunctionType { - YEAR, QUARTER, MONTH, WEEK, DAY_OF_YEAR, DAY_OF_MONTH, DAY_OF_WEEK + YEAR, QUARTER, MONTH, WEEK, DAY_OF_YEAR, DAY_OF_MONTH, DAY_OF_WEEK, DAY_NAME } @TransformFunction(names = {"year"}) @@ -85,7 +86,7 @@ public WeekExtractFunction(Function expr) { } } - @TransformFunction(names = {"day_of_year"}) + @TransformFunction(names = {"day_of_year", "dayofyear"}) public static class DayOfYearExtractFunction extends DateExtractFunction { public DayOfYearExtractFunction(Function expr) { @@ -93,7 +94,7 @@ public DayOfYearExtractFunction(Function expr) { } } - @TransformFunction(names = {"day_of_month"}) + @TransformFunction(names = {"day_of_month", "dayofmonth"}) public static class DayOfMonthExtractFunction extends DateExtractFunction { public DayOfMonthExtractFunction(Function expr) { @@ -101,7 +102,7 @@ public DayOfMonthExtractFunction(Function expr) { } } - @TransformFunction(names = {"day_of_week"}) + 
@TransformFunction(names = {"day_of_week", "dayofweek"}) public static class DayOfWeekExtractFunction extends DateExtractFunction { public DayOfWeekExtractFunction(Function expr) { @@ -109,6 +110,14 @@ public DayOfWeekExtractFunction(Function expr) { } } + @TransformFunction(names = {"day_name", "dayname"}) + public static class DayNameExtractFunction extends DateExtractFunction { + + public DayNameExtractFunction(Function expr) { + super(DateExtractFunctionType.DAY_NAME, expr); + } + } + public DateExtractFunction(DateExtractFunctionType type, Function expr) { this.type = type; List expressions = expr.getParameters().getExpressions(); @@ -142,6 +151,9 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { // dayofweek(between 1 and 7) case DAY_OF_WEEK: return localDate.getDayOfWeek().getValue() % 7 + 1; + // dayname(between Sunday and Saturday) + case DAY_NAME: + return localDate.getDayOfWeek().name(); default: return null; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromUnixTimeFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromUnixTimeFunction.java index b7b68a0e733..6a897ee3e04 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromUnixTimeFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromUnixTimeFunction.java @@ -41,7 +41,7 @@ * format(default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds * since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. 
*/ -@TransformFunction(names = {"from_unix_time"}) +@TransformFunction(names = {"from_unix_time", "from_unixtime"}) public class FromUnixTimeFunction implements ValueParser { private ValueParser numericParser; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java index c641209162a..0d33056c728 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/HexFunction.java @@ -36,7 +36,7 @@ * - If the input argument is a string, the HEX function converts each character in the string to its corresponding hexadecimal ASCII encoding and returns the hexadecimal representation of the entire string. */ @TransformFunction(names = {"hex"}) -class HexFunction implements ValueParser { +public class HexFunction implements ValueParser { private static final Pattern BIG_DECIMAL_PATTERN = Pattern.compile("^[-+]?\\d+(\\.\\d+)?([eE][-+]?\\d+)?$"); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfNullFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfNullFunction.java new file mode 100644 index 00000000000..f19b291d7e0 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfNullFunction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * IfNullFunction + * description: IFNULL(expr1,expr2) + * - return expr1 if expr1 is not NULL + * - return expr2 otherwise + */ +@Slf4j +@TransformFunction(names = {"ifnull", "if_null"}) +public class IfNullFunction implements ValueParser { + + private final ValueParser firstExprParser; + private final ValueParser secondExprParser; + + public IfNullFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + firstExprParser = OperatorTools.buildParser(expressions.get(0)); + secondExprParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + try { + Object firstExprObj = firstExprParser.parse(sourceData, rowIndex, context); + if (firstExprObj == null) { + return parseSecondExpr(sourceData, rowIndex, context); + } + return firstExprObj; + } catch (Exception e) { + log.error("Value parsing failed", e); + return parseSecondExpr(sourceData, rowIndex, context); + } + } + + private Object parseSecondExpr(SourceData sourceData, int rowIndex, Context 
context) { + try { + return secondExprParser.parse(sourceData, rowIndex, context); + } catch (Exception e) { + log.error("Value parsing failed", e); + return null; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LpadFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LpadFunction.java new file mode 100644 index 00000000000..cce51ba1e42 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LpadFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * LpadFunction + * description: Lpad(s1,len,s2) Fill string s2 at the beginning of string s1 to make the string length len + * - return null if any of the three parameters is null or len is less than 0 + * - return the substring of s1 with subscripts in the range of [0, len) if len is less than or equal to the length of s1 + * - if s2 is "" + * - return "" if len is longer than the length of s1 + * - if s2 is not "" + * - return the filled string + */ +@TransformFunction(names = {"lpad"}) +public class LpadFunction implements ValueParser { + + private final ValueParser leftStringParser; + private final ValueParser lengthParser; + private final ValueParser rightStringParser; + + public LpadFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + leftStringParser = OperatorTools.buildParser(expressions.get(0)); + lengthParser = OperatorTools.buildParser(expressions.get(1)); + rightStringParser = OperatorTools.buildParser(expressions.get(2)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftStringObj = leftStringParser.parse(sourceData, rowIndex, context); + Object lengthObj = lengthParser.parse(sourceData, rowIndex, context); + Object rightStringObj = rightStringParser.parse(sourceData, rowIndex, context); + if (leftStringObj == null || lengthObj == null || rightStringObj == null) { + return null; + } + int len = Integer.parseInt(OperatorTools.parseString(lengthObj)); + if (len < 0) { + return null; + } + String leftStr = 
OperatorTools.parseString(leftStringObj); + if (len <= leftStr.length()) { + return leftStr.substring(0, len); + } + String rightStr = OperatorTools.parseString(rightStringObj); + if (rightStr.isEmpty()) { + return ""; + } + int padLen = len - leftStr.length(); + StringBuilder builder = new StringBuilder(padLen); + while (builder.length() < padLen) { + builder.append(rightStr); + } + return builder.substring(0, padLen).concat(leftStr); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/RpadFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/RpadFunction.java new file mode 100644 index 00000000000..c3357c7b552 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/RpadFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * RpadFunction + * description: RPAD(s1,len,s2) : Fill string s2 at the end of string s1 to make the length of the string len + * - return null if any of the three parameters is null or len is less than 0 + * - return the substring of s1 with subscripts in the range of [0, len) if len is less than or equal to the length of s1 + * - if s2 is "" + * - return "" if len is longer than the length of s1 + * - if s2 is not "" + * - return the filled string + */ +@TransformFunction(names = {"rpad"}) +public class RpadFunction implements ValueParser { + + private final ValueParser leftStringParser; + private final ValueParser lengthParser; + private final ValueParser rightStringParser; + + public RpadFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + leftStringParser = OperatorTools.buildParser(expressions.get(0)); + lengthParser = OperatorTools.buildParser(expressions.get(1)); + rightStringParser = OperatorTools.buildParser(expressions.get(2)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftStringObj = leftStringParser.parse(sourceData, rowIndex, context); + Object lengthObj = lengthParser.parse(sourceData, rowIndex, context); + Object rightStringObj = rightStringParser.parse(sourceData, rowIndex, context); + if (leftStringObj == null || lengthObj == null || rightStringObj == null) { + return null; + } + int len = Integer.parseInt(OperatorTools.parseString(lengthObj)); + if (len < 0) { + return null; + } + String leftStr = 
OperatorTools.parseString(leftStringObj); + if (len <= leftStr.length()) { + return leftStr.substring(0, len); + } + String rightStr = OperatorTools.parseString(rightStringObj); + if (rightStr.isEmpty()) { + return ""; + } + StringBuilder builder = new StringBuilder(len); + builder.append(leftStr); + while (builder.length() < len) { + builder.append(rightStr); + } + return builder.substring(0, len); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java index 772dc2950cf..727c0484b4b 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java @@ -32,7 +32,7 @@ * description: substring(string FROM INT1 [ FOR INT2 ])--returns a substring of STRING starting from position INT1 with * length INT2 (to the end by default) */ -@TransformFunction(names = {"substring"}) +@TransformFunction(names = {"substring", "substr"}) public class SubstringFunction implements ValueParser { private ValueParser stringParser; @@ -41,6 +41,7 @@ public class SubstringFunction implements ValueParser { /** * Constructor + * * @param expr */ public SubstringFunction(Function expr) { @@ -55,6 +56,7 @@ public SubstringFunction(Function expr) { /** * parse + * * @param sourceData * @param rowIndex * @return diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java index 9bcbb8c8e42..29b4636d81d 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java +++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java @@ -36,7 +36,7 @@ * The unit of the time interval is specified by the unit parameter, which should be one of the following values: * FRAC_SECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or YEAR. */ -@TransformFunction(names = {"timestamp_add"}) +@TransformFunction(names = {"timestamp_add", "timestampadd"}) public class TimestampAddFunction implements ValueParser { private ValueParser intervalParser; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 0a29d168926..fdf68153e26 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -39,7 +39,7 @@ /** * OperatorTools - * + * */ @Slf4j public class OperatorTools { diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index 87ce145df60..1e0f14025f0 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -224,6 +224,69 @@ public void testModuloFunction() throws Exception { } + @Test + public void testIfNullFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: ifnull(5, 3) + transformSql = 
"select ifnull(numeric1,numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "5|3|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=5", output.get(0)); + + // case2: ifnull(null,3) + transformSql = "select ifnull(xxd,numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "5|3|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=3", output.get(0)); + + // case3: ifnull(6 / 3,'YES') + transformSql = "select ifnull(numeric1 / numeric2,'YES') from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "6|3|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2", output.get(0)); + + // case4: ifnull(6 / 0,'YES') + transformSql = "select ifnull(numeric1 / numeric2,'YES') from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "6|0|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=YES", output.get(0)); + + // case5: ifnull(6 / 0,3 / 0) + transformSql = "select ifnull(numeric1 / numeric2,numeric3 / numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + 
.create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "6|0|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + } + @Test public void testMd5Function() throws Exception { String transformSql = "select md5(numeric1) from source"; diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java index 10cfa740c97..14511946070 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java @@ -428,6 +428,146 @@ public void testStrcmpFunction() throws Exception { Assert.assertEquals("result=null", output.get(0)); } + @Test + public void testRpadFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select rpad(string1,numeric1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: rpad('he',7,'xxd') + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=hexxdxx", output.get(0)); + + // case2: rpad('he',1,'xxd') + data = "he|xxd|cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=h", output.get(0)); + + // case3: rpad('he',1,'') + 
data = "he||cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=h", output.get(0)); + + // case4: rpad('he',-1,'xxd') + data = "he|xxd|cloud|-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case5: rpad(null,5,'xxd') + transformSql = "select rpad(xxd,numeric1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case6: rpad('he',null,'xxd') + transformSql = "select rpad(string1,xxd,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case7: rpad('he',5,null) + transformSql = "select rpad(string1,numeric1,xxd) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + } + + @Test + public void testLpadFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List 
output = null; + + transformSql = "select lpad(string1,numeric1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: lpad('he',7,'xxd') + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=xxdxxhe", output.get(0)); + + // case2: lpad('he',1,'xxd') + data = "he|xxd|cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=h", output.get(0)); + + // case3: lpad('he',1,'') + data = "he||cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=h", output.get(0)); + + // case4: lpad('he',-1,'xxd') + data = "he|xxd|cloud|-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case5: lpad(null,5,'xxd') + transformSql = "select lpad(xxd,numeric1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case6: lpad('he',null,'xxd') + transformSql = "select lpad(string1,xxd,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + 
Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case7: lpad('he',5,null) + transformSql = "select lpad(string1,numeric1,xxd) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + } + @Test public void testRightFunction() throws Exception { String transformSql = "select right(string1,numeric1) from source"; diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java index 354b8b7f185..14c1fab04ba 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java @@ -216,6 +216,16 @@ public void testDateExtractFunction() throws Exception { List output7 = processor7.transform("2024-02-29", new HashMap<>()); Assert.assertEquals(1, output7.size()); Assert.assertEquals(output7.get(0), "result=5"); + + String transformSql8 = "select dayname(string1) from source"; + TransformConfig config8 = new TransformConfig(transformSql8); + TransformProcessor processor8 = TransformProcessor + .create(config8, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case8: dayname(2024-02-29) (Thursday) + List output8 = processor8.transform("2024-02-29", new HashMap<>()); + Assert.assertEquals(1, output8.size()); + Assert.assertEquals(output8.get(0), 
"result=THURSDAY"); } @Test @@ -379,6 +389,54 @@ public void testToTimestampFunction() throws Exception { Assert.assertEquals(output4.get(0), "result=1970-01-01 00:00:00.0"); } + @Test + public void testDateDiffFunction() throws Exception { + String transformSql = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select datediff(string1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: datediff('1970-01-01','1970-01-02') + output = processor.transform("1970-01-01|1970-01-02", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=-1", output.get(0)); + + // case2: datediff('1970-01-02','1970-01-01') + output = processor.transform("1970-01-02|1970-01-01", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1", output.get(0)); + + // case3: datediff('2018-12-10 12:30:00', '2018-12-09 13:30:00') + output = processor.transform("2018-12-10 12:30:00|2018-12-09 13:30:00", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1", output.get(0)); + + // case4: datediff('2018-12-10 12:30:00', '') + output = processor.transform("2018-12-10 12:30:00|", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case5: datediff('2018-12', '2018-12-12') + output = processor.transform("2018-12|2018-12-12", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case6: datediff('1970-01-01',null) + transformSql = "select datediff(string1,xxd) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + 
SinkEncoderFactory.createKvEncoder(kvSink)); + output = processor.transform("1970-01-01|1970-01-02", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + } + @Test public void testLocalTimeFunction() throws Exception { String transformSql1 = "select localtime() from source"; diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml index 2ef9506520a..e4881e6043c 100644 --- a/inlong-sort/sort-core/pom.xml +++ b/inlong-sort/sort-core/pom.xml @@ -72,18 +72,6 @@ ${mysql.jdbc.version} provided - - org.apache.inlong - sort-format-inlongmsg-base - ${project.version} - test - - - org.apache.inlong - sort-format-csv - ${project.version} - test - org.apache.hadoop hadoop-common @@ -106,6 +94,18 @@ flink-table-common provided + + org.apache.inlong + sort-format-csv + ${project.version} + test + + + org.apache.inlong + sort-format-inlongmsg-base + ${project.version} + test + org.apache.inlong sort-flink-dependencies-${sort.flink.version} @@ -251,6 +251,18 @@ flink-table-common provided + + org.apache.inlong + sort-format-csv + ${project.version} + test + + + org.apache.inlong + sort-format-inlongmsg-base + ${project.version} + test + org.apache.inlong sort-flink-dependencies-${sort.flink.version} @@ -372,6 +384,24 @@ ${project.version} test + + org.apache.inlong + sort-connector-jdbc-v1.18 + ${project.version} + test + + + org.apache.inlong + sort-connector-elasticsearch6-v1.18 + ${project.version} + test + + + org.apache.inlong + sort-connector-elasticsearch7-v1.18 + ${project.version} + test + diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml index 0f28f495bf6..4541bb37eab 100644 --- a/inlong-sort/sort-dist/pom.xml +++ b/inlong-sort/sort-dist/pom.xml @@ -55,31 +55,6 @@ sort-format-common ${project.version} - - org.apache.inlong - sort-format-base - ${project.version} - - - org.apache.inlong - sort-format-csv - ${project.version} - - - org.apache.inlong - 
sort-format-inlongmsg-base - ${project.version} - - - org.apache.inlong - sort-format-inlongmsg-csv - ${project.version} - - - org.apache.inlong - sort-format-inlongmsg-kv - ${project.version} - org.apache.inlong sort-format-rowdata-kv @@ -134,6 +109,31 @@ sort-format-json-v1.13 ${project.version} + + org.apache.inlong + sort-format-csv + ${project.version} + + + org.apache.inlong + sort-format-inlongmsg-base + ${project.version} + + + org.apache.inlong + sort-format-inlongmsg-csv + ${project.version} + + + org.apache.inlong + sort-format-inlongmsg-kv + ${project.version} + + + org.apache.inlong + sort-format-base + ${project.version} + org.apache.flink flink-sql-parquet_${scala.binary.version} @@ -172,6 +172,31 @@ sort-format-json-v1.15 ${project.version} + + org.apache.inlong + sort-format-csv + ${project.version} + + + org.apache.inlong + sort-format-inlongmsg-base + ${project.version} + + + org.apache.inlong + sort-format-inlongmsg-csv + ${project.version} + + + org.apache.inlong + sort-format-inlongmsg-kv + ${project.version} + + + org.apache.inlong + sort-format-base + ${project.version} + org.apache.flink flink-sql-parquet diff --git a/inlong-sort/sort-end-to-end-tests/pom.xml b/inlong-sort/sort-end-to-end-tests/pom.xml index 04b87c02826..6c6319cd4e8 100644 --- a/inlong-sort/sort-end-to-end-tests/pom.xml +++ b/inlong-sort/sort-end-to-end-tests/pom.xml @@ -52,6 +52,15 @@ sort-end-to-end-tests-v1.15 + + v1.18 + + true + + + sort-end-to-end-tests-v1.18 + + diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml new file mode 100644 index 00000000000..22c8e6fc6bd --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml @@ -0,0 +1,209 @@ + + + + 4.0.0 + + org.apache.inlong + sort-end-to-end-tests + 1.14.0-SNAPSHOT + + + sort-end-to-end-tests-v1.18 + Apache InLong - Sort End to End Tests v1.18 + + + 
${project.parent.parent.parent.basedir} + 1.18.1 + 6.8.17 + 2.15.3-18.0 + + + + + org.apache.inlong + sort-dist + ${project.version} + test + + + org.testcontainers + testcontainers + + + org.testcontainers + postgresql + ${testcontainers.version} + + + org.postgresql + postgresql + test + + + org.testcontainers + elasticsearch + ${testcontainers.version} + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + ${elasticsearch.version} + + + + org.elasticsearch.client + elasticsearch-rest-client + ${elasticsearch.version} + + + + org.apache.flink + flink-shaded-jackson + ${flink.shaded.jackson.version} + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.logging.log4j + log4j-core + + + + + org.apache.inlong + sort-flink-dependencies-v1.18 + ${project.version} + test + + + org.apache.flink + flink-core + ${flink.version} + test + + + org.apache.flink + flink-json + ${flink.version} + test + + + org.apache.flink + flink-avro + ${flink.version} + test + + + org.apache.flink + flink-csv + ${flink.version} + test + + + org.apache.flink + flink-sql-avro + ${flink.version} + test + + + org.apache.flink + flink-runtime + ${flink.version} + test + + + org.apache.flink + flink-table-common + ${flink.version} + test + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + org.apache.inlong + sort-dist + ${project.version} + sort-dist.jar + jar + ${project.build.directory}/dependencies + + + + + + copy-jars + + copy + + validate + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + end-to-end-tests-v1.18 + integration-test + + + **/*.* + + 1 + + ${project.basedir} + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${plugin.surefire.version} + + + + diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java new file mode 100644 index 00000000000..de6166442ea --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.tests.utils; + +import org.apache.commons.io.IOUtils; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container.ExecResult; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.images.builder.Transferable; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; +import java.util.jar.JarOutputStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * End to end base test environment for test sort-connectors. 
+ * Every link : MySQL -> Xxx (Test connector) -> MySQL + */ +public abstract class FlinkContainerTestEnv extends TestLogger { + + static final Logger JM_LOG = LoggerFactory.getLogger(JobMaster.class); + static final Logger TM_LOG = LoggerFactory.getLogger(TaskExecutor.class); + static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnv.class); + + private static final Path SORT_DIST_JAR = TestUtils.getResource("sort-dist.jar"); + // ------------------------------------------------------------------------------------------ + // Flink Variables + // ------------------------------------------------------------------------------------------ + static final int JOB_MANAGER_REST_PORT = 8081; + static final int DEBUG_PORT = 20000; + static final String FLINK_BIN = "bin"; + static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; + static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; + static final String FLINK_PROPERTIES = String.join("\n", Arrays.asList( + "jobmanager.rpc.address: jobmanager", + "taskmanager.numberOfTaskSlots: 10", + "parallelism.default: 4", + "env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=20000", + "env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=20000", + // this is needed for oracle-cdc tests. 
+ // see https://stackoverflow.com/a/47062742/4915129 + "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false")); + + @ClassRule + public static final Network NETWORK = Network.newNetwork(); + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Nullable + private static RestClusterClient restClusterClient; + + static GenericContainer jobManager; + static GenericContainer taskManager; + + @AfterClass + public static void after() { + if (restClusterClient != null) { + restClusterClient.close(); + } + if (jobManager != null) { + jobManager.stop(); + } + if (taskManager != null) { + taskManager.stop(); + } + } + + /** + * Submits a SQL job to the running cluster. + * + *

NOTE: You should not use {@code '\t'}. + */ + public void submitSQLJob(String sqlFile, Path... jars) + throws IOException, InterruptedException { + final List commands = new ArrayList<>(); + String containerSqlFile = copyToContainerTmpPath(jobManager, sqlFile); + commands.add(FLINK_BIN + "/flink run -d"); + commands.add("-c org.apache.inlong.sort.Entrance"); + commands.add(copyToContainerTmpPath(jobManager, constructDistJar(jars))); + commands.add("--sql.script.file"); + commands.add(containerSqlFile); + + ExecResult execResult = + jobManager.execInContainer("bash", "-c", String.join(" ", commands)); + LOG.info(execResult.getStdout()); + if (execResult.getExitCode() != 0) { + LOG.error(execResult.getStderr()); + throw new AssertionError("Failed when submitting the SQL job."); + } + } + + /** + * Get {@link RestClusterClient} connected to this FlinkContainer. + * + *

This method lazily initializes the REST client on-demand. + */ + public RestClusterClient getRestClusterClient() { + checkState( + jobManager.isRunning(), + "Cluster client should only be retrieved for a running cluster"); + try { + final Configuration clientConfiguration = new Configuration(); + clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost()); + clientConfiguration.set( + RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT)); + this.restClusterClient = + new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance()); + } catch (Exception e) { + throw new IllegalStateException( + "Failed to create client for Flink container cluster", e); + } + return restClusterClient; + } + + /** + * Polling to detect task status until the task successfully into {@link JobStatus.RUNNING} + * + * @param timeout + */ + public void waitUntilJobRunning(Duration timeout) { + RestClusterClient clusterClient = getRestClusterClient(); + Deadline deadline = Deadline.fromNow(timeout); + while (deadline.hasTimeLeft()) { + Collection jobStatusMessages; + try { + jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Error when fetching job status.", e); + continue; + } + if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) { + JobStatusMessage message = jobStatusMessages.iterator().next(); + JobStatus jobStatus = message.getJobState(); + if (jobStatus.isTerminalState()) { + throw new ValidationException( + String.format( + "Job has been terminated! JobName: %s, JobID: %s, Status: %s", + message.getJobName(), + message.getJobId(), + message.getJobState())); + } else if (jobStatus == JobStatus.RUNNING) { + return; + } + } + } + } + + /** + * Copy all other dependencies into user jar 'lib/' entry. + * Flink per-job mode only support upload one jar to cluster. + */ + private String constructDistJar(Path... 
jars) throws IOException { + + File newJar = temporaryFolder.newFile("sort-dist.jar"); + try ( + JarFile jarFile = new JarFile(SORT_DIST_JAR.toFile()); + JarOutputStream jos = new JarOutputStream(new FileOutputStream(newJar))) { + jarFile.stream().forEach(entry -> { + try (InputStream is = jarFile.getInputStream(entry)) { + jos.putNextEntry(entry); + jos.write(IOUtils.toByteArray(is)); + jos.closeEntry(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + for (Path jar : jars) { + try (InputStream is = new FileInputStream(jar.toFile())) { + jos.putNextEntry(new JarEntry("lib/" + jar.getFileName().toString())); + jos.write(IOUtils.toByteArray(is)); + jos.closeEntry(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + } + return newJar.getAbsolutePath(); + } + + // Should not a big file, all file data will load into memory, then copy to container. + private String copyToContainerTmpPath(GenericContainer container, String filePath) throws IOException { + Path path = Paths.get(filePath); + byte[] fileData = Files.readAllBytes(path); + String containerPath = "/tmp/" + path.getFileName(); + container.copyFileToContainer(Transferable.of(fileData), containerPath); + return containerPath; + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java new file mode 100644 index 00000000000..9033740822f --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import org.junit.BeforeClass; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.util.stream.Stream; + +public abstract class FlinkContainerTestEnvJRE11 extends FlinkContainerTestEnv { + + @BeforeClass + public static void before() { + LOG.info("Starting containers..."); + jobManager = + new GenericContainer<>("flink:1.18.1-scala_2.12") + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withLogConsumer(new Slf4jLogConsumer(JM_LOG)); + taskManager = + new GenericContainer<>("flink:1.18.1-scala_2.12") + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withExposedPorts(DEBUG_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .dependsOn(jobManager) + .withLogConsumer(new Slf4jLogConsumer(TM_LOG)); + + Startables.deepStart(Stream.of(jobManager)).join(); + Startables.deepStart(Stream.of(taskManager)).join(); + LOG.info("Containers are started."); + } 
+} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java new file mode 100644 index 00000000000..de982da4ba0 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.tests.utils; + +import org.junit.BeforeClass; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.util.stream.Stream; + +public abstract class FlinkContainerTestEnvJRE8 extends FlinkContainerTestEnv { + + @BeforeClass + public static void before() { + LOG.info("Starting containers..."); + jobManager = + new GenericContainer<>("flink:1.18.1-scala_2.12-java8") + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withLogConsumer(new Slf4jLogConsumer(JM_LOG)); + taskManager = + new GenericContainer<>("flink:1.18.1-scala_2.12-java8") + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withExposedPorts(DEBUG_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .dependsOn(jobManager) + .withLogConsumer(new Slf4jLogConsumer(TM_LOG)); + + Startables.deepStart(Stream.of(jobManager)).join(); + Startables.deepStart(Stream.of(taskManager)).join(); + LOG.info("Containers are started."); + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java new file mode 100644 index 00000000000..0c283336999 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * A file placeholder replacement tool. 
+ */ +public class PlaceholderResolver { + + /** + * Default placeholder prefix + */ + public static final String DEFAULT_PLACEHOLDER_PREFIX = "${"; + + /** + * Default placeholder suffix + */ + public static final String DEFAULT_PLACEHOLDER_SUFFIX = "}"; + + /** + * Default singleton resolver + */ + private static PlaceholderResolver defaultResolver = new PlaceholderResolver(); + + /** + * Placeholder prefix + */ + private String placeholderPrefix = DEFAULT_PLACEHOLDER_PREFIX; + + /** + * Placeholder suffix + */ + private String placeholderSuffix = DEFAULT_PLACEHOLDER_SUFFIX; + + private PlaceholderResolver() { + + } + + private PlaceholderResolver(String placeholderPrefix, String placeholderSuffix) { + this.placeholderPrefix = placeholderPrefix; + this.placeholderSuffix = placeholderSuffix; + } + + public static PlaceholderResolver getDefaultResolver() { + return defaultResolver; + } + + public static PlaceholderResolver getResolver(String placeholderPrefix, String placeholderSuffix) { + return new PlaceholderResolver(placeholderPrefix, placeholderSuffix); + } + + /** + * Replace template string with special placeholder according to replace function. + * @param content template string with special placeholder + * @param rule placeholder replacement rule + * @return new replaced string + */ + public String resolveByRule(String content, Function rule) { + int start = content.indexOf(this.placeholderPrefix); + if (start == -1) { + return content; + } + StringBuilder result = new StringBuilder(content); + while (start != -1) { + int end = result.indexOf(this.placeholderSuffix, start); + // get placeholder actual value (e.g. ${id}, get the value represent id) + String placeholder = result.substring(start + this.placeholderPrefix.length(), end); + // replace placeholder value + String replaceContent = placeholder.trim().isEmpty() ? 
"" : rule.apply(placeholder); + result.replace(start, end + this.placeholderSuffix.length(), replaceContent); + start = result.indexOf(this.placeholderPrefix, start + replaceContent.length()); + } + return result.toString(); + } + + /** + * Replace template string with special placeholder according to replace function. + * @param file template file with special placeholder + * @param rule placeholder replacement rule + * @return new replaced string + */ + public Path resolveByRule(Path file, Function rule) { + try { + List newContents = Files.readAllLines(file, StandardCharsets.UTF_8) + .stream() + .map(content -> resolveByRule(content, rule)) + .collect(Collectors.toList()); + Path newPath = Paths.get(file.getParent().toString(), file.getFileName() + "$"); + Files.write(newPath, String.join(System.lineSeparator(), newContents).getBytes(StandardCharsets.UTF_8)); + return newPath; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Replace template string with special placeholder according to properties file. + * Key is the content of the placeholder

+ * e.g: content = product:${id}:detail:${did}
+ * valueMap = id -> 1; pid -> 2
+ * return: product:1:detail:2
+ * + * @param content template string with special placeholder + * @param valueMap placeholder replacement map + * @return new replaced string + */ + public String resolveByMap(String content, final Map valueMap) { + return resolveByRule(content, placeholderValue -> String.valueOf(valueMap.get(placeholderValue))); + } + + /** + * Replace template string with special placeholder according to properties file. + * Key is the content of the placeholder

+ * e.g: content = product:${id}:detail:${did}
+ * valueMap = id -> 1; pid -> 2
+ * return: product:1:detail:2
+ * + * @param file template string with special placeholder + * @param valueMap placeholder replacement map + * @return new replaced string + */ + public Path resolveByMap(Path file, final Map valueMap) { + return resolveByRule(file, placeholderValue -> String.valueOf(valueMap.get(placeholderValue))); + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java new file mode 100644 index 00000000000..8daff533da2 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.tests.utils; + +import org.junit.Test; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; + +/** + * Test util for test container. + */ +public class TestUtils { + + private static final ParameterProperty MODULE_DIRECTORY = + new ParameterProperty<>("moduleDir", Paths::get); + + /** + * Searches for a resource file matching the given regex in the given directory. This method is + * primarily intended to be used for the initialization of static {@link Path} fields for + * resource file(i.e. jar, config file) that reside in the modules {@code target} directory. + * + * @param resourceNameRegex regex pattern to match against + * @return Path pointing to the matching jar + * @throws RuntimeException if none or multiple resource files could be found + */ + public static Path getResource(final String resourceNameRegex) { + // if the property is not set then we are most likely running in the IDE, where the working + // directory is the + // module of the test that is currently running, which is exactly what we want + Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath()); + + try (Stream dependencyResources = Files.walk(moduleDirectory)) { + final List matchingResources = + dependencyResources + .filter( + jar -> Pattern.compile(resourceNameRegex) + .matcher(jar.toAbsolutePath().toString()) + .find()) + .collect(Collectors.toList()); + switch (matchingResources.size()) { + case 0: + throw new RuntimeException( + new FileNotFoundException( + String.format( + "No resource file could be found that matches the pattern %s. 
" + + "This could mean that the test module must be rebuilt via maven.", + resourceNameRegex))); + case 1: + return matchingResources.get(0); + default: + throw new RuntimeException( + new IOException( + String.format( + "Multiple resource files were found matching the pattern %s. Matches=%s", + resourceNameRegex, matchingResources))); + } + } catch (final IOException ioe) { + throw new RuntimeException("Could not search for resource resource files.", ioe); + } + } + + /** + * A simple system properties value getter with default value when could not find the system property. + * @param + */ + static class ParameterProperty { + + private final String propertyName; + private final Function converter; + + public ParameterProperty(final String propertyName, final Function converter) { + this.propertyName = propertyName; + this.converter = converter; + } + + /** + * Retrieves the value of this property, or the given default if no value was set. + * + * @return the value of this property, or the given default if no value was set + */ + public V get(final V defaultValue) { + final String value = System.getProperty(propertyName); + return value == null ? 
defaultValue : converter.apply(value); + } + } + + @Test + public void testReplaceholder() { + String before = "today is ${date}, today weather is ${weather}"; + Map maps = new HashMap<>(); + maps.put("date", "2024.07.15"); + maps.put("weather", "song"); + String after = PlaceholderResolver.getDefaultResolver().resolveByMap(before, maps); + assertEquals(after, "today is 2024.07.15, today weather is song"); + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/log4j2-test.properties b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/log4j2-test.properties new file mode 100644 index 00000000000..3e95477751b --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/log4j2-test.properties @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +rootLogger=INFO, STDOUT + +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%-4r [%t] %-5p %c %x - %m%n + +appender.jm.type = File +appender.jm.name = jobmanager +appender.jm.fileName = target/logs/jobmanager.log +appender.jm.layout.type = PatternLayout +appender.jm.layout.pattern = - %m%n + +appender.tm.type = File +appender.tm.name = taskmanager +appender.tm.fileName = target/logs/taskmanager.log +appender.tm.layout.type = PatternLayout +appender.tm.layout.pattern = - %m%n + +logger.jm=INFO, jobmanager +logger.jm.name=org.apache.flink.runtime.jobmaster.JobMaster +logger.jm.additivity=false + +logger.tm=INFO, taskmanager +logger.tm.name=org.apache.flink.runtime.taskexecutor.TaskExecutor +logger.tm.additivity=false + + + diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml index e36378306f4..3c392001f3e 100644 --- a/inlong-sort/sort-formats/pom.xml +++ b/inlong-sort/sort-formats/pom.xml @@ -249,7 +249,8 @@ v1.18 format-common - format-row/format-json-v1.18 + format-row + format-rowdata