From 6203004031d3109e32c6bb90231f5f3a5bacb84f Mon Sep 17 00:00:00 2001 From: vernedeng Date: Sun, 15 Sep 2024 19:48:14 +0800 Subject: [PATCH 01/11] [INLONG-11105][SDK] Fix the empty string is converted to a "null" (#11106) * [INLONG-11105][SDK] Fix the empty string is converted into a "null" * fix ut * fix --- .../sdk/transform/process/TransformProcessor.java | 6 +++++- .../process/TestParseUrlFunctionProcessor.java | 6 +++--- .../process/TestTransformChrFunctionProcessor.java | 2 +- .../TestTransformConcatWsFunctionProcessor.java | 2 +- .../process/function/arithmetic/TestBinFunction.java | 2 +- .../function/arithmetic/TestIfNullFunction.java | 2 +- .../process/function/arithmetic/TestMd5Function.java | 2 +- .../process/function/arithmetic/TestSha2Function.java | 4 ++-- .../process/function/arithmetic/TestShaFunction.java | 2 +- .../function/flowcontrol/TestNullIfFunction.java | 4 ++-- .../process/function/string/TestAsciiFunction.java | 2 +- .../process/function/string/TestBitLengthFunction.java | 2 +- .../process/function/string/TestCompressFunction.java | 4 ++-- .../function/string/TestFromBase64Function.java | 2 +- .../process/function/string/TestLcaseFunction.java | 2 +- .../process/function/string/TestLeftFunction.java | 4 ++-- .../process/function/string/TestLengthFunction.java | 2 +- .../process/function/string/TestLocateFunction.java | 2 +- .../process/function/string/TestLowerFunction.java | 2 +- .../process/function/string/TestLpadFunction.java | 8 ++++---- .../process/function/string/TestLtrimFunction.java | 2 +- .../process/function/string/TestRightFunction.java | 4 ++-- .../process/function/string/TestRpadFunction.java | 8 ++++---- .../process/function/string/TestRtrimFunction.java | 2 +- .../process/function/string/TestSpaceFunction.java | 2 +- .../function/string/TestSplitIndexFunction.java | 10 +++++----- .../process/function/string/TestStrcmpFunction.java | 2 +- .../process/function/string/TestUcaseFunction.java | 2 +- 
.../function/string/TestUnCompressFunction.java | 8 ++++---- .../process/function/string/TestUpperFunction.java | 2 +- .../function/temporal/TestDateDiffFunction.java | 6 +++--- 31 files changed, 57 insertions(+), 53 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index 32024ec1bea..6d650cfd4a5 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -177,7 +177,11 @@ public List transform(I input, Map extParams) { } try { Object fieldValue = parser.parse(sourceData, i, context); - sinkData.addField(fieldName, String.valueOf(fieldValue)); + if (fieldValue == null) { + sinkData.addField(fieldName, ""); + } else { + sinkData.addField(fieldName, fieldValue.toString()); + } } catch (Throwable t) { sinkData.addField(fieldName, ""); } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestParseUrlFunctionProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestParseUrlFunctionProcessor.java index 407fa72d0ac..5850d50d4b5 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestParseUrlFunctionProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestParseUrlFunctionProcessor.java @@ -111,7 +111,7 @@ public void testParseUrlFunction() throws Exception { List output5 = processor5.transform("http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1|QUERY|k1|cloud|1", new HashMap<>()); Assert.assertEquals(1, output5.size()); - Assert.assertEquals(output5.get(0), "result=null"); + Assert.assertEquals(output5.get(0), "result="); String transformSql6 = "select 
parse_url(string1, string2, stringX) from source"; TransformConfig config6 = new TransformConfig(transformSql6); @@ -122,7 +122,7 @@ public void testParseUrlFunction() throws Exception { List output6 = processor6.transform("http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1|QUERY|k1|cloud|1", new HashMap<>()); Assert.assertEquals(1, output6.size()); - Assert.assertEquals(output6.get(0), "result=null"); + Assert.assertEquals(output6.get(0), "result="); Assert.assertEquals(output3.get(0), "result=v1"); String transformSql7 = "select parse_url(string1, string2) from source"; @@ -145,7 +145,7 @@ public void testParseUrlFunction() throws Exception { List output8 = processor8 .transform("http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1|USERINFO|k1|cloud|1", new HashMap<>()); Assert.assertEquals(1, output8.size()); - Assert.assertEquals(output8.get(0), "result=null"); + Assert.assertEquals(output8.get(0), "result="); String transformSql9 = "select parse_url(string1, string2) from source"; TransformConfig config9 = new TransformConfig(transformSql9); diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformChrFunctionProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformChrFunctionProcessor.java index f89ef4c3b2d..4bf2fe8760b 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformChrFunctionProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformChrFunctionProcessor.java @@ -80,6 +80,6 @@ public void testChrFunction() throws Exception { // case3: chr(null) List output3 = processor2.transform("|5|6|8|1|9", new HashMap<>()); Assert.assertEquals(1, output3.size()); - Assert.assertEquals(output3.get(0), "result=null"); + Assert.assertEquals(output3.get(0), "result="); } } diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformConcatWsFunctionProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformConcatWsFunctionProcessor.java index 76c253ea611..fae6af0e0b9 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformConcatWsFunctionProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformConcatWsFunctionProcessor.java @@ -95,7 +95,7 @@ public void testConcatWsFunction() throws Exception { SinkEncoderFactory.createKvEncoder(kvSink)); List output4 = processor4.transform("apple|null|cloud|extra", new HashMap<>()); Assert.assertEquals(1, output4.size()); - Assert.assertEquals(output4.get(0), "result=null"); + Assert.assertEquals(output4.get(0), "result="); // case 5: concat_ws('-', '', '', '') String transformSql5 = "select concat_ws('-', string1, string2, string3) from source"; diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestBinFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestBinFunction.java index be69e35b6f4..bc793f57dc4 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestBinFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestBinFunction.java @@ -48,6 +48,6 @@ public void testBinFunction() throws Exception { // case: bin() List output2 = processor2.transform("1|2|3|4", new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals(output2.get(0), "result=null"); + Assert.assertEquals(output2.get(0), "result="); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestIfNullFunction.java 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestIfNullFunction.java index 5e86e534e17..418ef5c41d5 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestIfNullFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestIfNullFunction.java @@ -90,6 +90,6 @@ public void testIfNullFunction() throws Exception { data = "6|0|3|5"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestMd5Function.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestMd5Function.java index 47f3c92d00f..b597cfc6fc6 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestMd5Function.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestMd5Function.java @@ -59,6 +59,6 @@ public void testMd5Function() throws Exception { SinkEncoderFactory.createKvEncoder(kvSink)); List output4 = processor.transform("1|4|6|8"); Assert.assertEquals(1, output4.size()); - Assert.assertEquals("result=null", output4.get(0)); + Assert.assertEquals("result=", output4.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestSha2Function.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestSha2Function.java index 2007499a415..c87a66cd79b 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestSha2Function.java +++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestSha2Function.java @@ -46,7 +46,7 @@ public void testSha2Function() throws Exception { data = "|3|3|5"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case2: sha2("5",224) data = "5|224|3|5"; @@ -69,6 +69,6 @@ public void testSha2Function() throws Exception { data = "3|224|3|5"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestShaFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestShaFunction.java index 087a8e44564..3a256c38763 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestShaFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestShaFunction.java @@ -63,6 +63,6 @@ public void testShaFunction() throws Exception { data = "3|3|3|5"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java index 868e922cc94..8c3f1f5c34a 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java +++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestNullIfFunction.java @@ -53,7 +53,7 @@ public void testNullIfFunction() throws Exception { data = "5|5|3|5"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case3: nullif(null,3) transformSql = "select nullif(xxd,numeric2) from source"; @@ -64,6 +64,6 @@ public void testNullIfFunction() throws Exception { data = "5|3|3|5"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java index 10d0c4032b5..35ee9bc4c97 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestAsciiFunction.java @@ -57,6 +57,6 @@ public void testAsciiFunction() throws Exception { // case3: ascii(null) -> null List output2 = processor2.transform("A|el|EL|2|1|3", new HashMap<>()); Assert.assertEquals(1, output2.size()); - Assert.assertEquals(output2.get(0), "result=null"); + Assert.assertEquals(output2.get(0), "result="); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestBitLengthFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestBitLengthFunction.java index 70de7aa31c3..88ef27b1216 100644 --- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestBitLengthFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestBitLengthFunction.java @@ -56,7 +56,7 @@ public void testBitLengthFunction() throws Exception { // case3: bit_length(null) output1 = processor1.transform("hello world|apple|cloud|2|1|3", new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); transformSql = "select bit_length(string1,string2) from source"; config = new TransformConfig(transformSql); diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestCompressFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestCompressFunction.java index 0b8b2f2f1e5..dd01c06ecb5 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestCompressFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestCompressFunction.java @@ -60,7 +60,7 @@ public void testCompressFunction() throws Exception { // case3: length(compress(null)) output1 = processor1.transform("hello world|apple|cloud|2|1|3", new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); transformSql = "select length(compress(string1,string2)) from source"; config = new TransformConfig(transformSql); @@ -80,6 +80,6 @@ public void testCompressFunction() throws Exception { // case5: length(compress('hello world','undefinedType')) output1 = processor1.transform("hello world|undefinedType|cloud|2|1|3", new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + 
Assert.assertEquals("result=", output1.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestFromBase64Function.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestFromBase64Function.java index 021ef381d7f..e5754392870 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestFromBase64Function.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestFromBase64Function.java @@ -51,7 +51,7 @@ public void testFromBase64Function() throws Exception { // case2: from_base64(null) -> null List output2 = processor2.transform("|apple|banana|cloud|1", new HashMap<>()); Assert.assertEquals(1, output2.size()); - Assert.assertEquals(output2.get(0), "result=null"); + Assert.assertEquals(output2.get(0), "result="); // case3: from_base64('QXBhY2hlIEluTG9uZw==') -> 'Apache InLong' List output3 = processor.transform("QXBhY2hlIEluTG9uZw==|apple|banana|cloud|1", new HashMap<>()); diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLcaseFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLcaseFunction.java index 355afae2d31..1752de38107 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLcaseFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLcaseFunction.java @@ -59,6 +59,6 @@ public void testLcaseFunction() throws Exception { SinkEncoderFactory.createKvEncoder(kvSink)); List output3 = processor2.transform("ApPlE|", new HashMap<>()); Assert.assertEquals(1, output3.size()); - Assert.assertEquals(output3.get(0), "result=null"); + Assert.assertEquals(output3.get(0), "result="); } } diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLeftFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLeftFunction.java index 6663c63d704..f07a205af58 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLeftFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLeftFunction.java @@ -64,7 +64,7 @@ public void testLeftFunction() throws Exception { data = "hello world|banana|cloud|5|3|3"; output1 = processor1.transform(data, new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); // case5: left('hello world',null) transformSql = "select left(string1,xxd) from source"; @@ -75,6 +75,6 @@ public void testLeftFunction() throws Exception { data = "hello world|banana|cloud|5|3|3"; output1 = processor1.transform(data, new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLengthFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLengthFunction.java index 1094c4f0553..7181dcedf9f 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLengthFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLengthFunction.java @@ -50,6 +50,6 @@ public void testLengthFunction() throws Exception { // case2: length(null) output1 = processor1.transform("hello world|apple|cloud|2|1|3", new HashMap<>()); Assert.assertEquals(1, output1.size()); - 
Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLocateFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLocateFunction.java index ca21214bc44..f8663d20626 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLocateFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLocateFunction.java @@ -61,6 +61,6 @@ public void testLocateFunction() throws Exception { // case5: locate('app', null) List output5 = processor1.transform("app", new HashMap<>()); Assert.assertEquals(1, output5.size()); - Assert.assertEquals(output5.get(0), "result=null"); + Assert.assertEquals(output5.get(0), "result="); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLowerFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLowerFunction.java index 8050372a3c7..211eeaf3f85 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLowerFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLowerFunction.java @@ -55,6 +55,6 @@ public void testLowerFunction() throws Exception { SinkEncoderFactory.createKvEncoder(kvSink)); List output3 = processor2.transform("ApPlE|banana|cloud|2|1|3", new HashMap<>()); Assert.assertEquals(1, output3.size()); - Assert.assertEquals(output3.get(0), "result=null"); + Assert.assertEquals(output3.get(0), "result="); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLpadFunction.java 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLpadFunction.java index 333683aa752..df776232406 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLpadFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLpadFunction.java @@ -64,7 +64,7 @@ public void testLpadFunction() throws Exception { data = "he|xxd|cloud|-1|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case5: lpad(null,5,'xxd') transformSql = "select lpad(xxd,numeric1,string2) from source"; @@ -75,7 +75,7 @@ public void testLpadFunction() throws Exception { data = "he|xxd|cloud|5|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case6: lpad('he',null,'xxd') transformSql = "select lpad(string1,xxd,string2) from source"; @@ -86,7 +86,7 @@ public void testLpadFunction() throws Exception { data = "he|xxd|cloud|5|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case7: lpad('he',5,null) transformSql = "select lpad(string1,numeric1,xxd) from source"; @@ -97,6 +97,6 @@ public void testLpadFunction() throws Exception { data = "he|xxd|cloud|5|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLtrimFunction.java 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLtrimFunction.java index 9697738d8db..07e66485de0 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLtrimFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestLtrimFunction.java @@ -65,6 +65,6 @@ public void testTrimFunction() throws Exception { data = " in long|xxd|cloud|7|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRightFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRightFunction.java index 9728fda896a..92eb54a1970 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRightFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRightFunction.java @@ -64,7 +64,7 @@ public void testRightFunction() throws Exception { data = "hello world|banana|cloud|5|3|3"; output1 = processor1.transform(data, new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); // case5: right('hello world',null) transformSql = "select right(string1,xxd) from source"; @@ -75,6 +75,6 @@ public void testRightFunction() throws Exception { data = "hello world|banana|cloud|5|3|3"; output1 = processor1.transform(data, new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); } } diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRpadFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRpadFunction.java index 7a58a8be439..833332aced4 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRpadFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRpadFunction.java @@ -64,7 +64,7 @@ public void testRpadFunction() throws Exception { data = "he|xxd|cloud|-1|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case5: rpad(null,5,'xxd') transformSql = "select rpad(xxd,numeric1,string2) from source"; @@ -75,7 +75,7 @@ public void testRpadFunction() throws Exception { data = "he|xxd|cloud|5|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case6: rpad('he',null,'xxd') transformSql = "select rpad(string1,xxd,string2) from source"; @@ -86,7 +86,7 @@ public void testRpadFunction() throws Exception { data = "he|xxd|cloud|5|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case7: rpad('he',5,null) transformSql = "select rpad(string1,numeric1,xxd) from source"; @@ -97,6 +97,6 @@ public void testRpadFunction() throws Exception { data = "he|xxd|cloud|5|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRtrimFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRtrimFunction.java index 823a200f222..c92cb236654 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRtrimFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestRtrimFunction.java @@ -65,6 +65,6 @@ public void testTrimFunction() throws Exception { data = " in long|xxd|cloud|7|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestSpaceFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestSpaceFunction.java index 308c54236cd..70185604ade 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestSpaceFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestSpaceFunction.java @@ -63,7 +63,7 @@ public void testSpaceFunction() throws Exception { data = "hello world|banana|cloud|5|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestSplitIndexFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestSplitIndexFunction.java index 1ad949aaa44..773f91f6166 100644 --- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestSplitIndexFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestSplitIndexFunction.java @@ -48,13 +48,13 @@ public void testSplitIndexFunction() throws Exception { data = "a,b,c|,|cloud|-1|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case3: split_index('a,b,c', ',', 3) data = "a,b,c|,|cloud|3|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case4: split_index(null, ',', 1) transformSql = "select split_index(xxd, string2, numeric1) from source"; @@ -65,7 +65,7 @@ public void testSplitIndexFunction() throws Exception { data = "abc|,|cloud|1|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case5: split_index('a,b,c', null, 1) transformSql = "select split_index(string1, xxd, numeric1) from source"; @@ -76,7 +76,7 @@ public void testSplitIndexFunction() throws Exception { data = "a,b,c|xxd|cloud|1|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case6: split_index('a,b,c', ',', null) transformSql = "select split_index(string1, string2, xxd) from source"; @@ -87,7 +87,7 @@ public void testSplitIndexFunction() throws Exception { data = "a,b,c|,|cloud|xxd|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", 
output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case7: split_index('', ',', 0) transformSql = "select split_index(string1, string2, numeric1) from source"; diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestStrcmpFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestStrcmpFunction.java index 3534da6189d..58817407f61 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestStrcmpFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestStrcmpFunction.java @@ -69,6 +69,6 @@ public void testStrcmpFunction() throws Exception { data = "hello world|zzzzz|cloud|5|3|3"; output = processor.transform(data, new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUcaseFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUcaseFunction.java index c132f0e6294..d53cc982100 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUcaseFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUcaseFunction.java @@ -59,6 +59,6 @@ public void testUcaseFunction() throws Exception { SinkEncoderFactory.createKvEncoder(kvSink)); List output3 = processor2.transform("ApPlE", new HashMap<>()); Assert.assertEquals(1, output3.size()); - Assert.assertEquals(output3.get(0), "result=null"); + Assert.assertEquals(output3.get(0), "result="); } } diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUnCompressFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUnCompressFunction.java index 2e7091709eb..9f4d9e5917f 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUnCompressFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUnCompressFunction.java @@ -60,7 +60,7 @@ public void testUnCompressFunction() throws Exception { // case3: uncompress(compress(null)) output1 = processor1.transform("h|apple|cloud|2|1|3", new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); transformSql = "select uncompress(compress(string1)) from source"; config = new TransformConfig(transformSql); @@ -95,16 +95,16 @@ public void testUnCompressFunction() throws Exception { // case7: uncompress(compress(null,'Gzip'),'Gzip') output1 = processor1.transform("hello world|Gzip|Gzip|2|1|3", new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); // case8: uncompress(compress(null,'zip'),'zip') output1 = processor1.transform("hello world|zip|zip|2|1|3", new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); // case8: uncompress(compress(null,'zip'),'undefinedType') output1 = processor1.transform("hello world|zip|undefinedType|2|1|3", new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals("result=null", output1.get(0)); + Assert.assertEquals("result=", output1.get(0)); } } diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUpperFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUpperFunction.java index b4fd923ebdd..242458c1d7c 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUpperFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUpperFunction.java @@ -55,6 +55,6 @@ public void testUpperFunction() throws Exception { SinkEncoderFactory.createKvEncoder(kvSink)); List output3 = processor2.transform("ApPlE|banana|cloud|2|1|3", new HashMap<>()); Assert.assertEquals(1, output3.size()); - Assert.assertEquals(output3.get(0), "result=null"); + Assert.assertEquals(output3.get(0), "result="); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestDateDiffFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestDateDiffFunction.java index 6887d855dc4..d1daefb4f0a 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestDateDiffFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestDateDiffFunction.java @@ -60,12 +60,12 @@ public void testDateDiffFunction() throws Exception { // case4: datediff('2018-12-10 12:30:00', '') output = processor.transform("2018-12-10 12:30:00|", new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); // case5: datediff('2018-12', '2018-12-12') output = processor.transform("2018-12|2018-12-12", new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", 
output.get(0)); // case6: datediff('1970-01-01',null) transformSql = "select datediff(string1,xxd) from source"; @@ -75,6 +75,6 @@ public void testDateDiffFunction() throws Exception { SinkEncoderFactory.createKvEncoder(kvSink)); output = processor.transform("1970-01-01|1970-01-02", new HashMap<>()); Assert.assertEquals(1, output.size()); - Assert.assertEquals("result=null", output.get(0)); + Assert.assertEquals("result=", output.get(0)); } } From f8757d4e0aa053b577c9269e98816d2f55024de0 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Tue, 17 Sep 2024 11:02:52 +0800 Subject: [PATCH 02/11] [INLONG-11103][Manager] Data add task supports filtering based on stream (#11104) --- .../pojo/source/file/FileDataAddTaskRequest.java | 5 +++++ .../inlong/manager/pojo/source/file/FileSource.java | 3 +++ .../manager/pojo/source/file/FileSourceDTO.java | 3 +++ .../manager/pojo/source/file/FileSourceRequest.java | 3 +++ .../manager/service/source/StreamSourceService.java | 2 +- .../service/source/StreamSourceServiceImpl.java | 4 ++-- .../service/source/file/FileSourceOperator.java | 8 +++++++- .../web/controller/StreamSourceController.java | 13 +++++-------- 8 files changed, 29 insertions(+), 12 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java index bcf292c1f34..6cb5fe1e42b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java @@ -27,6 +27,8 @@ import lombok.EqualsAndHashCode; import lombok.ToString; +import java.util.List; + @Data @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @@ -34,6 +36,9 @@ @ApiModel(value = "File data add task 
request") public class FileDataAddTaskRequest extends DataAddTaskRequest { + @ApiModelProperty("filterStreams") + private List filterStreams; + @ApiModelProperty("Start time") private Long startTime; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java index 4b0df6f4165..6b3cc382dbc 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java @@ -95,6 +95,9 @@ public class FileSource extends StreamSource { @ApiModelProperty("End time") private Long endTime; + @ApiModelProperty("filterStreams") + private List filterStreams; + public FileSource() { this.setSourceType(SourceType.FILE); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java index ea78c9cbecc..db4ba25c12b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java @@ -101,6 +101,9 @@ public class FileSourceDTO { @ApiModelProperty(value = "Audit version") private String auditVersion; + @ApiModelProperty("filterStreams") + private List filterStreams; + public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest fileSourceRequest, String extParams) { FileSourceDTO dto = StringUtils.isNotBlank(extParams) ? 
FileSourceDTO.getFromJson(extParams) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java index 2ba1a1f7ff0..d0100ecd1b8 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java @@ -90,6 +90,9 @@ public class FileSourceRequest extends SourceRequest { @ApiModelProperty("End time") private Long endTime; + @ApiModelProperty("filterStreams") + private List filterStreams; + public FileSourceRequest() { this.setSourceType(SourceType.FILE); this.setSerializationType(DataFormat.CSV.getName()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java index b622d7a3034..9be7f06172b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java @@ -220,6 +220,6 @@ default Boolean updateAfterApprove(String operator) { * @param operator Operator's name. * @return source id list after saving. 
*/ - List batchAddDataAddTask(String groupId, String streamId, List requestList, + List batchAddDataAddTask(String groupId, List requestList, String operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 62ccd8f2caa..c92a544054a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -537,10 +537,10 @@ public Integer addDataAddTask(DataAddTaskRequest request, String operator) { } @Override - public List batchAddDataAddTask(String groupId, String streamId, List requestList, + public List batchAddDataAddTask(String groupId, List requestList, String operator) { List result = new ArrayList<>(); - String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, streamId)); + String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, null)); for (DataAddTaskRequest request : requestList) { request.setAuditVersion(auditVersion); int id = addDataAddTask(request, operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java index 5d4329c7ff0..502808602b4 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java @@ -119,13 +119,19 @@ public Integer addDataAddTask(DataAddTaskRequest request, String operator) { dto.setStartTime(sourceRequest.getStartTime()); 
dto.setEndTime(sourceRequest.getEndTime()); dto.setRetry(true); + dto.setAuditVersion(request.getAuditVersion()); + dto.setFilterStreams(sourceRequest.getFilterStreams()); StreamSourceEntity dataAddTaskEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new); dataAddTaskEntity.setId(null); dataAddTaskEntity.setSourceName(sourceEntity.getSourceName() + "-" + (dataAddTaskSize + 1)); dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto)); dataAddTaskEntity.setTaskMapId(sourceEntity.getId()); - return sourceMapper.insert(dataAddTaskEntity); + Integer id = sourceMapper.insert(dataAddTaskEntity); + SourceRequest dataAddTaskRequest = + CommonBeanUtils.copyProperties(dataAddTaskEntity, SourceRequest::new, true); + updateAgentTaskConfig(dataAddTaskRequest, operator); + return id; } catch (Exception e) { LOGGER.error("serialize extParams of File SourceDTO failure: ", e); throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java index 3c751a0c6a0..d60fa3bad57 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java @@ -130,16 +130,13 @@ public Response forceDelete(@RequestParam String inlongGroupId, @Reques sourceService.forceDelete(inlongGroupId, inlongStreamId, LoginUserUtils.getLoginUser().getName())); } - @RequestMapping(value = "/source/addDataAddTask/{groupId}/{streamId}", method = RequestMethod.POST) + @RequestMapping(value = "/source/addDataAddTask/{groupId}", method = RequestMethod.POST) @ApiOperation(value = "Add supplementary recording task for stream source") - @ApiImplicitParams({ - @ApiImplicitParam(name = 
"groupId", dataTypeClass = String.class, required = true), - @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true) - }) - public Response> addSub(@PathVariable String groupId, @PathVariable String streamId, + @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true) + public Response> addSub(@PathVariable String groupId, @RequestBody List requestList) { - return Response.success(sourceService.batchAddDataAddTask(groupId, streamId, requestList, - LoginUserUtils.getLoginUser().getName())); + return Response.success( + sourceService.batchAddDataAddTask(groupId, requestList, LoginUserUtils.getLoginUser().getName())); } } From 9d97dcbb309897f64c4406ff96d818c4946cca6f Mon Sep 17 00:00:00 2001 From: Haotian Ma <60374114+qy-liuhuo@users.noreply.github.com> Date: Tue, 17 Sep 2024 11:03:03 +0800 Subject: [PATCH 03/11] [INLONG-11110][SDK] Fix incorrect usage of isLocalVisit variable in client example of dataproxy-sdk module (#11116) --- .../dataproxy/example/HttpClientExample.java | 23 +++++---------- .../dataproxy/example/TcpClientExample.java | 29 +++++++------------ 2 files changed, 18 insertions(+), 34 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java index 22274fcf7c5..3999390f9b3 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java @@ -29,40 +29,31 @@ public class HttpClientExample { public static void main(String[] args) { - /* - * 1. 
if 'isLocalVisit' is true use local config from file in ${configBasePath} directory/${dataProxyGroupId} - * .local such as : configBasePath = /data/inlong/dataproxy/conf dataProxyGroupId = test so config file is : - * /data/inlong/dataproxy/conf/test.local and config context like this: - * {"isInterVisit":1,"clusterId":"1","size":1,"switch":1,"address":[{"host":"127.0.0 - * .1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]} - * - * 2. if 'isLocalVisit' is false sdk will get config from manager auto. - */ String inlongGroupId = "test_group_id"; String inlongStreamId = "test_stream_id"; - String configBasePath = "/data/inlong/dataproxy/conf"; + String configBasePath = ""; String inLongManagerAddr = "127.0.0.1"; - String inLongManagerPort = "8080"; + String inLongManagerPort = "8083"; String localIP = "127.0.0.1"; String messageBody = "inlong message body!"; HttpProxySender sender = getMessageSender(localIP, inLongManagerAddr, - inLongManagerPort, inlongGroupId, false, false, + inLongManagerPort, inlongGroupId, true, false, configBasePath); - sendHttpMessage(sender, inlongGroupId, inlongStreamId, messageBody); + sender.close(); // close the sender } public static HttpProxySender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort, String inlongGroupId, - boolean isLocalVisit, boolean isReadProxyIPFromLocal, + boolean requestByHttp, boolean isReadProxyIPFromLocal, String configBasePath) { ProxyClientConfig proxyConfig = null; HttpProxySender sender = null; try { - proxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr, + proxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr, Integer.valueOf(inLongManagerPort), - inlongGroupId, "test", "123456"); + inlongGroupId, "admin", "inlong");// user and password of manager proxyConfig.setInlongGroupId(inlongGroupId); proxyConfig.setConfStoreBasePath(configBasePath); proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal); diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java index 993b7750f77..55b6cf6d995 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java @@ -40,22 +40,12 @@ public class TcpClientExample { */ public static void main(String[] args) throws InterruptedException { - String inlongGroupId = "test_test"; - String inlongStreamId = "test_test"; - - /* - * 1. if isLocalVisit is true, will get dataproxy server info from local file in - * ${configBasePath}/${inlongGroupId}.local file - * - * for example: /data/inlong/config/test_test.local and file context like this: - * {"isInterVisit":1,"clusterId":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1", - * "port":"46802"},{"host":"127.0.0.1","port":"46802"}]} 2. if isLocalVisit is false, will get dataproxy server - * info from manager so we must ensure that the manager server url is configured correctly! - */ - String configBasePath = "/data/inlong/config"; + String inlongGroupId = "test_group_id"; + String inlongStreamId = "test_stream_id"; + String configBasePath = ""; String inLongManagerAddr = "127.0.0.1"; - String inLongManagerPort = "8000"; + String inLongManagerPort = "8083"; /* * It is recommended to use type 7. 
For others, please refer to the official related documents @@ -66,19 +56,20 @@ public static void main(String[] args) throws InterruptedException { TcpClientExample tcpClientExample = new TcpClientExample(); DefaultMessageSender sender = tcpClientExample .getMessageSender(localIP, inLongManagerAddr, inLongManagerPort, - inlongGroupId, false, false, configBasePath, msgType); + inlongGroupId, true, false, configBasePath, msgType); tcpClientExample.sendTcpMessage(sender, inlongGroupId, inlongStreamId, messageBody, System.currentTimeMillis()); + sender.close(); // close the sender } public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort, - String inlongGroupId, boolean isLocalVisit, boolean isReadProxyIPFromLocal, + String inlongGroupId, boolean requestByHttp, boolean isReadProxyIPFromLocal, String configBasePath, int msgType) { ProxyClientConfig dataProxyConfig = null; DefaultMessageSender messageSender = null; try { - dataProxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr, - Integer.valueOf(inLongManagerPort), inlongGroupId, "test", "123456"); + dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr, + Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong"); if (StringUtils.isNotEmpty(configBasePath)) { dataProxyConfig.setConfStoreBasePath(configBasePath); } @@ -98,9 +89,11 @@ public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId, try { result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId, 0, String.valueOf(dt), 20, TimeUnit.SECONDS); + } catch (UnsupportedEncodingException e) { e.printStackTrace(); } + System.out.println("messageSender" + result); logger.info("messageSender {}", result); } From 9e443cbfaa17c085d0824d9d8f912b7ab2a96c03 Mon Sep 17 00:00:00 2001 From: emptyOVO <118812562+emptyOVO@users.noreply.github.com> Date: Wed, 18 Sep 2024 09:57:05 +0800 Subject: [PATCH 04/11] 
[INLONG-11112][SDK] Transform TRUNCATE() function add pgsql name (#11113) --- .../process/function/TruncateFunction.java | 2 +- .../arithmetic/TestTruncateFunction.java | 24 +++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TruncateFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TruncateFunction.java index 8267efff552..347f672352a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TruncateFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TruncateFunction.java @@ -39,7 +39,7 @@ * for example: truncate(42.324, 2)--return 42.32 * truncate(42.324)--return 42.0 */ -@TransformFunction(names = {"truncate"}) +@TransformFunction(names = {"truncate", "trunc"}) public class TruncateFunction implements ValueParser { private ValueParser bigDecimalParser; diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestTruncateFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestTruncateFunction.java index ddc3cabf7cf..35138ed508a 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestTruncateFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestTruncateFunction.java @@ -52,15 +52,31 @@ public void testTruncateFunction() throws Exception { Assert.assertEquals(1, output3.size()); Assert.assertEquals(output3.get(0), "result=12000"); - String transformSql2 = "select truncate(numeric1) from source"; + String transformSql2 = "select trunc(numeric1,numeric2) from source"; TransformConfig config2 = new TransformConfig(transformSql2); TransformProcessor processor2 
= TransformProcessor .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), SinkEncoderFactory.createKvEncoder(kvSink)); - // case4: truncate(12345) - List output4 = processor2.transform("12345.6789|-3|6|8"); + // case1: trunc(42.324, 2) + List output4 = processor2.transform("42.324|2|6|8"); Assert.assertEquals(1, output4.size()); - Assert.assertEquals(output4.get(0), "result=12345"); + Assert.assertEquals(output4.get(0), "result=42.32"); + + // case2: trunc(42.324, -1) + List output5 = processor2.transform("42.324|-1|6|8"); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals(output5.get(0), "result=40"); + + String transformSql3 = "select truncate(numeric1) from source"; + TransformConfig config3 = new TransformConfig(transformSql3); + TransformProcessor processor3 = TransformProcessor + .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: truncate(12345.6789) + List output6 = processor3.transform("12345.6789|-3|6|8"); + Assert.assertEquals(1, output6.size()); + Assert.assertEquals(output6.get(0), "result=12345"); } } From 51390eac821e6ac7c7bb94874c6cd10dfc417a23 Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Thu, 19 Sep 2024 11:13:24 +0800 Subject: [PATCH 05/11] [INLONG-11117][SDK] Transform SQL supports ISNULL functions (#11120) Co-authored-by: ZKpLo <14148880+zkplo@user.noreply.gitee.com> --- .../process/function/IsNullFunction.java | 64 +++++++++++++ .../flowcontrol/TestIsNullFunction.java | 96 +++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsNullFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestIsNullFunction.java diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsNullFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsNullFunction.java new file mode 100644 index 00000000000..95078400fad --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IsNullFunction.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +/** + * IsNullFunction + * description: isnull(expr) + * - return true if expr is NULL + * - return false otherwise. 
+ */ +@TransformFunction(names = {"isnull"}) +public class IsNullFunction implements ValueParser { + + private ValueParser stringParser; + private ExpressionOperator operator; + + public IsNullFunction(Function expr) { + Expression expression = expr.getParameters().getExpressions().get(0); + try { + stringParser = OperatorTools.buildParser(expression); + } catch (Exception e) { + operator = OperatorTools.buildOperator(expression); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object val = null; + try { + if (stringParser != null) { + val = stringParser.parse(sourceData, rowIndex, context); + } else { + val = operator.check(sourceData, rowIndex, context); + } + } catch (Exception ignored) { + + } + return val == null; + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestIsNullFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestIsNullFunction.java new file mode 100644 index 00000000000..1040d206dde --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/flowcontrol/TestIsNullFunction.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.flowcontrol; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.function.arithmetic.AbstractFunctionArithmeticTestBase; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestIsNullFunction extends AbstractFunctionArithmeticTestBase { + + @Test + public void testIsNullFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: isnull(5 + 3) + transformSql = "select isnull(numeric1 + numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "5|3|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=false", output.get(0)); + + // case2: isnull(5 / 0) + transformSql = "select isnull(numeric1 / numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "5|0|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=true", output.get(0)); + + // case3: isnull(null) + transformSql = "select isnull(numericx) from source"; + config = new TransformConfig(transformSql); + 
processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "5|0|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=true", output.get(0)); + + // case4: isnull(5) + transformSql = "select isnull(numeric1) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "5|0|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=false", output.get(0)); + + // case5: isnull(5 > 0) + transformSql = "select isnull(numeric1 > numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "5|0|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=false", output.get(0)); + } +} From 05d3ee6e669ea67f879bf0d173c2cc80e8b52d53 Mon Sep 17 00:00:00 2001 From: emptyOVO <118812562+emptyOVO@users.noreply.github.com> Date: Thu, 19 Sep 2024 11:13:49 +0800 Subject: [PATCH 06/11] [INLONG-11125][SDK] Transform OperatorTools support parseBytes (#11126) --- .../sdk/transform/process/function/Sha2Function.java | 11 +++++------ .../sdk/transform/process/function/ShaFunction.java | 5 +---- .../sdk/transform/process/operator/OperatorTools.java | 9 +++++++++ 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Sha2Function.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Sha2Function.java index 
ad120712bfd..108b34de78a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Sha2Function.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Sha2Function.java @@ -26,7 +26,6 @@ import net.sf.jsqlparser.expression.Function; import org.apache.commons.codec.digest.DigestUtils; -import java.nio.charset.StandardCharsets; import java.util.List; import static org.apache.commons.codec.digest.MessageDigestAlgorithms.SHA_224; @@ -56,18 +55,18 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { if (msgObj == null || lenObj == null) { return null; } - String msg = msgObj.toString(); + byte[] msgBytes = OperatorTools.parseBytes(msgObj); int len = Integer.parseInt(lenObj.toString()); switch (len) { case 0: case 256: - return DigestUtils.sha256Hex(msg.getBytes(StandardCharsets.UTF_8)); + return DigestUtils.sha256Hex(msgBytes); case 224: - return new DigestUtils(SHA_224).digestAsHex(msg.getBytes(StandardCharsets.UTF_8)); + return new DigestUtils(SHA_224).digestAsHex(msgBytes); case 384: - return DigestUtils.sha384Hex(msg.getBytes(StandardCharsets.UTF_8)); + return DigestUtils.sha384Hex(msgBytes); case 512: - return DigestUtils.sha512Hex(msg.getBytes(StandardCharsets.UTF_8)); + return DigestUtils.sha512Hex(msgBytes); default: return null; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ShaFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ShaFunction.java index 7da5e6c3a9d..cd0824479e0 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ShaFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ShaFunction.java @@ -25,8 +25,6 @@ import net.sf.jsqlparser.expression.Function; import org.apache.commons.codec.digest.DigestUtils; -import 
java.nio.charset.StandardCharsets; - /** * ShaFunction * description: sha(string): Compute the SHA-1 160 bit checksum of a string. @@ -48,7 +46,6 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { if (msgObj == null) { return null; } - String msg = msgObj.toString(); - return DigestUtils.sha1Hex(msg.getBytes(StandardCharsets.UTF_8)); + return DigestUtils.sha1Hex(OperatorTools.parseBytes(msgObj)); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 9982b374189..df991e7c724 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -32,6 +32,7 @@ import java.lang.reflect.Constructor; import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.sql.Timestamp; import java.util.Map; @@ -148,6 +149,14 @@ public static Timestamp parseTimestamp(Object value) { } } + public static byte[] parseBytes(Object value) { + if (value instanceof byte[]) { + return (byte[]) value; + } else { + return (String.valueOf(value)).getBytes(StandardCharsets.UTF_8); + } + } + /** * compareValue * @param left From 2b52cfe012b80accb86377348db830a31dc9bdaa Mon Sep 17 00:00:00 2001 From: emptyOVO <118812562+emptyOVO@users.noreply.github.com> Date: Thu, 19 Sep 2024 11:14:10 +0800 Subject: [PATCH 07/11] [INLONG-11124][SDK] Transform fix ENCODE() and DECODE() function (#11127) --- .../process/function/DecodeFunction.java | 15 +++---- .../process/function/EncodeFunction.java | 9 +--- .../function/string/TestDecodeFunction.java | 34 +++++++-------- .../function/string/TestEncodeFunction.java | 42 +++++++++---------- 4 files changed, 42 insertions(+), 58 deletions(-) diff 
--git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DecodeFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DecodeFunction.java index 042e6b4f98d..7e408cdea88 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DecodeFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DecodeFunction.java @@ -72,23 +72,18 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { if (binaryObj == null || characterObj == null) { return null; } - String binaryString = OperatorTools.parseString(binaryObj); String characterSetValue = OperatorTools.parseString(characterObj).toUpperCase(); - return decode(binaryString, characterSetValue); + return decode((byte[]) binaryObj, characterSetValue); } - private String decode(String binaryString, String charsetName) { - if (binaryString == null || binaryString.isEmpty() || charsetName == null || charsetName.isEmpty()) { + private String decode(byte[] binaryString, String charsetName) { + if (binaryString == null || charsetName == null || charsetName.isEmpty()) { return ""; } - String[] byteValues = binaryString.split(" "); - byte[] byteArray = new byte[byteValues.length]; - for (int i = 0; i < byteValues.length; i++) { - byteArray[i] = (byte) Integer.parseInt(byteValues[i]); - } + if (Charset.isSupported(charsetName) && SUPPORTED_CHARSETS.contains(charsetName)) { Charset charset = Charset.forName(charsetName); - return new String(byteArray, charset); + return new String(binaryString, charset); } return ""; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EncodeFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EncodeFunction.java index 8196c529fcd..520be319716 100644 --- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EncodeFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/EncodeFunction.java @@ -74,14 +74,7 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { } String stringValue = OperatorTools.parseString(stringObj); String characterSetValue = OperatorTools.parseString(characterObj).toUpperCase(); - byte[] encodeBytes = encode(stringValue, characterSetValue); - StringBuilder res = new StringBuilder(); - if (encodeBytes != null) { - for (byte encodeByte : encodeBytes) { - res.append((int) encodeByte).append(" "); - } - } - return res.toString().trim(); + return encode(stringValue, characterSetValue); } private byte[] encode(String stringValue, String characterSetValue) { diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestDecodeFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestDecodeFunction.java index 4368334b790..d7f0ce06dd0 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestDecodeFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestDecodeFunction.java @@ -32,48 +32,44 @@ public class TestDecodeFunction extends AbstractFunctionStringTestBase { @Test public void testDecodeFunction() throws Exception { - String transformSql = "select decode(string1,string2) from source"; + String transformSql = "select decode(encode(string1,string2),string3) from source"; TransformConfig config = new TransformConfig(transformSql); TransformProcessor processor = TransformProcessor .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), SinkEncoderFactory.createKvEncoder(kvSink)); - // case1: decode('72 101 108 108 111','UTF-8') - List output1 = 
processor.transform("72 101 108 108 111|UTF-8|banana|cloud|1", new HashMap<>()); + // case1: decode(encode('Hello','UTF-8'),'UTF-8') + List output1 = processor.transform("Hello|UTF-8|UTF-8|cloud|1", new HashMap<>()); Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=Hello"); - // case2: decode('72 101 108 108 111','US-ASCII') - List output2 = processor.transform("72 101 108 108 111|US-ASCII|banana|cloud|1", new HashMap<>()); + // case2: decode(encode('Hello','US-ASCII'),'US-ASCII') + List output2 = processor.transform("Hello|US-ASCII|US-ASCII|cloud|1", new HashMap<>()); Assert.assertEquals(1, output2.size()); Assert.assertEquals(output2.get(0), "result=Hello"); - // case3: decode('72 101 108 108 111','ISO-8859-1') - List output3 = processor.transform("72 101 108 108 111|ISO-8859-1|banana|cloud|1", new HashMap<>()); + // case3: decode(encode('Hello','ISO-8859-1'),'ISO-8859-1') + List output3 = processor.transform("Hello|ISO-8859-1|ISO-8859-1|cloud|1", new HashMap<>()); Assert.assertEquals(1, output3.size()); Assert.assertEquals(output3.get(0), "result=Hello"); - // case4: decode('0 72 0 101 0 108 0 108 0 111','UTF-16BE') - List output4 = - processor.transform("0 72 0 101 0 108 0 108 0 111|UTF-16BE|banana|cloud|1", new HashMap<>()); + // case4: decode(encode('Hello','UTF-16BE'),'UTF-16BE') + List output4 = processor.transform("Hello|UTF-16BE|UTF-16BE|cloud|1", new HashMap<>()); Assert.assertEquals(1, output4.size()); Assert.assertEquals(output4.get(0), "result=Hello"); - // case5: decode('72 0 101 0 108 0 108 0 111 0','UTF-16LE') - List output5 = - processor.transform("72 0 101 0 108 0 108 0 111 0|UTf-16LE|banana|cloud|1", new HashMap<>()); + // case5: decode(encode('Hello','UTF-16LE'),'UTF-16LE') + List output5 = processor.transform("Hello|UTf-16LE|UTf-16LE|cloud|1", new HashMap<>()); Assert.assertEquals(1, output5.size()); Assert.assertEquals(output5.get(0), "result=Hello"); - // case6: decode('-2 -1 0 72 0 101 0 108 0 108 0 
111','UTF-16') - List output6 = - processor.transform("-2 -1 0 72 0 101 0 108 0 108 0 111|UtF-16|banana|cloud|1", new HashMap<>()); + // case6: decode(encode('Hello','UTF-16'),'UTF-16') + List output6 = processor.transform("Hello|UtF-16|UtF-16|cloud|1", new HashMap<>()); Assert.assertEquals(1, output6.size()); Assert.assertEquals(output6.get(0), "result=Hello"); - // case7: decode('-2 -1 0 72 0 101 0 108 0 108 0 111','UTF-16--') - List output7 = - processor.transform("-2 -1 0 72 0 101 0 108 0 108 0 111|UTF-16--|banana|cloud|1", new HashMap<>()); + // case7: decode(encode('Hello','UTF-16--'),'UTF-16--') + List output7 = processor.transform("Hello|UTF-16--|UTF-16--|cloud|1", new HashMap<>()); Assert.assertEquals(1, output7.size()); Assert.assertEquals(output7.get(0), "result="); } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestEncodeFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestEncodeFunction.java index 73ff2f48768..9f39863c60b 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestEncodeFunction.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestEncodeFunction.java @@ -32,44 +32,44 @@ public class TestEncodeFunction extends AbstractFunctionStringTestBase { @Test public void testEncodeFunction() throws Exception { - String transformSql = "select encode(string1,string2) from source"; + String transformSql = "select decode(encode(string1,string2),string3) from source"; TransformConfig config = new TransformConfig(transformSql); TransformProcessor processor = TransformProcessor .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), SinkEncoderFactory.createKvEncoder(kvSink)); - // case1: encode('Hello','UTF-8') - List output1 = processor.transform("Hello|UTF-8|banana|cloud|1", new HashMap<>()); + // case1: 
decode(encode('Hello','UTF-8'),'UTF-8') + List output1 = processor.transform("Hello|UTF-8|UTF-8|cloud|1", new HashMap<>()); Assert.assertEquals(1, output1.size()); - Assert.assertEquals(output1.get(0), "result=72 101 108 108 111"); + Assert.assertEquals(output1.get(0), "result=Hello"); - // case2: encode('Hello','US-ASCII') - List output2 = processor.transform("Hello|US-ASCII|banana|cloud|1", new HashMap<>()); + // case2: decode(encode('Hello','US-ASCII'),'US-ASCII') + List output2 = processor.transform("Hello|US-ASCII|US-ASCII|cloud|1", new HashMap<>()); Assert.assertEquals(1, output2.size()); - Assert.assertEquals(output2.get(0), "result=72 101 108 108 111"); + Assert.assertEquals(output2.get(0), "result=Hello"); - // case3: encode('Hello','ISO-8859-1') - List output3 = processor.transform("Hello|ISO-8859-1|banana|cloud|1", new HashMap<>()); + // case3: decode(encode('Hello','ISO-8859-1'),'ISO-8859-1') + List output3 = processor.transform("Hello|ISO-8859-1|ISO-8859-1|cloud|1", new HashMap<>()); Assert.assertEquals(1, output3.size()); - Assert.assertEquals(output3.get(0), "result=72 101 108 108 111"); + Assert.assertEquals(output3.get(0), "result=Hello"); - // case4: encode('Hello','UTF-16BE') - List output4 = processor.transform("Hello|UTF-16BE|banana|cloud|1", new HashMap<>()); + // case4: decode(encode('Hello','UTF-16BE'),'UTF-16BE') + List output4 = processor.transform("Hello|UTF-16BE|UTF-16BE|cloud|1", new HashMap<>()); Assert.assertEquals(1, output4.size()); - Assert.assertEquals(output4.get(0), "result=0 72 0 101 0 108 0 108 0 111"); + Assert.assertEquals(output4.get(0), "result=Hello"); - // case5: encode('Hello','UTF-16LE') - List output5 = processor.transform("Hello|UTf-16LE|banana|cloud|1", new HashMap<>()); + // case5: decode(encode('Hello','UTF-16LE'),'UTF-16LE') + List output5 = processor.transform("Hello|UTf-16LE|UTf-16LE|cloud|1", new HashMap<>()); Assert.assertEquals(1, output5.size()); - Assert.assertEquals(output5.get(0), "result=72 0 101 0 108 
0 108 0 111 0"); + Assert.assertEquals(output5.get(0), "result=Hello"); - // case6: encode('Hello','UTF-16') - List output6 = processor.transform("Hello|UtF-16|banana|cloud|1", new HashMap<>()); + // case6: decode(encode('Hello','UTF-16'),'UTF-16') + List output6 = processor.transform("Hello|UtF-16|UtF-16|cloud|1", new HashMap<>()); Assert.assertEquals(1, output6.size()); - Assert.assertEquals(output6.get(0), "result=-2 -1 0 72 0 101 0 108 0 108 0 111"); + Assert.assertEquals(output6.get(0), "result=Hello"); - // case7: encode('Hello','UTF-16--') - List output7 = processor.transform("Hello|UTF-16--|banana|cloud|1", new HashMap<>()); + // case7: decode(encode('Hello','UTF-16--'),'UTF-16--') + List output7 = processor.transform("Hello|UTF-16--|UTF-16--|cloud|1", new HashMap<>()); Assert.assertEquals(1, output7.size()); Assert.assertEquals(output7.get(0), "result="); } From cde50b2bcbed5b51f250d5725f46f082f56d5e93 Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Thu, 19 Sep 2024 11:15:36 +0800 Subject: [PATCH 08/11] [INLONG-11121][SDK] Transform SQL supports gcd functions (#11128) Co-authored-by: ZKpLo <14148880+zkplo@user.noreply.gitee.com> --- .../process/function/GcdFunction.java | 95 +++++++++++++++++++ .../function/arithmetic/TestGcdFunction.java | 70 ++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GcdFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGcdFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GcdFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GcdFunction.java new file mode 100644 index 00000000000..54de700d126 --- /dev/null +++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GcdFunction.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.util.List; + +/** + * GcdFunction -> gcd(numeric_type,numeric_type) + * description: + * - return 0 if both inputs are zero; + * - return greatest common divisor (the largest positive number that divides both inputs with no remainder). 
+ * Note: numeric_type includes floating-point number and integer + */ +@Slf4j +@TransformFunction(names = {"gcd"}) +public class GcdFunction implements ValueParser { + + private final ValueParser firstNumParser; + private final ValueParser secondNumTypeParser; + + public GcdFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + firstNumParser = OperatorTools.buildParser(expressions.get(0)); + secondNumTypeParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object firstNumObj = firstNumParser.parse(sourceData, rowIndex, context); + Object secondNumObj = secondNumTypeParser.parse(sourceData, rowIndex, context); + if (firstNumObj == null || secondNumObj == null) { + return null; + } + try { + BigDecimal firstNum = OperatorTools.parseBigDecimal(firstNumObj); + BigDecimal secondNum = OperatorTools.parseBigDecimal(secondNumObj); + return gcdForBigDecimals(firstNum, secondNum); + } catch (Exception e) { + log.error("Parse error", e); + return null; + } + } + + public static BigDecimal gcd(BigDecimal a, BigDecimal b) { + if (b.compareTo(BigDecimal.ZERO) == 0) { + return a; + } + BigDecimal remainder = a.remainder(b); + return gcd(b, remainder); + } + + /** + * Support floating-point and integer gcd + * + * @param a first number + * @param b second number + * @return The greatest common divisor of a and b + */ + public static BigDecimal gcdForBigDecimals(BigDecimal a, BigDecimal b) { + int scaleA = a.scale(); + int scaleB = b.scale(); + + BigDecimal scaledA = a.movePointRight(Math.max(scaleA, scaleB)); + BigDecimal scaledB = b.movePointRight(Math.max(scaleA, scaleB)); + + BigDecimal gcdValue = gcd(scaledA, scaledB); + + return gcdValue.movePointLeft(Math.max(scaleA, scaleB)); + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGcdFunction.java 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGcdFunction.java new file mode 100644 index 00000000000..f4d00c69caa --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGcdFunction.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.process.function.arithmetic; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestGcdFunction extends AbstractFunctionArithmeticTestBase { + + @Test + public void testGcdFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select gcd(numeric1,numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: gcd(3.14159265358979323846,3.2653589793) + data = "3.14159265358979323846|3.2653589793||"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2E-20", output.get(0)); + + // case2: gcd(3.141,3.846) + data = "3.141|3.846||"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0.003", output.get(0)); + + // case3: gcd(0,3.846) + data = "0|3.846||"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=3.846", output.get(0)); + + // case4: gcd(-3.141,3.846) + data = "3.141|3.846||"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0.003", output.get(0)); + } +} From 9731701558a64a55103e657805f906c685d6fb7a Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Thu, 19 
Sep 2024 11:15:58 +0800 Subject: [PATCH 09/11] [INLONG-11122][SDK] Transform SQL supports UNHEX function (#11131) Co-authored-by: ZKpLo <14148880+zkplo@user.noreply.gitee.com> --- .../process/function/UnHexFunction.java | 60 ++++++++++++++ .../function/string/TestUnHexFunction.java | 79 +++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/UnHexFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUnHexFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/UnHexFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/UnHexFunction.java new file mode 100644 index 00000000000..036dd0ec734 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/UnHexFunction.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +/** + * UnHexFunction -> unhex(str) + * description: unhex(str) interprets each pair of characters in the argument as a hexadecimal number and converts it to the byte represented by the number. + * return null if str is null; + * return a binary string otherwise. + */ +@TransformFunction(names = {"unhex"}) +public class UnHexFunction implements ValueParser { + + private ValueParser valueParser; + + public UnHexFunction(Function expr) { + valueParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object valueObj = valueParser.parse(sourceData, rowIndex, context); + if (valueObj == null) { + return null; + } + return hexToString(OperatorTools.parseString(valueObj)); + } + + public static String hexToString(String hex) { + StringBuilder output = new StringBuilder(); + for (int i = 0; i < hex.length(); i += 2) { + String str = hex.substring(i, i + 2); + char ch = (char) Integer.parseInt(str, 16); + output.append(ch); + } + return output.toString(); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUnHexFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUnHexFunction.java new file mode 100644 index 00000000000..7c8eedb490f --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestUnHexFunction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.string; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +/** + * TestUnHexFunction + * description: test the unhex function in transform processor + */ +public class TestUnHexFunction extends AbstractFunctionStringTestBase { + + @Test + public void testUnHexFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select unhex(string1) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case3: unhex("6") + output = processor.transform("6", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + // case1: 
unhex("696E6C6F6E67") + output = processor.transform("696E6C6F6E67", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=inlong", output.get(0)); + + // case2: unhex("") + output = processor.transform("|", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals(output.get(0), "result="); + + // case3: unhex("6") + output = processor.transform("6", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select unhex(hex(string1)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case4: unhex(hex("inlong")) + output = processor.transform("inlong", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=inlong", output.get(0)); + + } +} From 03a27a75b357231ea37f8b5f3d73d95bd4bbbae3 Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Thu, 19 Sep 2024 11:19:25 +0800 Subject: [PATCH 10/11] [INLONG-11020][SDK] Add BSON formatted data source for Transform (#11136) Co-authored-by: ZKpLo <14148880+zkplo@user.noreply.gitee.com> --- inlong-sdk/transform-sdk/pom.xml | 4 + .../sdk/transform/decode/BsonSourceData.java | 31 ++++ .../transform/decode/BsonSourceDecoder.java | 64 ++++++++ .../decode/SourceDecoderFactory.java | 4 + .../sdk/transform/pojo/BsonSourceInfo.java | 36 +++++ .../processor/TestBson2CsvProcessor.java | 141 ++++++++++++++++++ pom.xml | 7 + 7 files changed, 287 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java create mode 100644 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/BsonSourceInfo.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestBson2CsvProcessor.java diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml index 4770f0025da..a4b17600241 100644 --- a/inlong-sdk/transform-sdk/pom.xml +++ b/inlong-sdk/transform-sdk/pom.xml @@ -58,6 +58,10 @@ protobuf-java-util ${protobuf.version} + + org.mongodb + bson + org.reflections reflections diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java new file mode 100644 index 00000000000..97100b02405 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.decode; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + +/** + * Source data for BSON input; a thin subclass that forwards the root object and child array to JsonSourceData. + */ +public class BsonSourceData extends JsonSourceData { + + public BsonSourceData(JsonObject root, JsonArray childRoot) { + super(root, childRoot); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java new file mode 100644 index 00000000000..880467ea434 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.decode; + +import org.apache.inlong.sdk.transform.pojo.BsonSourceInfo; +import org.apache.inlong.sdk.transform.process.Context; + +import lombok.extern.slf4j.Slf4j; +import org.bson.BsonDocument; +import org.bson.RawBsonDocument; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +import java.math.BigDecimal; + +/** + * Decodes BSON by converting it to JSON (dropping "_id") and delegating to a JsonSourceDecoder; parse() returns null when conversion fails. + */ +@Slf4j +public class BsonSourceDecoder implements SourceDecoder { + + private final JsonSourceDecoder decoder; + + public BsonSourceDecoder(BsonSourceInfo sourceInfo) { + decoder = new JsonSourceDecoder(sourceInfo); + } + + @Override + public SourceData decode(byte[] srcBytes, Context context) { + return decoder.decode(parse(srcBytes), context); + } + + public String parse(byte[] bsonData) { + try { + RawBsonDocument rawBsonDocument = new RawBsonDocument(bsonData); + JsonWriterSettings writerSettings = JsonWriterSettings.builder() + .outputMode(JsonMode.RELAXED) + .decimal128Converter((value, writer) -> writer.writeNumber(value.bigDecimalValue().toPlainString())) + .doubleConverter((value, writer) -> writer.writeNumber(new BigDecimal(value).toString())) + .build(); + BsonDocument bsonDocument = BsonDocument.parse(rawBsonDocument.toJson(writerSettings)); + bsonDocument.remove("_id"); + return bsonDocument.toJson(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java index 06c3dd128b7..2fcab7c0bcf 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.decode; import 
org.apache.inlong.sdk.transform.pojo.AvroSourceInfo; +import org.apache.inlong.sdk.transform.pojo.BsonSourceInfo; import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo; import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; @@ -44,4 +45,11 @@ public static PbSourceDecoder createPbDecoder(PbSourceInfo sourceInfo) { public static AvroSourceDecoder createAvroDecoder(AvroSourceInfo sourceInfo) { return new AvroSourceDecoder(sourceInfo); } + + /** + * Creates a BsonSourceDecoder for the given BSON source description. + */ + public static BsonSourceDecoder createBsonDecoder(BsonSourceInfo sourceInfo) { + return new BsonSourceDecoder(sourceInfo); + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/BsonSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/BsonSourceInfo.java new file mode 100644 index 00000000000..9ae274d5877 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/BsonSourceInfo.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Source description for BSON input; binds the "charset" and "rowsNodePath" properties and forwards them to JsonSourceInfo. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class BsonSourceInfo extends JsonSourceInfo { + + @JsonCreator + public BsonSourceInfo( + @JsonProperty("charset") String charset, + @JsonProperty("rowsNodePath") String rowsNodePath) { + super(charset, rowsNodePath); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestBson2CsvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestBson2CsvProcessor.java new file mode 100644 index 00000000000..30668c58cca --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestBson2CsvProcessor.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.process.processor; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.BsonSourceInfo; +import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Base64; +import java.util.HashMap; +import java.util.List; + +public class TestBson2CsvProcessor extends AbstractProcessorTestBase { + + @Test + public void testBson2Csv() throws Exception { + List fields1 = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); + BsonSourceInfo bsonSourceInfo1 = new BsonSourceInfo("UTF-8", "msgs"); + CsvSinkInfo csvSink1 = new CsvSinkInfo("UTF-8", '|', '\\', fields1); + String transformSql1 = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + // case1: rowsNodePath "msgs", one output row expected per element of the msgs array + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createBsonDecoder(bsonSourceInfo1), + SinkEncoderFactory.createCsvEncoder(csvSink1)); + String srcBase64String1 = "lQAAAAdfaWQAZugcR0pfeo8607hFAnNpZAAHAAAAdmFsdWUxAAJ" + + "wYWNrYWdlSUQABwAAAHZhbHVlMgAEbXNncwBTAAAAAzAAJgAAAAJtc2cABwAAAHZhbHVl" + + "NAABbXNnVGltZQAAAOu4VO54QgADMQAiAAAAAm1zZwADAAAAdjQAAW1zZ1RpbWUAAADru" + + "FTueEIAAAA="; + byte[] srcByte1 = Base64.getDecoder().decode(srcBase64String1); + List output1 = processor1.transform(srcByte1, new HashMap<>()); + Assert.assertEquals(2, output1.size()); + Assert.assertEquals("value1|value2|1713243918000|value4", output1.get(0)); + Assert.assertEquals("value1|value2|1713243918000|v4", output1.get(1)); + + // case2: rowsNodePath "items", indexing into the subItems array of each child row + List fields2 = this.getTestFieldList("id", "itemId", 
"subItemId", "msg"); + BsonSourceInfo bsonSourceInfo2 = new BsonSourceInfo("UTF-8", "items"); + CsvSinkInfo csvSink2 = new CsvSinkInfo("UTF-8", '|', '\\', fields2); + String transformSql2 = + "select $root.id,$child.itemId,$child.subItems(0).subItemId,$child.subItems(1).msg from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createBsonDecoder(bsonSourceInfo2), + SinkEncoderFactory.createCsvEncoder(csvSink2)); + String srcBase64String2 = "SAEAAAdfaWQAZueeaQg0im27chcAAmlkAAcAAAB2YWx1ZTEAAm5hb" + + "WUABwAAAHZhbHVlMgAEaXRlbXMACwEAAAMwAIAAAAACaXRlbUlkAAYAAABpdGVtMQAEc" + + "3ViSXRlbXMAXwAAAAMwACoAAAACc3ViSXRlbUlkAAUAAAAxMDAxAAJtc2cACAAAADEwM" + + "DFtc2cAAAMxACoAAAACc3ViSXRlbUlkAAUAAAAxMDAyAAJtc2cACAAAADEwMDJtc2cAA" + + "AAAAzEAgAAAAAJpdGVtSWQABgAAAGl0ZW0yAARzdWJJdGVtcwBfAAAAAzAAKgAAAAJzd" + + "WJJdGVtSWQABQAAADIwMDEAAm1zZwAIAAAAMjAwMW1zZwAAAzEAKgAAAAJzdWJJdGVtS" + + "WQABQAAADIwMDIAAm1zZwAIAAAAMjAwMm1zZwAAAAAAAA=="; + byte[] srcByte2 = Base64.getDecoder().decode(srcBase64String2); + List output2 = processor2.transform(srcByte2, new HashMap<>()); + Assert.assertEquals(2, output2.size()); + Assert.assertEquals("value1|item1|1001|1002msg", output2.get(0)); + Assert.assertEquals("value1|item2|2001|2002msg", output2.get(1)); + + // case3: empty rowsNodePath, two-index access into the nested matrix array + List fields3 = this.getTestFieldList("matrix(0,0)", "matrix(1,1)", "matrix(2,2)"); + BsonSourceInfo bsonSourceInfo3 = new BsonSourceInfo("UTF-8", ""); + CsvSinkInfo csvSink3 = new CsvSinkInfo("UTF-8", '|', '\\', fields3); + String transformSql3 = "select $root.matrix(0, 0), $root.matrix(1, 1), $root.matrix(2, 2) from source"; + TransformConfig config3 = new TransformConfig(transformSql3); + TransformProcessor processor3 = TransformProcessor + .create(config3, SourceDecoderFactory.createBsonDecoder(bsonSourceInfo3), + SinkEncoderFactory.createCsvEncoder(csvSink3)); + String srcBase64String3 = 
"ngAAAAdfaWQAZugj40pfeo8607hGBG1hdHJpeACAAAAABDAAJgAAAA" + + "EwAAAAAAAAAPA/ATEAAAAAAAAAAEABMgAAAAAAAAAIQAAEMQAmAAAAATAAAAAAAAAAEEABMQ" + + "AAAAAAAAAUQAEyAAAAAAAAABhAAAQyACYAAAABMAAAAAAAAAAcQAExAAAAAAAAACBAATIAAA" + + "AAAAAAIkAAAAA="; + byte[] srcByte3 = Base64.getDecoder().decode(srcBase64String3); + List output3 = processor3.transform(srcByte3, new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals("1|5|9", output3.get(0)); + + // case4: nested array path courses(0,1) combined with an arithmetic function (sqrt) + List fields4 = this.getTestFieldList("department_name", "course_id", "num"); + BsonSourceInfo bsonSourceInfo4 = new BsonSourceInfo("UTF-8", ""); + CsvSinkInfo csvSink4 = new CsvSinkInfo("UTF-8", '|', '\\', fields4); + String transformSql4 = + "select $root.departments(0).name, $root.departments(0).courses(0,1).courseId, sqrt($root.departments(0).courses(0,1).courseId - 2) from source"; + TransformConfig config4 = new TransformConfig(transformSql4); + TransformProcessor processor4 = TransformProcessor + .create(config4, SourceDecoderFactory.createBsonDecoder(bsonSourceInfo4), + SinkEncoderFactory.createCsvEncoder(csvSink4)); + String srcBase64String4 = "KwEAAAdfaWQAZugoCEpfeo8607hHBGRlcGFydG1lbnRzAAgBAAADMAA" + + "AAQAAAm5hbWUADAAAAE1hdGhlbWF0aWNzAARjb3Vyc2VzANwAAAAEMABnAAAAAzAALAAAAAJj" + + "b3Vyc2VJZAAEAAAAMTAxAAJ0aXRsZQAKAAAAQ2FsY3VsdXNJAAADMQAwAAAAAmNvdXJzZUlkA" + + "AQAAAAxMDIAAnRpdGxlAA4AAABMaW5lYXJBbGdlYnJhAAAABDEAagAAAAMwAC0AAAACY291cn" + + "NlSWQABAAAADIwMQACdGl0bGUACwAAAENhbGN1bHVzSUkAAAMxADIAAAACY291cnNlSWQABAA" + + "AADIwMgACdGl0bGUAEAAAAEFic3RyYWN0QWxnZWJyYQAAAAAAAAA="; + byte[] srcByte4 = Base64.getDecoder().decode(srcBase64String4); + List output4 = processor4.transform(srcByte4, new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals("Mathematics|102|10.0", output4.get(0)); + } + + @Test + public void testBson2CsvForOne() throws Exception { + List fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); + BsonSourceInfo bsonSourceInfo = new 
BsonSourceInfo("UTF-8", ""); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); + String transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: empty rowsNodePath, msgs elements addressed by explicit index from $root + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createBsonDecoder(bsonSourceInfo), + SinkEncoderFactory.createCsvEncoder(csvSink)); + String srcBase64String = "lQAAAAdfaWQAZugcR0pfeo8607hFAnNpZAAHAAAAdmFsdWUxAAJ" + + "wYWNrYWdlSUQABwAAAHZhbHVlMgAEbXNncwBTAAAAAzAAJgAAAAJtc2cABwAAAHZhbHVl" + + "NAABbXNnVGltZQAAAOu4VO54QgADMQAiAAAAAm1zZwADAAAAdjQAAW1zZ1RpbWUAAADru" + + "FTueEIAAAA="; + byte[] srcByte = Base64.getDecoder().decode(srcBase64String); + List output = processor.transform(srcByte, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("value1|value2|1713243918000|value4", output.get(0)); + } +} diff --git a/pom.xml b/pom.xml index babebca3a7b..e843f210f3c 100644 --- a/pom.xml +++ b/pom.xml @@ -164,6 +164,7 @@ 4.2.0 1.10.1 + 4.9.1 1.6.7 1.12.2 2.0.8 @@ -209,6 +210,12 @@ + + org.mongodb + bson + ${bson.version} + + io.opentelemetry From c4cbc1071fd05eed7dea4f978fa4056108eab61d Mon Sep 17 00:00:00 2001 From: Xincheng Huang <60057611+ying-hua@users.noreply.github.com> Date: Thu, 19 Sep 2024 11:21:13 +0800 Subject: [PATCH 11/11] [INLONG-11139][SDK] Transform support Not Between And operator (#11140) --- .../process/operator/BetweenAndOperator.java | 6 ++- .../process/TestBetweenAndOperator.java | 47 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/BetweenAndOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/BetweenAndOperator.java index d6d4b45c27f..bd90fef135d 100644 --- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/BetweenAndOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/BetweenAndOperator.java @@ -34,11 +34,13 @@ public class BetweenAndOperator implements ExpressionOperator { private final ValueParser left; private final ValueParser start; private final ValueParser end; + private final boolean isNot; public BetweenAndOperator(Between expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); this.start = OperatorTools.buildParser(expr.getBetweenExpressionStart()); this.end = OperatorTools.buildParser(expr.getBetweenExpressionEnd()); + this.isNot = expr.isNot(); } @SuppressWarnings("rawtypes") @@ -48,7 +50,9 @@ public boolean check(SourceData sourceData, int rowIndex, Context context) { Comparable startValue = (Comparable) this.start.parse(sourceData, rowIndex, context); Comparable endValue = (Comparable) this.end.parse(sourceData, rowIndex, context); - return OperatorTools.compareValue(leftValue, startValue) >= 0 && + boolean result = OperatorTools.compareValue(leftValue, startValue) >= 0 && OperatorTools.compareValue(leftValue, endValue) <= 0; + + return this.isNot != result; } } \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestBetweenAndOperator.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestBetweenAndOperator.java index 23b9693bca5..159e833d6f6 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestBetweenAndOperator.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestBetweenAndOperator.java @@ -99,4 +99,51 @@ public void testBetweenAndOperator() throws Exception { Assert.assertEquals(1, output1.size()); Assert.assertEquals(output8.get(0), "result=0"); } + + @Test + public void testNotBetweenAndOperator() throws 
Exception { + String transformSql = "select if(string2 not between 3 and 5,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: '3a' not between '3' and '5' -> 0 + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: '4a' not between '3' and '5' -> 0 + List output2 = processor.transform("3.14159265358979323846|4a|4|8"); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=0"); + // case3: '6' not between '3' and '5' -> 1 + List output3 = processor.transform("3.14159265358979323846|6|4|8"); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=1"); + // case4: '3e2' not between '3' and '5' -> 1 + List output4 = processor.transform("3.14159265358979323846|3e2|4|8"); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=1"); + + transformSql = "select if(numeric3 not between 3 and 5,1,0) from source"; + config = new TransformConfig(transformSql); + // case5: 4 not between 3 and 5 -> 0 + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output5 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals(output5.get(0), "result=0"); + // case6: 3 not between 3 and 5 -> 0 + List output6 = processor.transform("3.14159265358979323846|4|3|8"); + Assert.assertEquals(1, output6.size()); + Assert.assertEquals(output6.get(0), "result=0"); + // case7: 5 not between 3 and 5 -> 0 + List output7 = processor.transform("3.14159265358979323846|4|5|8"); + Assert.assertEquals(1, output7.size()); + 
Assert.assertEquals(output7.get(0), "result=0"); + // case8: 3e2 not between 3 and 5 -> 1 + List output8 = processor.transform("3.14159265358979323846|4|3e2|8"); + Assert.assertEquals(1, output8.size()); + Assert.assertEquals(output8.get(0), "result=1"); + } } \ No newline at end of file