From 34dd69d19bd8ad5b474b3346e31281668c620c9a Mon Sep 17 00:00:00 2001
From: Ceng <441651826@qq.com>
Date: Tue, 30 Jul 2024 10:21:28 +0800
Subject: [PATCH] [Feature][external catalog/lakesoul] LakeSoul Catalog support
Filter Pushdown & CDC data handling & S3 data access (#37979)
## Proposed changes
Issue Number: close #37978
---------
Signed-off-by: zenghua <441651826@qq.com>
---
.../lakesoul-scanner/pom.xml | 22 +-
.../doris/lakesoul/LakeSoulJniScanner.java | 82 ++-
.../apache/doris/lakesoul/LakeSoulUtils.java | 30 +-
.../doris/lakesoul/arrow/ArrowUtils.java | 8 +-
.../arrow/LakeSoulArrowJniScanner.java | 44 +-
fe/fe-core/pom.xml | 44 +-
.../lakesoul/LakeSoulExternalCatalog.java | 22 +-
.../lakesoul/LakeSoulExternalTable.java | 22 +-
.../datasource/lakesoul/LakeSoulUtils.java | 535 ++++++++++++++++++
.../lakesoul/source/LakeSoulScanNode.java | 158 +++++-
.../nereids/rules/analysis/BindRelation.java | 1 +
.../lakesoul/LakeSoulPredicateTest.java | 280 +++++++++
fe/pom.xml | 4 +-
regression-test/conf/regression-conf.groovy | 11 +
.../lakesoul/test_lakesoul_filter.out | 8 +
.../external/conf/regression-conf.groovy | 4 +
.../lakesoul/test_lakesoul_catalog.groovy | 23 +-
.../lakesoul/test_lakesoul_filter.groovy | 58 ++
.../test_external_table_lakesoul.groovy | 24 +-
19 files changed, 1269 insertions(+), 111 deletions(-)
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
create mode 100644 regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out
create mode 100644 regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml b/fe/be-java-extensions/lakesoul-scanner/pom.xml
index cbbb473483e3f5..24d7efc7614477 100644
--- a/fe/be-java-extensions/lakesoul-scanner/pom.xml
+++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml
@@ -47,21 +47,6 @@ under the License.
-
- org.apache.arrow
- arrow-vector
- ${arrow.version}
-
-
- org.apache.arrow
- arrow-memory-unsafe
- ${arrow.version}
-
-
- org.apache.arrow
- arrow-c-data
- ${arrow.version}
-
@@ -85,7 +70,8 @@ under the License.
com.dmetasoul
lakesoul-io-java
- 2.5.4
+ ${lakesoul.version}
+ shaded
org.slf4j
@@ -99,10 +85,6 @@ under the License.
org.slf4j
slf4j-api
-
- org.apache.arrow
- *
-
org.antlr
antlr4-runtime
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
index 3dfbff756db189..a7ac785d1fb066 100644
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
@@ -17,25 +17,30 @@
package org.apache.doris.lakesoul;
-import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner;
-import org.apache.doris.lakesoul.parquet.ParquetFilter;
import com.dmetasoul.lakesoul.LakeSoulArrowReader;
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference;
+import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import io.substrait.proto.Plan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
+ private static final Logger LOG = LoggerFactory.getLogger(LakeSoulJniScanner.class);
private final Map params;
@@ -60,6 +65,9 @@ public void open() throws IOException {
withAllocator(nativeIOReader.getAllocator());
nativeIOReader.setBatchSize(batchSize);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("opening LakeSoulJniScanner with params={}", params);
+ }
// add files
for (String file : params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) {
nativeIOReader.addFile(file);
@@ -72,19 +80,39 @@ public void open() throws IOException {
Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList()));
}
- Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
+ String options = params.getOrDefault(LakeSoulUtils.OPTIONS, "{}");
+ Map optionsMap = new ObjectMapper().readValue(
+ options, new TypeReference