diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java index 097ef854d15..6a348dbde8e 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlNode.java @@ -18,18 +18,30 @@ package org.apache.inlong.sdk.transform.decode; import lombok.Data; +import org.apache.commons.lang.math.NumberUtils; +import org.apache.commons.lang3.StringUtils; @Data public class YamlNode { private String name; - private Object value; + private boolean isArray = false; + private int arrayIndex = -1; - public YamlNode() { - } - - public YamlNode(String name, Object value) { - this.name = name; - this.value = value; + public YamlNode(String nodeString) { + int beginIndex = nodeString.indexOf('('); + if (beginIndex < 0) { + this.name = nodeString; + } else { + this.name = StringUtils.trim(nodeString.substring(0, beginIndex)); + int endIndex = nodeString.lastIndexOf(')'); + if (endIndex >= 0) { + this.isArray = true; + this.arrayIndex = NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1); + if (this.arrayIndex < 0) { + this.arrayIndex = 0; + } + } + } } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java index a9d3162fd79..7006501a06a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java @@ -26,13 +26,19 @@ public class YamlSourceData implements SourceData { public static final String CHILD_KEY = "$child"; - private YamlNode root; + private YamlO root; private YamlNode childRoot; public YamlSourceData(YamlNode root, YamlNode childRoot) { this.root = root; - this.childRoot = childRoot; + this.childRoot = new YamlNode(); + if (childRoot != null) { + Object value = childRoot.getValue(); + if (value instanceof List) { + this.childRoot = childRoot; + } + } } @Override public int getRowCount() { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java index 0f99c142ed1..18aa5ae1dad 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java @@ -17,12 +17,11 @@ package org.apache.inlong.sdk.transform.decode; -import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo; -import org.apache.inlong.sdk.transform.process.Context; - import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml; +import org.apache.inlong.sdk.transform.pojo.YamlSourceInfo; +import org.apache.inlong.sdk.transform.process.Context; import java.nio.charset.Charset; import java.util.ArrayList; diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index a372c76a39d..e2c5503315f 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -469,25 +469,35 @@ public void testYaml2Csv() throws Exception { processor = TransformProcessor .create(config, SourceDecoderFactory.createYamlDecoder(yamlSource), SinkEncoderFactory.createCsvEncoder(csvSink)); - srcString = "sid: sid\n" + - "packageID: pid\n" + - "Persons:\n" + - " - data: value1\n" + - " habbies:\n" + - " - index: 1\n" + - " name: sing1\n" + - " - index: 2\n" + - " name: dance1\n" + - " - index: 3\n" + - " name: rap1\n" + - " - data: value2\n" + - " habbies:\n" + - " - index: 1\n" + - " name: sing2\n" + - " - index: 2\n" + - " name: dance2\n" + - " - index: 3\n" + - " name: rap2\n"; + srcString = " sid: sid\n" + + " packageID: pid\n" + + " Persons:\n" + + " - Person:\n" + + " data: value1\n" + + " msgTime: time1\n" + + " habbies:\n" + + " - habby:\n" + + " index: 1\n" + + " name: sing1\n" + + " - habby:\n" + + " index: 2\n" + + " name: dance1\n" + + " - habby:\n" + + " index: 3\n" + + " name: rap1\n" + + " - Person:\n" + + " data: value2\n" + + " msgTime: time2\n" + + " habbies:\n" + + " - habby:\n" + + " index: 1\n" + + " name: sing2\n" + + " - habby:\n" + + " index: 2\n" + + " name: dance2\n" + + " - habby:\n" + + " index: 3\n" + + " name: rap2\n"; output = processor.transform(srcString, new HashMap<>()); Assert.assertEquals(2, output.size()); Assert.assertEquals("sid|pid|value1|rap1", output.get(0)); @@ -508,7 +518,7 @@ public void testYaml2CsvForOne() throws Exception { fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); yamlSource = new YamlSourceInfo("UTF-8", ""); csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); - transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).data from source"; + transformSql = "select $root.msgs(1).msgTime,$root.msgs(0).data from source"; config = new TransformConfig(transformSql); processor = TransformProcessor.create(config, SourceDecoderFactory.createYamlDecoder(yamlSource), SinkEncoderFactory.createCsvEncoder(csvSink)); @@ -521,8 +531,8 @@ public void testYaml2CsvForOne() throws Exception { " - data: value2\n" + " msgTime: Time2\n"; output = processor.transform(srcString, new HashMap<>()); - Assert.assertEquals(2, output.size()); + Assert.assertEquals(1, output.size()); Assert.assertEquals(output.get(0), "sid1|pid1|Time2|value1"); - Assert.assertEquals(output.get(1), "sid1|pid1|Time2|value1"); + //Assert.assertEquals(output.get(1), "sid1|pid1|Time2|value1"); } }