From d31891ce3182fba30ff7028223f471d2db241f68 Mon Sep 17 00:00:00 2001 From: emptyOVO Date: Tue, 3 Sep 2024 22:13:33 +0800 Subject: [PATCH] [INLONG-11005][SDK] Add YAML formatted data source for Transform --- .../decode/SourceDecoderFactory.java | 6 + .../inlong/sdk/transform/decode/YamlNode.java | 35 +++++ .../sdk/transform/decode/YamlSourceData.java | 99 +++++++++++++ .../transform/decode/YamlSourceDecoder.java | 130 ++++++++++++++++++ .../sdk/transform/pojo/YamlSourceInfo.java | 53 +++++++ .../process/TestTransformProcessor.java | 83 +++++++++++ 6 files changed, 406 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/YamlSourceInfo.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java index b29f6f093c8..a83cbbe3eb6 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java @@ -21,6 +21,7 @@ import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo; import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; import org.apache.inlong.sdk.transform.pojo.PbSourceInfo; +import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo; public class SourceDecoderFactory { @@ -39,4 +40,9 @@ public static JsonSourceDecoder createJsonDecoder(JsonSourceInfo sourceInfo) { public static PbSourceDecoder createPbDecoder(PbSourceInfo sourceInfo) { return new PbSourceDecoder(sourceInfo); } + + public static YamlSourceDecoder createYamlDecoder(YamlSourceInfo sourceInfo) { + return new YamlSourceDecoder(sourceInfo); + } + } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java new file mode 100644 index 00000000000..097ef854d15 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import lombok.Data; + +@Data +public class YamlNode { + + private String name; + private Object value; + + public YamlNode() { + } + + public YamlNode(String name, Object value) { + this.name = name; + this.value = value; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java new file mode 100644 index 00000000000..775de5d2061 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import java.util.List; +import java.util.Map; + +public class YamlSourceData implements SourceData { + + public static final String ROOT_KEY = "$root"; + + public static final String CHILD_KEY = "$child"; + + private YamlNode root; + + private YamlNode childRoot; + + public YamlSourceData(YamlNode root, YamlNode childRoot) { + this.root = root; + this.childRoot = childRoot; + } + @Override + public int getRowCount() { + if (this.childRoot == null) { + return 1; + } else { + Object value = this.childRoot.getValue(); + if (value instanceof List) { + return ((List) value).size(); + } else { + return 1; + } + } + } + + @Override + public String getField(int rowNum, String fieldName) { + try { + String[] nodeString = fieldName.split("\\."); + Object cur = null, last = null; + int start = -1; + + if (nodeString[0].equals(ROOT_KEY)) { + cur = root; + } else if (nodeString[0].equals(CHILD_KEY)) { + cur = ((List) childRoot.getValue()).get(rowNum); + } + + for (int i = 1; i < nodeString.length; i++) { + if (cur == null) { + cur = last; + continue; + } + last = cur; + if (cur instanceof List) { + int idx = 0; + start = nodeString[i].indexOf('('); + if (start != -1) { + idx = Integer.parseInt(nodeString[1].substring(start + 1, nodeString[1].indexOf(')'))); + } + cur = ((List) cur).get(idx).getValue(); + } else if (cur instanceof Map) { + start = nodeString[i].indexOf('('); + String key = nodeString[i]; + if (start != -1) { + key = key.substring(0, start); + } + cur = ((Map) cur).get(key); + } else if (cur instanceof YamlNode) { + cur = ((YamlNode) cur).getValue(); + } else { + i++; + } + i--; + } + if (cur == null) { + return ""; + } + return cur.toString(); + } catch (Exception e) { + return ""; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java new file mode 100644 index 00000000000..4449f2729e9 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo; +import org.apache.inlong.sdk.transform.process.Context; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Slf4j +public class YamlSourceDecoder implements SourceDecoder { + + protected YamlSourceInfo sourceInfo; + private Charset srcCharset = Charset.defaultCharset(); + private String rowsNodePath; + private List childNodes; + + public YamlSourceDecoder(YamlSourceInfo sourceInfo) { + this.sourceInfo = sourceInfo; + if (!StringUtils.isBlank(sourceInfo.getCharset())) { + this.srcCharset = Charset.forName(sourceInfo.getCharset()); + } + this.rowsNodePath = sourceInfo.getRowsNodePath(); + if (!StringUtils.isBlank(rowsNodePath)) { + this.childNodes = new ArrayList<>(); + String[] nodeStrings = this.rowsNodePath.split("\\."); + childNodes.addAll(Arrays.asList(nodeStrings)); + } + } + @Override + public SourceData decode(byte[] srcBytes, Context context) { + String srcString = new String(srcBytes, srcCharset); + return this.decode(srcString, context); + } + + @Override + public SourceData decode(String srcString, Context context) { + try { + Yaml yaml = new Yaml(); + Map yamlData = yaml.load(srcString); + if (yamlData == null || yamlData.isEmpty()) { + log.error("YAML data is empty or null."); + return null; + } + Map rootMap = new HashMap<>(); + List childList = new ArrayList<>(); + + for (Map.Entry entry : yamlData.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + + if (value instanceof Map) { + // 递归处理子Map,并将其作为根节点的一部分 + Map childNodes = parser((Map) value); + rootMap.put(key, new YamlNode(key, childNodes)); + } else if (value instanceof List) { + // 处理列表,并将其作为子节点的一部分 + for (Object item : (List) value) { + if (item instanceof Map) { + Map childNodes = parser((Map) item); + childList.add(new YamlNode(key, childNodes)); + } else { + childList.add(new YamlNode(key, item)); + } + } + rootMap.put(key, new YamlNode(key, value)); + } else { + // 处理简单类型,将其作为根节点的一部分 + rootMap.put(key, new YamlNode(key, value)); + } + } + YamlNode root = new YamlNode(YamlSourceData.ROOT_KEY, rootMap.isEmpty() ? null : rootMap); + YamlNode childRoot = new YamlNode(YamlSourceData.CHILD_KEY, childList.isEmpty() ? null : childList); + return new YamlSourceData(root, childRoot); + } catch (Exception e) { + log.error("Data parsing failed", e); + return null; + } + } + + private static Map parser(Map yamlData) { + Map yamlNodes = new HashMap<>(); + for (Map.Entry entry : yamlData.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + + /* + * if (value instanceof Map) { yamlNodes.put(key, new YamlNode(key, parser((Map) value))); } + * else + */if (value instanceof List) { + List list = new ArrayList<>(); + for (Object item : (List) value) { + if (item instanceof Map) { + list.add(new YamlNode(key, parser((Map) item))); + } else { + list.add(new YamlNode(key, item)); + } + } + yamlNodes.put(key, new YamlNode(key, list)); + } else { + yamlNodes.put(key, new YamlNode(key, value)); + } + } + return yamlNodes; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/YamlSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/YamlSourceInfo.java new file mode 100644 index 00000000000..0538a5a9ba4 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/YamlSourceInfo.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class YamlSourceInfo extends SourceInfo { + + private String rowsNodePath; + + @JsonCreator + public YamlSourceInfo( + @JsonProperty("charset") String charset, + @JsonProperty("rowsNodePath") String rowsNodePath) { + super(charset); + this.rowsNodePath = rowsNodePath; + } + + /** + * get rowsNodePath + * @return the rowsNodePath + */ + @JsonProperty("rowsNodePath") + public String getRowsNodePath() { + return rowsNodePath; + } + + /** + * set rowsNodePath + * @param rowsNodePath the rowsNodePath to set + */ + public void setRowsNodePath(String rowsNodePath) { + this.rowsNodePath = rowsNodePath; + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index 3413f1aca3e..78966fc4e4a 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -27,6 +27,7 @@ import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; import org.apache.inlong.sdk.transform.pojo.PbSourceInfo; import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo; import org.junit.Assert; import org.junit.Test; @@ -350,4 +351,86 @@ public void testPb2CsvForNow() throws Exception { List output = processor.transform(srcBytes, new HashMap<>()); Assert.assertEquals(2, output.size()); } + + @Test + public void testYaml2Csv() throws Exception { + List fields = null; + YamlSourceInfo yamlSource = null; + CsvSinkInfo csvSink = null; + String transformSql = null; + TransformConfig config = null; + TransformProcessor processor = null; + String srcString = null; + List output = null; + + // case1 + fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); + yamlSource = new YamlSourceInfo("UTF-8", "msgs"); + csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); + transformSql = "select $root.sid,$root.packageID,$child.data,$child.msgTime from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createYamlDecoder(yamlSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + srcString = "sid: sid1\n" + + "packageID: pid1\n" + + "msgs:\n" + + " - data: value1\n" + + " msgTime: Time1\n" + + " - data: value2\n" + + " msgTime: Time2\n"; + output = processor.transform(srcString, new HashMap<>()); + Assert.assertEquals(2, output.size()); + Assert.assertEquals(output.get(0), "sid1|pid1|value1|Time1"); + Assert.assertEquals(output.get(1), "sid1|pid1|value2|Time2"); + + // case2 + yamlSource = new YamlSourceInfo("UTF-8", "Persons"); + csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); + transformSql = "select $root.sid,$root.packageID,$child.data,$child.habbies(2).name from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createYamlDecoder(yamlSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + srcString = "sid: sid\n" + + "packageID: pid\n" + + "Persons:\n" + + " - data: value1\n" + + " habbies:\n" + + " - index: 1\n" + + " name: sing1\n" + + " - index: 2\n" + + " name: dance1\n" + + " - index: 3\n" + + " name: rap1\n" + + " - data: value2\n" + + " habbies:\n" + + " - index: 1\n" + + " name: sing2\n" + + " - index: 2\n" + + " name: dance2\n" + + " - index: 3\n" + + " name: rap2\n"; + output = processor.transform(srcString, new HashMap<>()); + Assert.assertEquals(2, output.size()); + Assert.assertEquals("sid|pid|value1|rap1", output.get(0)); + Assert.assertEquals("sid|pid|value2|rap2", output.get(1)); + } + // TODO: testYaml2CsvForOne() + /* + * @Test public void testYaml2CsvForOne() throws Exception { List fields = null; YamlSourceInfo + * yamlSource = null; CsvSinkInfo csvSink = null; String transformSql = null; TransformConfig config = null; + * TransformProcessor processor = null; String srcString = null; List output = null; + * + * // case1 fields = this.getTestFieldList(); yamlSource = new YamlSourceInfo("UTF-8", ""); csvSink = new + * CsvSinkInfo("UTF-8", '|', '\\', fields); //transformSql = + * "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).data from source"; transformSql = + * "select $root.msgs(1).msgTime,$root.msgs(0).data from source"; config = new TransformConfig(transformSql); + * processor = TransformProcessor .create(config, SourceDecoderFactory.createYamlDecoder(yamlSource), + * SinkEncoderFactory.createCsvEncoder(csvSink)); srcString = "msgs:\n" + " - data: value1\n" + + * " msgTime: Time1\n" + " - data: value2\n" + " msgTime: Time2\n"; output = processor.transform(srcString, + * new HashMap<>()); Assert.assertEquals(2, output.size()); //Assert.assertEquals(output.get(0), + * "sid1|pid1|Time2|value1"); Assert.assertEquals(output.get(0), "Time2|value1"); } + */ + }