Skip to content

Commit

Permalink
Merge branch 'master' into INLONG-11042
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Sep 14, 2024
2 parents 069c25d + 807717a commit 8c32435
Show file tree
Hide file tree
Showing 28 changed files with 1,281 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,11 @@ public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolTy
LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", groupId, protocolType);

InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
if (groupEntity == null) {
String errMsg = String.format("group not found by groupId=%s", groupId);
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}
GroupStatus groupStatus = GroupStatus.forCode(groupEntity.getStatus());
if (!Objects.equals(groupStatus, GroupStatus.CONFIG_SUCCESSFUL)) {
String errMsg =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import lombok.Data;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;

/**
* AvroNode
*/
@Data
public class AvroNode {

private String name;
private boolean isArray = false;
private List<Integer> arrayIndices = new ArrayList<>();

public AvroNode(String nodeString) {
int beginIndex = nodeString.indexOf('(');
if (beginIndex < 0) {
this.name = nodeString;
} else {
this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
int endIndex = nodeString.lastIndexOf(')');
if (endIndex >= 0) {
this.isArray = true;
String indicesString = nodeString.substring(beginIndex + 1, endIndex);
String[] indices = indicesString.split(",");
for (String index : indices) {
int arrayIndex = NumberUtils.toInt(StringUtils.trim(index), -1);
if (arrayIndex < 0) {
arrayIndex = 0;
}
this.arrayIndices.add(arrayIndex);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class AvroSourceData implements SourceData {

public static final String ROOT_KEY = "$root";

public static final String CHILD_KEY = "$child";

private GenericRecord root;

private List<GenericRecord> childRoot;

private Charset srcCharset;

public AvroSourceData(GenericRecord root, List<GenericRecord> childRoot, Charset srcCharset) {
this.root = root;
this.childRoot = childRoot;
this.srcCharset = srcCharset;
}

@Override
public int getRowCount() {
if (this.childRoot == null) {
return 1;
} else {
return this.childRoot.size();
}
}

@Override
public String getField(int rowNum, String fieldName) {
try {
List<AvroNode> childNodes = new ArrayList<>();
String[] nodeStrings = fieldName.split("\\.");
for (String nodeString : nodeStrings) {
childNodes.add(new AvroNode(nodeString));
}
// parse
if (childNodes.size() == 0) {
return "";
}
// first node
AvroNode firstNode = childNodes.get(0);
Object current = root;
Schema curSchema = root.getSchema();
if (StringUtils.equals(ROOT_KEY, firstNode.getName())) {
current = root;
curSchema = root.getSchema();
} else if (StringUtils.equals(CHILD_KEY, firstNode.getName())) {
if (rowNum < childRoot.size()) {
current = childRoot.get(rowNum);
curSchema = childRoot.get(rowNum).getSchema();
} else {
return "";
}
} else {
// error data
return "";
}
if (current == null) {
// error data
return "";
}
// parse other node
for (int i = 1; i < childNodes.size(); i++) {
AvroNode node = childNodes.get(i);
if (curSchema.getType() != Type.RECORD) {
// error data
return "";
}
Object newElement = ((GenericRecord) current).get(node.getName());
if (newElement == null) {
// error data
return "";
}
// node is not array
if (!node.isArray()) {
curSchema = curSchema.getField(node.getName()).schema();
current = newElement;
continue;
}
// node is an array
current = getElementFromArray(node, newElement, curSchema);
if (current == null) {
// error data
return "";
}
}
return getNodeAsString(current, curSchema);
} catch (Exception e) {
return "";
}
}

private Object getElementFromArray(AvroNode node, Object curElement, Schema curSchema) {
if (node.getArrayIndices().isEmpty()) {
// error data
return null;
}
for (int index : node.getArrayIndices()) {
if (curSchema.getType() != Type.ARRAY) {
// error data
return null;
}
List<?> newArray = (List<?>) curElement;
if (index >= newArray.size()) {
// error data
return null;
}
curSchema = curSchema.getElementType();
curElement = newArray.get(index);
}
return curElement;
}

private String getNodeAsString(Object node, Schema schema) {
String fieldValue = "";
Type fieldType = schema.getType();
switch (fieldType) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case STRING:
case BOOLEAN:
case ENUM:
fieldValue = String.valueOf(node);
break;
case BYTES:
ByteBuffer byteBuffer = (ByteBuffer) node;
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
fieldValue = new String(bytes, srcCharset);
break;
case FIXED:
byteBuffer = (ByteBuffer) node;
bytes = new byte[schema.getFixedSize()];
fieldValue = new String(bytes, srcCharset);
}
return fieldValue;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sdk.transform.pojo.AvroSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class AvroSourceDecoder implements SourceDecoder<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(AvroSourceDecoder.class);

protected AvroSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
private String rowsNodePath;
private List<AvroNode> childNodes;

public AvroSourceDecoder(AvroSourceInfo sourceInfo) {
try {
this.sourceInfo = sourceInfo;
if (!StringUtils.isBlank(sourceInfo.getCharset())) {
this.srcCharset = Charset.forName(sourceInfo.getCharset());
}
this.rowsNodePath = sourceInfo.getRowsNodePath();
if (!StringUtils.isBlank(rowsNodePath)) {
this.childNodes = new ArrayList<>();
String[] nodeStrings = this.rowsNodePath.split("\\.");
for (String nodeString : nodeStrings) {
this.childNodes.add(new AvroNode(nodeString));
}
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new TransformException(e.getMessage(), e);
}
}

@Override
public SourceData decode(byte[] srcBytes, Context context) {
try {
InputStream inputStream = new ByteArrayInputStream(srcBytes);
DataFileStream<GenericRecord> dataFileStream =
new DataFileStream<>(inputStream, new GenericDatumReader<>());
GenericRecord root = dataFileStream.next();
List<GenericRecord> childRoot = null;
if (CollectionUtils.isEmpty(childNodes)) {
return new AvroSourceData(root, null, srcCharset);
}

Object current = root;
Schema curSchema = root.getSchema();

for (AvroNode node : childNodes) {
if (curSchema.getType() != Type.RECORD) {
// error data
return new AvroSourceData(root, null, srcCharset);
}
Object newElement = ((GenericRecord) current).get(node.getName());
if (newElement == null) {
// error data
return new AvroSourceData(root, null, srcCharset);
}
// node is not array
if (!node.isArray()) {
curSchema = curSchema.getField(node.getName()).schema();
current = newElement;
continue;
}
// node is an array
current = getElementFromArray(node, newElement, curSchema);
if (current == null) {
// error data
return new AvroSourceData(root, null, srcCharset);
}
}
if (curSchema.getType() != Type.ARRAY) {
// error data
return new AvroSourceData(root, null, srcCharset);
}
childRoot = (List<GenericRecord>) current;
return new AvroSourceData(root, childRoot, srcCharset);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return null;
}
}

private Object getElementFromArray(AvroNode node, Object curElement, Schema curSchema) {
if (node.getArrayIndices().isEmpty()) {
// error data
return null;
}
for (int index : node.getArrayIndices()) {
if (curSchema.getType() != Type.ARRAY) {
// error data
return null;
}
List<?> newArray = (List<?>) curElement;
if (index >= newArray.size()) {
// error data
return null;
}
curSchema = curSchema.getElementType();
curElement = newArray.get(index);
}
return curElement;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public SourceData decode(String srcString, Context context) {
try {
fieldValue = field.getConverter().convert(fieldValues[fieldIndex]);
} catch (Exception e) {
throw new RuntimeException(e);

}
}
sourceData.putField(fieldName, fieldValue);
Expand Down
Loading

0 comments on commit 8c32435

Please sign in to comment.