From efcfc25506cc8efae500d2d94985cfdce35e57a0 Mon Sep 17 00:00:00 2001 From: feat Date: Mon, 22 Apr 2024 23:17:42 +0800 Subject: [PATCH] dev --- .../sort/protocol/node/ExtractNode.java | 2 +- .../inlong/sort/protocol/node/LoadNode.java | 2 +- .../inlong/sort/protocol/node/Node.java | 22 +++++++++++++++++-- .../node/extract/PaimonExtractNode.java | 9 ++++---- .../protocol/node/load/PaimonLoadNode.java | 16 ++++++++------ .../node/load/PaimonLoadNodeTest.java | 3 ++- .../sort/parser/PaimonNodeSqlParserTest.java | 22 ++++++++++++++----- .../table/sink/HudiTableInlongFactory.java | 2 +- .../org.apache.flink.table.factories.Factory | 2 +- .../table/sink/PaimonTableInlongFactory.java | 3 ++- 10 files changed, 58 insertions(+), 25 deletions(-) rename inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/{hudi => paimon}/table/sink/HudiTableInlongFactory.java (97%) rename inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/paimon/src/main/java/org/apache/inlong/sort/{hudi => paimon}/table/sink/PaimonTableInlongFactory.java (93%) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java index f9ae2afe18b..c3b92af9b04 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java @@ -21,12 +21,12 @@ import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode; import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode; import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode; -import org.apache.inlong.sort.protocol.node.extract.PaimonExtractNode; import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode; import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode; import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode; import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode; +import org.apache.inlong.sort.protocol.node.extract.PaimonExtractNode; import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode; import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode; import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java index 1ff43f39f71..4370fc13a91 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java @@ -27,11 +27,11 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode; import org.apache.inlong.sort.protocol.node.load.HiveLoadNode; import org.apache.inlong.sort.protocol.node.load.HudiLoadNode; -import org.apache.inlong.sort.protocol.node.load.PaimonLoadNode; import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode; import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode; import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode; import org.apache.inlong.sort.protocol.node.load.OracleLoadNode; +import org.apache.inlong.sort.protocol.node.load.PaimonLoadNode; import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode; import org.apache.inlong.sort.protocol.node.load.RedisLoadNode; import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java index 2f54cc61adc..71621ea68ca 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java @@ -21,18 +21,36 @@ import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode; import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode; import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode; -import org.apache.inlong.sort.protocol.node.extract.PaimonExtractNode; import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode; import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode; import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode; import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode; +import org.apache.inlong.sort.protocol.node.extract.PaimonExtractNode; import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode; import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode; import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode; import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode; import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode; -import org.apache.inlong.sort.protocol.node.load.*; +import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode; +import org.apache.inlong.sort.protocol.node.load.DorisLoadNode; +import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode; +import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode; +import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode; +import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode; +import org.apache.inlong.sort.protocol.node.load.HiveLoadNode; +import org.apache.inlong.sort.protocol.node.load.HudiLoadNode; +import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode; +import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode; +import org.apache.inlong.sort.protocol.node.load.KuduLoadNode; +import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode; +import org.apache.inlong.sort.protocol.node.load.OracleLoadNode; +import org.apache.inlong.sort.protocol.node.load.PaimonLoadNode; +import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode; +import org.apache.inlong.sort.protocol.node.load.RedisLoadNode; +import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode; +import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode; +import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode; import org.apache.inlong.sort.protocol.node.transform.DistinctNode; import org.apache.inlong.sort.protocol.node.transform.TransformNode; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PaimonExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PaimonExtractNode.java index e0849103219..5d2d40c0c63 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PaimonExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PaimonExtractNode.java @@ -17,6 +17,10 @@ package org.apache.inlong.sort.protocol.node.extract; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.node.ExtractNode; +import org.apache.inlong.sort.protocol.transformation.WatermarkField; + import com.google.common.base.Preconditions; import lombok.Data; import lombok.EqualsAndHashCode; @@ -26,13 +30,10 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.inlong.sort.protocol.FieldInfo; -import org.apache.inlong.sort.protocol.constant.PaimonConstant.*; -import org.apache.inlong.sort.protocol.node.ExtractNode; -import org.apache.inlong.sort.protocol.transformation.WatermarkField; import javax.annotation.Nonnull; import javax.annotation.Nullable; + import java.io.Serializable; import java.util.HashMap; import java.util.List; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PaimonLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PaimonLoadNode.java index 06190fc5875..93b9d344aaf 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PaimonLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PaimonLoadNode.java @@ -17,6 +17,14 @@ package org.apache.inlong.sort.protocol.node.load; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.InlongMetric; +import org.apache.inlong.sort.protocol.constant.PaimonConstant.CatalogType; +import org.apache.inlong.sort.protocol.enums.FilterStrategy; +import org.apache.inlong.sort.protocol.node.LoadNode; +import org.apache.inlong.sort.protocol.transformation.FieldRelation; +import org.apache.inlong.sort.protocol.transformation.FilterFunction; + import com.google.common.base.Preconditions; import lombok.Data; import lombok.EqualsAndHashCode; @@ -26,16 +34,10 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.inlong.sort.protocol.FieldInfo; -import org.apache.inlong.sort.protocol.InlongMetric; -import org.apache.inlong.sort.protocol.constant.PaimonConstant.CatalogType; -import org.apache.inlong.sort.protocol.enums.FilterStrategy; -import org.apache.inlong.sort.protocol.node.LoadNode; -import org.apache.inlong.sort.protocol.transformation.FieldRelation; -import org.apache.inlong.sort.protocol.transformation.FilterFunction; import javax.annotation.Nonnull; import javax.annotation.Nullable; + import java.io.Serializable; import java.util.HashMap; import java.util.List; diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PaimonLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PaimonLoadNodeTest.java index 5257ee1f2ab..f4aa61558a5 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PaimonLoadNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PaimonLoadNodeTest.java @@ -17,13 +17,14 @@ package org.apache.inlong.sort.protocol.node.load; -import com.google.common.collect.Maps; import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo; import org.apache.inlong.sort.SerializeBaseTest; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.constant.PaimonConstant.CatalogType; import org.apache.inlong.sort.protocol.transformation.FieldRelation; +import com.google.common.collect.Maps; + import java.util.ArrayList; import java.util.Collections; import java.util.Map; diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PaimonNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PaimonNodeSqlParserTest.java index 4cf425818fe..407a302a7b2 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PaimonNodeSqlParserTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PaimonNodeSqlParserTest.java @@ -17,11 +17,11 @@ package org.apache.inlong.sort.parser; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.test.util.AbstractTestBase; -import org.apache.inlong.common.pojo.sort.dataflow.field.format.*; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.FloatFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.IntFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.TimestampFormatInfo; import org.apache.inlong.sort.parser.impl.FlinkSqlParser; import org.apache.inlong.sort.parser.result.FlinkSqlParseResult; import org.apache.inlong.sort.protocol.FieldInfo; @@ -33,10 +33,20 @@ import org.apache.inlong.sort.protocol.node.load.PaimonLoadNode; import org.apache.inlong.sort.protocol.transformation.FieldRelation; import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.AbstractTestBase; import org.junit.Assert; import org.junit.Test; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/paimon/table/sink/HudiTableInlongFactory.java similarity index 97% rename from inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java rename to inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/paimon/table/sink/HudiTableInlongFactory.java index d2985ac5d4a..3999d6cda56 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/paimon/table/sink/HudiTableInlongFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.hudi.table.sink; +package org.apache.inlong.sort.paimon.table.sink; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.connector.sink.DynamicTableSink; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 709a02e60c0..bf48855e555 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.inlong.sort.hudi.table.sink.HudiTableInlongFactory +org.apache.inlong.sort.paimon.table.sink.HudiTableInlongFactory diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/paimon/src/main/java/org/apache/inlong/sort/hudi/table/sink/PaimonTableInlongFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/paimon/src/main/java/org/apache/inlong/sort/paimon/table/sink/PaimonTableInlongFactory.java similarity index 93% rename from inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/paimon/src/main/java/org/apache/inlong/sort/hudi/table/sink/PaimonTableInlongFactory.java rename to inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/paimon/src/main/java/org/apache/inlong/sort/paimon/table/sink/PaimonTableInlongFactory.java index 3005c99ad98..590684a1f27 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/paimon/src/main/java/org/apache/inlong/sort/hudi/table/sink/PaimonTableInlongFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/paimon/src/main/java/org/apache/inlong/sort/paimon/table/sink/PaimonTableInlongFactory.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.paimon.table.sink; +import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.paimon.table.FileStoreTableFactory; @@ -40,7 +41,7 @@ public String factoryIdentifier() { } @Override - public DynamicTableSink createDynamicTableSink(Context context) { + public DynamicTableSink createDynamicTableSink(SinkWriter.Context context) { return super.createDynamicTableSink(context); }