Skip to content

Commit

Permalink
Merge branch 'master' into INLONG-10968
Browse files Browse the repository at this point in the history
# Conflicts:
#	inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
  • Loading branch information
emptyOVO committed Sep 2, 2024
2 parents 64f3d27 + 4de972e commit dac2448
Show file tree
Hide file tree
Showing 32 changed files with 1,752 additions and 106 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ jobs:
CI: false

- name: Unit test with Maven
run: mvn --batch-mode --update-snapshots -e -V test -pl !:sort-end-to-end-tests-v1.15,!:sort-end-to-end-tests-v1.13
run: mvn --batch-mode --update-snapshots -e -V test -pl !:sort-end-to-end-tests-v1.15,!:sort-end-to-end-tests-v1.13,!:sort-end-to-end-tests-v1.18
env:
CI: false

Expand Down
96 changes: 96 additions & 0 deletions .github/workflows/ci_ut_flink18.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name:
InLong Unit Test For Flink 1.18

on:
push:
paths:
- '.github/workflows/ci_ut_flink18.yml'
- 'inlong-sort/**'
- '!**.md'

pull_request:
paths:
- '.github/workflows/ci_ut_flink18.yml'
- 'inlong-sort/**'
- '!**.md'

jobs:
unit-test:
name: Unit Test
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4

# Release space size
- name: Remove unnecessary packages
run: |
echo "=== Before pruning ==="
df -h
sudo rm -rf /usr/share/dotnet
sudo rm -rf /usr/local/lib/android
sudo rm -rf /opt/ghc
sudo rm -rf /opt/hostedtoolcache
echo "=== After pruning ==="
df -h
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: 8
distribution: adopt

- name: Cache Maven packages
uses: actions/cache@v4
with:
path: |
~/.m2/repository/*/*/*
!~/.m2/repository/org/apache/inlong
key: ${{ runner.os }}-inlong-flink18-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-inlong-flink18

- name: Build for Flink 1.18 with Maven
run: mvn --update-snapshots -e -V clean install -U -pl :sort-end-to-end-tests-v1.18 -am -Pv1.18 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
env:
CI: false

- name: Unit test for Flink 1.18 with Maven
run: mvn --update-snapshots -e -V verify -pl :sort-end-to-end-tests-v1.18 -am -Pv1.18
env:
CI: false

- name: Upload unit test results
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: surefire-reports
path: ./**/target/surefire-reports/
if-no-files-found: ignore

- name: Upload integration test results
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: failsafe-reports
path: ./**/target/failsafe-reports/
if-no-files-found: ignore

- name: Clean up build packages
run: mvn clean
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ const Comp: React.FC<Props> = ({ inlongGroupId, inlongStreamId, ...modalProps })
exitsId = true;
}
const width =
(cur['fieldName'].length > cur['fieldValue'].length
? cur['fieldName'].length
: cur['fieldValue'].length) * 10;
(cur['fieldName']?.length > cur['fieldValue']?.length
? cur['fieldName']?.length
: cur['fieldValue']?.length) * 10;
acc.push({
title: cur['fieldName'],
key: cur['fieldName'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,12 @@ export const getTableColumns = (source, dim) => {
title: item.auditName,
dataIndex: item.auditId,
render: text => {
let color = 'black';
if (text?.includes('+')) {
color = 'red';
return <span style={{ color: 'red' }}>{text}</span>;
} else if (text?.includes('-')) {
color = 'green';
return <span style={{ color: 'green' }}>{text}</span>;
}
return <span style={{ color: color }}>{text}</span>;
return <span>{text}</span>;
},
}));
return [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.sdk.transform.decode.KvUtils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

@Slf4j
@Service
Expand All @@ -51,18 +53,20 @@ public List<FieldInfo> parseFields(String str, InlongStreamInfo streamInfo) thro
if (StringUtils.isNotBlank(streamInfo.getKvSeparator())) {
kvSeparator = (char) Integer.parseInt(streamInfo.getKvSeparator());
}
String[] bodys = StringUtils.split(str, separator);
if (bodys.length != fields.size()) {
log.warn(
"The number of reported fields does not match the number of stream fields for groupId={}, streamId={}, reported field size ={}, stream field size ={}",
streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId(), bodys.length, fields.size());
return fields;
Character escapeChar = null;
if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) {
escapeChar = streamInfo.getDataEscapeChar().charAt(0);
}
for (int i = 0; i < bodys.length; i++) {
String body = bodys[i];
String[] values = StringUtils.split(body, kvSeparator);
fields.get(i).setFieldName(values[0]);
fields.get(i).setFieldValue(values[1]);
Character lineSeparator = null;
if (StringUtils.isNotBlank(streamInfo.getLineSeparator())) {
lineSeparator = (char) Integer.parseInt(streamInfo.getLineSeparator());
}
List<Map<String, String>> rowValues =
KvUtils.splitKv(str, separator, kvSeparator, escapeChar, '\"', lineSeparator);
for (Map<String, String> row : rowValues) {
for (FieldInfo fieldInfo : fields) {
fieldInfo.setFieldValue(row.get(fieldInfo.getFieldName()));
}
}
} catch (Exception e) {
log.warn("parse fields failed for groupId = {}, streamId = {}", streamInfo.getInlongGroupId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public String save(InlongGroupRequest request, String operator) {
Preconditions.expectNotNull(request, "inlong group request cannot be empty");

String groupId = request.getInlongGroupId();
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
InlongGroupEntity entity = groupMapper.selectByGroupIdWithoutTenant(groupId);
if (entity != null) {
LOGGER.error("groupId={} has already exists", groupId);
throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
Expand Down Expand Up @@ -278,7 +278,7 @@ public List<BatchResult> batchSave(List<InlongGroupRequest> groupRequestList, St
@Override
public Boolean exist(String groupId) {
Preconditions.expectNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
InlongGroupEntity entity = groupMapper.selectByGroupIdWithoutTenant(groupId);
LOGGER.debug("success to check inlong group {}, exist? {}", groupId, entity != null);
return entity != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,22 @@ public String getField(int rowNum, String fieldName) {
// error data
return "";
}
// node is not array
if (!node.isArray()) {
current = newElement;
} else {
if (!newElement.isJsonArray()) {
// error data
return "";
}
JsonArray newArray = newElement.getAsJsonArray();
if (node.getArrayIndex() >= newArray.size()) {
// error data
return "";
}
current = newArray.get(node.getArrayIndex());
continue;
}
// node is an array
if (!newElement.isJsonArray()) {
// error data
return "";
}
JsonArray newArray = newElement.getAsJsonArray();
if (node.getArrayIndex() >= newArray.size()) {
// error data
return "";
}
current = newArray.get(node.getArrayIndex());
}
return current.getAsString();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,39 +85,40 @@ public SourceData decode(byte[] srcBytes, Context context) {
public SourceData decode(String srcString, Context context) {
JsonObject root = gson.fromJson(srcString, JsonObject.class);
JsonArray childRoot = null;
if (CollectionUtils.isNotEmpty(childNodes)) {
JsonElement current = root;
for (JsonNode node : childNodes) {
if (!current.isJsonObject()) {
if (CollectionUtils.isEmpty(childNodes)) {
return new JsonSourceData(root, null);
}
JsonElement current = root;
for (JsonNode node : childNodes) {
if (!current.isJsonObject()) {
// error data
return new JsonSourceData(root, null);
}
JsonElement newElement = current.getAsJsonObject().get(node.getName());
if (newElement == null) {
// error data
return new JsonSourceData(root, null);
}
if (!node.isArray()) {
current = newElement;
} else {
if (!newElement.isJsonArray()) {
// error data
return new JsonSourceData(root, childRoot);
return new JsonSourceData(root, null);
}
JsonElement newElement = current.getAsJsonObject().get(node.getName());
if (newElement == null) {
JsonArray newArray = newElement.getAsJsonArray();
if (node.getArrayIndex() >= newArray.size()) {
// error data
return new JsonSourceData(root, childRoot);
return new JsonSourceData(root, null);
}
if (!node.isArray()) {
current = newElement;
} else {
if (!newElement.isJsonArray()) {
// error data
return new JsonSourceData(root, childRoot);
}
JsonArray newArray = newElement.getAsJsonArray();
if (node.getArrayIndex() >= newArray.size()) {
// error data
return new JsonSourceData(root, childRoot);
}
current = newArray.get(node.getArrayIndex());
}
}
if (!current.isJsonArray()) {
// error data
return new JsonSourceData(root, childRoot);
current = newArray.get(node.getArrayIndex());
}
childRoot = current.getAsJsonArray();
}
if (!current.isJsonArray()) {
// error data
return new JsonSourceData(root, null);
}
childRoot = current.getAsJsonArray();
return new JsonSourceData(root, childRoot);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.List;

/**
* DateDiffFunction
* description: DATEDIFF(d1, d2)
* - return null if one of the two parameters is null or ""
* - return null if one of the two parameters has an incorrect date format
* - return the number of days between the dates d1->d2.
*/
/**
 * DateDiffFunction
 * description: DATEDIFF(d1, d2)
 * - return null if one of the two parameters is null or ""
 * - return null if one of the two parameters has an incorrect date format
 * - return the number of days between the dates d1->d2.
 */
@TransformFunction(names = {"datediff", "date_diff"})
public class DateDiffFunction implements ValueParser {

    // cached, thread-safe formatters for the two accepted input shapes
    private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    private final ValueParser leftDateParser;
    private final ValueParser rightDateParser;

    /**
     * Builds the two operand parsers from the SQL function's argument list.
     *
     * @param expr the parsed DATEDIFF(...) call; must carry two expressions
     */
    public DateDiffFunction(Function expr) {
        List<Expression> params = expr.getParameters().getExpressions();
        this.leftDateParser = OperatorTools.buildParser(params.get(0));
        this.rightDateParser = OperatorTools.buildParser(params.get(1));
    }

    /**
     * Evaluates both operands against the row and returns the day difference
     * d1 - d2, or null when either operand is null, empty, or unparseable.
     */
    @Override
    public Object parse(SourceData sourceData, int rowIndex, Context context) {
        Object leftObj = leftDateParser.parse(sourceData, rowIndex, context);
        Object rightObj = rightDateParser.parse(sourceData, rowIndex, context);
        if (leftObj == null || rightObj == null) {
            return null;
        }
        String leftDate = OperatorTools.parseString(leftObj);
        String rightDate = OperatorTools.parseString(rightObj);
        if (leftDate.isEmpty() || rightDate.isEmpty()) {
            return null;
        }
        try {
            // between(d2, d1) yields d1 - d2 in whole days
            return ChronoUnit.DAYS.between(getLocalDate(rightDate), getLocalDate(leftDate));
        } catch (Exception e) {
            // malformed date input -> null, per the documented contract
            return null;
        }
    }

    /**
     * Parses a date string: a space means "yyyy-MM-dd HH:mm:ss" (time part is
     * dropped), otherwise "yyyy-MM-dd".
     *
     * @param dateString the raw operand text
     * @return the date component of the input
     */
    public LocalDate getLocalDate(String dateString) {
        if (dateString.indexOf(' ') != -1) {
            return LocalDateTime.parse(dateString, DEFAULT_FORMAT_DATE_TIME).toLocalDate();
        }
        return LocalDate.parse(dateString, DEFAULT_FORMAT_DATE);
    }
}
Loading

0 comments on commit dac2448

Please sign in to comment.