diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java index 2fcab7c0bcf..041e1632eb0 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java @@ -23,6 +23,7 @@ import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo; import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; import org.apache.inlong.sdk.transform.pojo.PbSourceInfo; +import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo; public class SourceDecoderFactory { @@ -48,4 +49,9 @@ public static AvroSourceDecoder createAvroDecoder(AvroSourceInfo sourceInfo) { public static BsonSourceDecoder createBsonDecoder(BsonSourceInfo sourceInfo) { return new BsonSourceDecoder(sourceInfo); } + + public static YamlSourceDecoder createYamlDecoder(YamlSourceInfo sourceInfo) { + return new YamlSourceDecoder(sourceInfo); + } + } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java new file mode 100644 index 00000000000..097ef854d15 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import lombok.Data; + +@Data +public class YamlNode { + + private String name; + private Object value; + + public YamlNode() { + } + + public YamlNode(String name, Object value) { + this.name = name; + this.value = value; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java new file mode 100644 index 00000000000..a9d3162fd79 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import java.util.List; +import java.util.Map; + +public class YamlSourceData implements SourceData { + + public static final String ROOT_KEY = "$root"; + + public static final String CHILD_KEY = "$child"; + + private YamlNode root; + + private YamlNode childRoot; + + public YamlSourceData(YamlNode root, YamlNode childRoot) { + this.root = root; + this.childRoot = childRoot; + } + @Override + public int getRowCount() { + if (this.childRoot == null) { + return 1; + } else { + Object value = this.childRoot.getValue(); + if (value instanceof List) { + return ((List) value).size(); + } else { + return 1; + } + } + } + + @Override + public String getField(int rowNum, String fieldName) { + try { + String[] nodeString = fieldName.split("\\."); + Object cur = null, last = null; + int start = -1; + + if (nodeString[0].equals(ROOT_KEY)) { + cur = root; + } else if (nodeString[0].equals(CHILD_KEY)) { + cur = ((List) childRoot.getValue()).get(rowNum); + } + + for (int i = 1; i < nodeString.length; i++) { + if (cur == null) { + cur = last; + continue; + } + last = cur; + if (cur instanceof List) { + int idx = 0; + start = nodeString[i].indexOf('('); + if (start != -1) { + idx = Integer.parseInt(nodeString[1].substring(start + 1, nodeString[1].indexOf(')'))); + } + cur = ((List) cur).get(idx); + } else if (cur instanceof Map) { + start = nodeString[i].indexOf('('); + String key = nodeString[i]; + if (start != -1) { + key = key.substring(0, start); + } + cur = ((Map) cur).get(key); + } else if (cur instanceof YamlNode) { + cur = ((YamlNode) cur).getValue(); + } else { + i++; + } + i--; + } + if (cur == null) { + return ""; + } + return cur.toString(); + } catch (Exception e) { + return ""; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java new file mode 100644 index 00000000000..b7a2ba915f3 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo; +import org.apache.inlong.sdk.transform.process.Context; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Slf4j +public class YamlSourceDecoder implements SourceDecoder { + + protected YamlSourceInfo sourceInfo; + private Charset srcCharset = Charset.defaultCharset(); + private String rowsNodePath; + private List childNodes; + + public YamlSourceDecoder(YamlSourceInfo sourceInfo) { + this.sourceInfo = sourceInfo; + if (!StringUtils.isBlank(sourceInfo.getCharset())) { + this.srcCharset = Charset.forName(sourceInfo.getCharset()); + } + this.rowsNodePath = sourceInfo.getRowsNodePath(); + if (!StringUtils.isBlank(rowsNodePath)) { + this.childNodes = new ArrayList<>(); + String[] nodeStrings = this.rowsNodePath.split("\\."); + childNodes.addAll(Arrays.asList(nodeStrings)); + } + } + @Override + public SourceData decode(byte[] srcBytes, Context context) { + String srcString = new String(srcBytes, srcCharset); + return this.decode(srcString, context); + } + + @Override + public SourceData decode(String srcString, Context context) { + try { + Yaml yaml = new Yaml(); + Map yamlData = yaml.load(srcString); + Map rootMap = new HashMap<>(); + List childList = new ArrayList<>(); + + for (Map.Entry entry : yamlData.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + + if (value instanceof Map) { + Map childNodes = parser((Map) value); + rootMap.put(key, new YamlNode(key, childNodes)); + } else if (value instanceof List) { + for (Object item : (List) value) { + if (item instanceof Map) { + Map childNodes = parser((Map) item); + childList.add(new YamlNode(key, childNodes)); + } else { + childList.add(new YamlNode(key, item)); + } + } + rootMap.put(key, new YamlNode(key, value)); + } else { + rootMap.put(key, new YamlNode(key, value)); + } + } + YamlNode root = new YamlNode(YamlSourceData.ROOT_KEY, rootMap.isEmpty() ? null : rootMap); + YamlNode childRoot = new YamlNode(YamlSourceData.CHILD_KEY, rowsNodePath.isEmpty() ? null : childList); + return new YamlSourceData(root, childRoot); + } catch (Exception e) { + log.error("Data parsing failed", e); + return null; + } + } + + private static Map parser(Map yamlData) { + Map yamlNodes = new HashMap<>(); + for (Map.Entry entry : yamlData.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if (value instanceof Map) { + yamlNodes.put(key, new YamlNode(key, parser((Map) value))); + } else if (value instanceof List) { + List list = new ArrayList<>(); + for (Object item : (List) value) { + if (item instanceof Map) { + list.add(new YamlNode(key, parser((Map) item))); + } else { + list.add(new YamlNode(key, item)); + } + } + yamlNodes.put(key, new YamlNode(key, list)); + } else { + yamlNodes.put(key, new YamlNode(key, value)); + } + } + return yamlNodes; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/YamlSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/YamlSourceInfo.java new file mode 100644 index 00000000000..0538a5a9ba4 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/YamlSourceInfo.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class YamlSourceInfo extends SourceInfo { + + private String rowsNodePath; + + @JsonCreator + public YamlSourceInfo( + @JsonProperty("charset") String charset, + @JsonProperty("rowsNodePath") String rowsNodePath) { + super(charset); + this.rowsNodePath = rowsNodePath; + } + + /** + * get rowsNodePath + * @return the rowsNodePath + */ + @JsonProperty("rowsNodePath") + public String getRowsNodePath() { + return rowsNodePath; + } + + /** + * set rowsNodePath + * @param rowsNodePath the rowsNodePath to set + */ + public void setRowsNodePath(String rowsNodePath) { + this.rowsNodePath = rowsNodePath; + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestYaml2CsvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestYaml2CsvProcessor.java new file mode 100644 index 00000000000..5839820ae79 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestYaml2CsvProcessor.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.processor; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestYaml2CsvProcessor extends AbstractProcessorTestBase { + + @Test + public void testYaml2Csv() throws Exception { + List fields = null; + YamlSourceInfo yamlSource = null; + CsvSinkInfo csvSink = null; + String transformSql = null; + TransformConfig config = null; + TransformProcessor processor = null; + String srcString = null; + List output = null; + + // case1 + fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); + yamlSource = new YamlSourceInfo("UTF-8", "msgs"); + csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); + transformSql = "select $root.sid,$root.packageID,$child.data,$child.msgTime from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createYamlDecoder(yamlSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + srcString = "sid: sid1\n" + + "packageID: pid1\n" + + "msgs:\n" + + " - data: value1\n" + + " msgTime: Time1\n" + + " - data: value2\n" + + " msgTime: Time2\n"; + output = processor.transform(srcString, new HashMap<>()); + Assert.assertEquals(2, output.size()); + Assert.assertEquals(output.get(0), "sid1|pid1|value1|Time1"); + Assert.assertEquals(output.get(1), "sid1|pid1|value2|Time2"); + + // case2 + yamlSource = new YamlSourceInfo("UTF-8", "Persons"); + csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); + transformSql = "select $root.sid,$root.packageID,$child.data,$child.habbies(2).name from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createYamlDecoder(yamlSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + srcString = "sid: sid\n" + + "packageID: pid\n" + + "Persons:\n" + + " - data: value1\n" + + " habbies:\n" + + " - index: 1\n" + + " name: sing1\n" + + " - index: 2\n" + + " name: dance1\n" + + " - index: 3\n" + + " name: rap1\n" + + " - data: value2\n" + + " habbies:\n" + + " - index: 1\n" + + " name: sing2\n" + + " - index: 2\n" + + " name: dance2\n" + + " - index: 3\n" + + " name: rap2\n"; + output = processor.transform(srcString, new HashMap<>()); + Assert.assertEquals(2, output.size()); + Assert.assertEquals("sid|pid|value1|rap1", output.get(0)); + Assert.assertEquals("sid|pid|value2|rap2", output.get(1)); + } + + @Test + public void testYaml2CsvForOne() throws Exception { + List fields = null; + YamlSourceInfo yamlSource = null; + CsvSinkInfo csvSink = null; + String transformSql = null; + TransformConfig config = null; + TransformProcessor processor = null; + String srcString = null; + List output = null; + fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); + yamlSource = new YamlSourceInfo("UTF-8", ""); + csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); + transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).data from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor.create(config, + SourceDecoderFactory.createYamlDecoder(yamlSource), SinkEncoderFactory.createCsvEncoder(csvSink)); + srcString = "Message:\n" + + "sid: sid1\n" + + "packageID: pid1\n" + + "msgs:\n" + + " - data: value1\n" + + " msgTime: Time1\n" + + " - data: value2\n" + + " msgTime: Time2\n"; + output = processor.transform(srcString, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals(output.get(0), "sid1|pid1|Time2|value1"); + } +}