diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml b/fe/be-java-extensions/lakesoul-scanner/pom.xml index cbbb473483e3f5..24d7efc7614477 100644 --- a/fe/be-java-extensions/lakesoul-scanner/pom.xml +++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml @@ -47,21 +47,6 @@ under the License. - - org.apache.arrow - arrow-vector - ${arrow.version} - - - org.apache.arrow - arrow-memory-unsafe - ${arrow.version} - - - org.apache.arrow - arrow-c-data - ${arrow.version} - @@ -85,7 +70,8 @@ under the License. com.dmetasoul lakesoul-io-java - 2.5.4 + ${lakesoul.version} + shaded org.slf4j @@ -99,10 +85,6 @@ under the License. org.slf4j slf4j-api - - org.apache.arrow - * - org.antlr antlr4-runtime diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java index 3dfbff756db189..a7ac785d1fb066 100644 --- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java @@ -17,25 +17,30 @@ package org.apache.doris.lakesoul; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner; -import org.apache.doris.lakesoul.parquet.ParquetFilter; import com.dmetasoul.lakesoul.LakeSoulArrowReader; import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.Schema; +import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; +import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference; +import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema; +import io.substrait.proto.Plan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class LakeSoulJniScanner extends LakeSoulArrowJniScanner { + private static final Logger LOG = LoggerFactory.getLogger(LakeSoulJniScanner.class); private final Map params; @@ -60,6 +65,9 @@ public void open() throws IOException { withAllocator(nativeIOReader.getAllocator()); nativeIOReader.setBatchSize(batchSize); + if (LOG.isDebugEnabled()) { + LOG.debug("opening LakeSoulJniScanner with params={}", params); + } // add files for (String file : params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) { nativeIOReader.addFile(file); @@ -72,19 +80,39 @@ public void open() throws IOException { Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList())); } - Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON)); + String options = params.getOrDefault(LakeSoulUtils.OPTIONS, "{}"); + Map optionsMap = new ObjectMapper().readValue( + options, new TypeReference>() {} + ); + String base64Predicate = optionsMap.get(LakeSoulUtils.SUBSTRAIT_PREDICATE); + if (base64Predicate != null) { + Plan predicate = SubstraitUtil.decodeBase64String(base64Predicate); + if (LOG.isDebugEnabled()) { + LOG.debug("push predicate={}", predicate); + } + nativeIOReader.addFilterProto(predicate); + } + + for (String key : LakeSoulUtils.OBJECT_STORE_OPTIONS) { + String value = optionsMap.get(key); + if (key != null) { + nativeIOReader.setObjectStoreOption(key, value); + } + } + + Schema tableSchema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON)); String[] requiredFieldNames = params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM); List requiredFields = new ArrayList<>(); for (String fieldName : requiredFieldNames) { - requiredFields.add(schema.findField(fieldName)); + requiredFields.add(tableSchema.findField(fieldName)); } requiredSchema = new Schema(requiredFields); nativeIOReader.setSchema(requiredSchema); - HashSet partitionColumn = new HashSet<>(); + List partitionFields = new ArrayList<>(); for (String partitionKV : params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "") .split(LakeSoulUtils.LIST_DELIM)) { if (partitionKV.isEmpty()) { @@ -94,17 +122,15 @@ public void open() throws IOException { if (kv.length != 2) { throw new IllegalArgumentException("Invalid partition column = " + partitionKV); } - partitionColumn.add(kv[0]); + nativeIOReader.setDefaultColumnValue(kv[0], kv[1]); + partitionFields.add(tableSchema.findField(kv[0])); + } + if (!partitionFields.isEmpty()) { + nativeIOReader.setPartitionSchema(new Schema(partitionFields)); } initTableInfo(params); - for (ScanPredicate predicate : predicates) { - if (!partitionColumn.contains(predicate.columName)) { - nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString()); - } - } - nativeIOReader.initializeReader(); lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader, awaitTimeout); } @@ -156,4 +182,28 @@ public void releaseTable() { currentBatch.close(); } } + + public static void main(String[] args) throws IOException { + HashMap params = new HashMap<>(); + params.put("required_fields", "r_regionkey;r_name;r_comment"); + params.put("primary_keys", "r_regionkey;r_name"); + params.put("query_id", "e9d075a6500a4cac-b94630fd4b30c171"); + params.put("file_paths", + "file:/Users/ceng/Documents/GitHub/LakeSoul/rust/lakesoul-datafusion/" + + "default/region/part-RzmUvDFtYV8ceb3J_0000.parquet" + ); + params.put("options", "{}"); + params.put("table_schema", + "{\"fields\":[" + + "{\"name\":\"r_regionkey\",\"type\":{\"name\":\"int\",\"isSigned\":true,\"bitWidth\":64}," + + "\"nullable\":false,\"children\":[]}," + + "{\"name\":\"r_name\",\"type\":{\"name\":\"utf8\"},\"nullable\":false,\"children\":[]}," + + "{\"name\":\"r_comment\",\"type\":{\"name\":\"utf8\"},\"nullable\":false,\"children\":[]}" + + "]," + + "\"metadata\":null}"); + params.put("partition_descs", ""); + LakeSoulJniScanner scanner = new LakeSoulJniScanner(1024, params); + scanner.open(); + System.out.println(scanner.getNext()); + } } diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java index 6c7f88f3ab36ac..ca07a81d0da81c 100644 --- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java @@ -17,13 +17,29 @@ package org.apache.doris.lakesoul; +import java.util.Arrays; +import java.util.List; + public class LakeSoulUtils { - public static String FILE_NAMES = "file_paths"; - public static String PRIMARY_KEYS = "primary_keys"; - public static String SCHEMA_JSON = "table_schema"; - public static String PARTITION_DESC = "partition_descs"; - public static String REQUIRED_FIELDS = "required_fields"; + public static final String FILE_NAMES = "file_paths"; + public static final String PRIMARY_KEYS = "primary_keys"; + public static final String SCHEMA_JSON = "table_schema"; + public static final String PARTITION_DESC = "partition_descs"; + public static final String REQUIRED_FIELDS = "required_fields"; + public static final String OPTIONS = "options"; + public static final String SUBSTRAIT_PREDICATE = "substrait_predicate"; + public static final String LIST_DELIM = ";"; + public static final String PARTITIONS_KV_DELIM = "="; + + public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key"; + public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; + public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; + public static final String FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; - public static String LIST_DELIM = ";"; - public static String PARTITIONS_KV_DELIM = "="; + public static final List OBJECT_STORE_OPTIONS = Arrays.asList( + FS_S3A_ACCESS_KEY, + FS_S3A_SECRET_KEY, + FS_S3A_ENDPOINT, + FS_S3A_PATH_STYLE_ACCESS + ); } diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java index 3ad28ba783a104..94ac32935e80bd 100644 --- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java @@ -20,10 +20,10 @@ import org.apache.doris.common.jni.utils.OffHeap; import org.apache.doris.common.jni.utils.TypeNativeBytes; -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.util.Preconditions; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; +import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf; +import com.lakesoul.shaded.org.apache.arrow.util.Preconditions; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field; import java.time.LocalDate; import java.time.LocalDateTime; diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java index 320d653a20ae38..3c73c2f1ab4379 100644 --- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java @@ -23,28 +23,28 @@ import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.VectorTable; -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.BitVector; -import org.apache.arrow.vector.DateDayVector; -import org.apache.arrow.vector.DecimalVector; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.TimeStampMicroTZVector; -import org.apache.arrow.vector.TimeStampMicroVector; -import org.apache.arrow.vector.TimeStampMilliTZVector; -import org.apache.arrow.vector.TimeStampMilliVector; -import org.apache.arrow.vector.TimeStampNanoTZVector; -import org.apache.arrow.vector.TimeStampNanoVector; -import org.apache.arrow.vector.TimeStampSecTZVector; -import org.apache.arrow.vector.TimeStampSecVector; -import org.apache.arrow.vector.TimeStampVector; -import org.apache.arrow.vector.ValueVector; -import org.apache.arrow.vector.VarCharVector; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.complex.ListVector; -import org.apache.arrow.vector.complex.StructVector; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.Schema; +import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf; +import com.lakesoul.shaded.org.apache.arrow.memory.BufferAllocator; +import com.lakesoul.shaded.org.apache.arrow.vector.BitVector; +import com.lakesoul.shaded.org.apache.arrow.vector.DateDayVector; +import com.lakesoul.shaded.org.apache.arrow.vector.DecimalVector; +import com.lakesoul.shaded.org.apache.arrow.vector.FieldVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroTZVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliTZVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoTZVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecTZVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampVector; +import com.lakesoul.shaded.org.apache.arrow.vector.ValueVector; +import com.lakesoul.shaded.org.apache.arrow.vector.VarCharVector; +import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot; +import com.lakesoul.shaded.org.apache.arrow.vector.complex.ListVector; +import com.lakesoul.shaded.org.apache.arrow.vector.complex.StructVector; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema; import org.apache.log4j.Logger; import java.io.IOException; diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index bac2346185d367..8021f2a3b18b07 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -568,7 +568,7 @@ under the License. com.dmetasoul lakesoul-common - 2.5.4 + ${lakesoul.version} shaded @@ -577,6 +577,46 @@ under the License. + + com.dmetasoul + lakesoul-io-java + ${lakesoul.version} + shaded + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + org.slf4j + slf4j-api + + + org.apache.arrow + * + + + org.antlr + antlr4-runtime + + + commons-logging + commons-logging + + + com.google.code.findbugs + jsr305 + + + org.apache.spark + * + + + org.apache.iceberg @@ -1220,4 +1260,4 @@ under the License. - \ No newline at end of file + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java index dd8342ad660ef6..e813ac2fc97a32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.lakesoul; + import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; @@ -25,6 +26,7 @@ import com.dmetasoul.lakesoul.meta.DBManager; import com.dmetasoul.lakesoul.meta.DBUtil; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; import com.dmetasoul.lakesoul.meta.entity.TableInfo; import com.google.common.collect.Lists; @@ -33,7 +35,7 @@ public class LakeSoulExternalCatalog extends ExternalCatalog { - private DBManager dbManager; + private DBManager lakesoulMetadataManager; private final Map props; @@ -48,13 +50,13 @@ public LakeSoulExternalCatalog(long catalogId, String name, String resource, Map @Override protected List listDatabaseNames() { initLocalObjectsImpl(); - return dbManager.listNamespaces(); + return lakesoulMetadataManager.listNamespaces(); } @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); - List tifs = dbManager.getTableInfosByNamespace(dbName); + List tifs = lakesoulMetadataManager.getTableInfosByNamespace(dbName); List tableNames = Lists.newArrayList(); for (TableInfo item : tifs) { tableNames.add(item.getTableName()); @@ -65,14 +67,13 @@ public List listTableNames(SessionContext ctx, String dbName) { @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); - TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(dbName, tblName); - + TableInfo tableInfo = lakesoulMetadataManager.getTableInfoByNameAndNamespace(dbName, tblName); return null != tableInfo; } @Override protected void initLocalObjectsImpl() { - if (dbManager == null) { + if (lakesoulMetadataManager == null) { if (props != null) { if (props.containsKey(DBUtil.urlKey)) { System.setProperty(DBUtil.urlKey, props.get(DBUtil.urlKey)); @@ -84,13 +85,18 @@ protected void initLocalObjectsImpl() { System.setProperty(DBUtil.passwordKey, props.get(DBUtil.passwordKey)); } } - dbManager = new DBManager(); + lakesoulMetadataManager = new DBManager(); } } public TableInfo getLakeSoulTable(String dbName, String tblName) { makeSureInitialized(); - return dbManager.getTableInfoByNameAndNamespace(tblName, dbName); + return lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName, dbName); + } + + public List listPartitionInfo(String tableId) { + makeSureInitialized(); + return lakesoulMetadataManager.getAllPartitionInfo(tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java index 46e8d1db47c723..a5cf3478ae840a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java @@ -25,17 +25,23 @@ import org.apache.doris.catalog.Type; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.statistics.AnalysisInfo; +import org.apache.doris.statistics.BaseAnalysisTask; +import org.apache.doris.statistics.ExternalAnalysisTask; import org.apache.doris.thrift.TLakeSoulTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; import com.dmetasoul.lakesoul.meta.DBUtil; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; import com.dmetasoul.lakesoul.meta.entity.TableInfo; import com.google.common.collect.Lists; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.HashMap; @@ -45,11 +51,20 @@ import java.util.stream.Collectors; public class LakeSoulExternalTable extends ExternalTable { - + private static final Logger LOG = LogManager.getLogger(LakeSoulExternalTable.class); public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6; + public final String tableId; + public LakeSoulExternalTable(long id, String name, String dbName, LakeSoulExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.LAKESOUl_EXTERNAL_TABLE); + tableId = getLakeSoulTableInfo().getTableId(); + } + + @Override + public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { + makeSureInitialized(); + return new ExternalAnalysisTask(info); } private Type arrowFiledToDorisType(Field field) { @@ -150,6 +165,7 @@ public Optional initSchema() { String tableSchema = tableInfo.getTableSchema(); DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions()); Schema schema; + LOG.info("tableSchema={}", tableSchema); try { schema = Schema.fromJSON(tableSchema); } catch (IOException e) { @@ -174,6 +190,10 @@ public TableInfo getLakeSoulTableInfo() { return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name); } + public List listPartitionInfo() { + return ((LakeSoulExternalCatalog) catalog).listPartitionInfo(tableId); + } + public String tablePath() { return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name).getTablePath(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java new file mode 100644 index 00000000000000..8f7cf83dbfcb9a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java @@ -0,0 +1,535 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.lakesoul; + +import org.apache.doris.analysis.BoolLiteral; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.DecimalLiteral; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FloatLiteral; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.IsNullPredicate; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.analysis.Subquery; +import org.apache.doris.planner.ColumnBound; +import org.apache.doris.planner.ColumnRange; +import org.apache.doris.thrift.TExprOpcode; + +import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; +import com.google.common.collect.BoundType; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema; +import io.substrait.expression.Expression; +import io.substrait.extension.DefaultExtensionCatalog; +import io.substrait.type.Type; +import io.substrait.type.TypeCreator; + +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + + +public class LakeSoulUtils { + + public static final String FILE_NAMES = "file_paths"; + public static final String PRIMARY_KEYS = "primary_keys"; + public static final String SCHEMA_JSON = "table_schema"; + public static final String PARTITION_DESC = "partition_descs"; + public static final String REQUIRED_FIELDS = "required_fields"; + public static final String OPTIONS = "options"; + public static final String SUBSTRAIT_PREDICATE = "substrait_predicate"; + public static final String CDC_COLUMN = "lakesoul_cdc_change_column"; + public static final String LIST_DELIM = ";"; + public static final String PARTITIONS_KV_DELIM = "="; + public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key"; + public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; + public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; + public static final String FS_S3A_REGION = "fs.s3a.endpoint.region"; + public static final String FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; + + private static final OffsetDateTime EPOCH; + private static final LocalDate EPOCH_DAY; + + static { + EPOCH = Instant.ofEpochSecond(0L).atOffset(ZoneOffset.UTC); + EPOCH_DAY = EPOCH.toLocalDate(); + } + + public static List applyPartitionFilters( + List allPartitionInfo, + String tableName, + Schema partitionArrowSchema, + Map columnNameToRange + ) throws IOException { + + Expression conjunctionFilter = null; + for (Field field : partitionArrowSchema.getFields()) { + ColumnRange columnRange = columnNameToRange.get(field.getName()); + if (columnRange != null) { + Expression expr = columnRangeToSubstraitFilter(field, columnRange); + if (expr != null) { + if (conjunctionFilter == null) { + conjunctionFilter = expr; + } else { + conjunctionFilter = SubstraitUtil.and(conjunctionFilter, expr); + } + } + } + } + return SubstraitUtil.applyPartitionFilters( + allPartitionInfo, + partitionArrowSchema, + SubstraitUtil.substraitExprToProto(conjunctionFilter, tableName) + ); + } + + public static Expression columnRangeToSubstraitFilter( + Field columnField, + ColumnRange columnRange + ) throws IOException { + Optional> rangeSetOpt = columnRange.getRangeSet(); + if (columnRange.hasConjunctiveIsNull() || !rangeSetOpt.isPresent()) { + return SubstraitUtil.CONST_TRUE; + } else { + RangeSet rangeSet = rangeSetOpt.get(); + if (rangeSet.isEmpty()) { + return SubstraitUtil.CONST_TRUE; + } else { + Expression conjunctionFilter = null; + for (Range range : rangeSet.asRanges()) { + Expression expr = rangeToSubstraitFilter(columnField, range); + if (expr != null) { + if (conjunctionFilter == null) { + conjunctionFilter = expr; + } else { + conjunctionFilter = SubstraitUtil.or(conjunctionFilter, expr); + } + } + } + return conjunctionFilter; + } + } + } + + public static Expression rangeToSubstraitFilter(Field columnField, Range range) throws IOException { + if (!range.hasLowerBound() && !range.hasUpperBound()) { + // Range.all() + return SubstraitUtil.CONST_TRUE; + } else { + Expression upper = SubstraitUtil.CONST_TRUE; + if (range.hasUpperBound()) { + String func = range.upperBoundType() == BoundType.OPEN ? "lt:any_any" : "lte:any_any"; + Expression left = SubstraitUtil.arrowFieldToSubstraitField(columnField); + Expression right = SubstraitUtil.anyToSubstraitLiteral( + SubstraitUtil.arrowFieldToSubstraitType(columnField), + ((ColumnBound) range.upperEndpoint()).getValue().getRealValue()); + upper = SubstraitUtil.makeBinary( + left, + right, + DefaultExtensionCatalog.FUNCTIONS_COMPARISON, + func, + TypeCreator.NULLABLE.BOOLEAN + ); + } + Expression lower = SubstraitUtil.CONST_TRUE; + if (range.hasLowerBound()) { + String func = range.lowerBoundType() == BoundType.OPEN ? "gt:any_any" : "gte:any_any"; + Expression left = SubstraitUtil.arrowFieldToSubstraitField(columnField); + Expression right = SubstraitUtil.anyToSubstraitLiteral( + SubstraitUtil.arrowFieldToSubstraitType(columnField), + ((ColumnBound) range.lowerEndpoint()).getValue().getRealValue()); + lower = SubstraitUtil.makeBinary( + left, + right, + DefaultExtensionCatalog.FUNCTIONS_COMPARISON, + func, + TypeCreator.NULLABLE.BOOLEAN + ); + } + return SubstraitUtil.and(upper, lower); + } + } + + public static io.substrait.proto.Plan getPushPredicate( + List conjuncts, + String tableName, + Schema tableSchema, + Schema partitionArrowSchema, + Map properties, + boolean incRead + ) throws IOException { + + Set partitionColumn = + partitionArrowSchema + .getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toSet()); + Expression conjunctionFilter = null; + String cdcColumn = properties.get(CDC_COLUMN); + if (cdcColumn != null && !incRead) { + conjunctionFilter = SubstraitUtil.cdcColumnMergeOnReadFilter(tableSchema.findField(cdcColumn)); + } + for (Expr expr : conjuncts) { + if (!isAllPartitionPredicate(expr, partitionColumn)) { + Expression predicate = convertToSubstraitExpr(expr, tableSchema); + if (predicate != null) { + if (conjunctionFilter == null) { + conjunctionFilter = predicate; + } else { + conjunctionFilter = SubstraitUtil.and(conjunctionFilter, predicate); + } + } + } + } + if (conjunctionFilter == null) { + return null; + } + return SubstraitUtil.substraitExprToProto(conjunctionFilter, tableName); + } + + public static boolean isAllPartitionPredicate(Expr predicate, Set partitionColumns) { + if (predicate == null) { + return false; + } + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; + return isAllPartitionPredicate(compoundPredicate.getChild(0), partitionColumns) + && isAllPartitionPredicate(compoundPredicate.getChild(1), partitionColumns); + } + // Make sure the col slot is always first + SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0)); + LiteralExpr literalExpr = convertDorisExprToLiteralExpr(predicate.getChild(1)); + if (slotRef == null || literalExpr == null) { + return false; + } + String colName = slotRef.getColumnName(); + return partitionColumns.contains(colName); + + } + + public static SlotRef convertDorisExprToSlotRef(Expr expr) { + SlotRef slotRef = null; + if (expr instanceof SlotRef) { + slotRef = (SlotRef) expr; + } else if (expr instanceof CastExpr) { + if (expr.getChild(0) instanceof SlotRef) { + slotRef = (SlotRef) expr.getChild(0); + } + } + return slotRef; + } + + public static LiteralExpr convertDorisExprToLiteralExpr(Expr expr) { + LiteralExpr literalExpr = null; + if (expr instanceof LiteralExpr) { + literalExpr = (LiteralExpr) expr; + } else if (expr instanceof CastExpr) { + if (expr.getChild(0) instanceof LiteralExpr) { + literalExpr = (LiteralExpr) expr.getChild(0); + } + } + return literalExpr; + } + + public static Expression convertToSubstraitExpr(Expr predicate, Schema tableSchema) throws IOException { + if (predicate == null) { + return null; + } + if (predicate instanceof BoolLiteral) { + BoolLiteral boolLiteral = (BoolLiteral) predicate; + boolean value = boolLiteral.getValue(); + if (value) { + return SubstraitUtil.CONST_TRUE; + } else { + return SubstraitUtil.CONST_FALSE; + } + } + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; + switch (compoundPredicate.getOp()) { + case AND: { + Expression left = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema); + Expression right = convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema); + if (left != null && right != null) { + return SubstraitUtil.and(left, right); + } else if (left != null) { + return left; + } else { + return right; + } + } + case OR: { + Expression left = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema); + Expression right = convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema); + if (left != null && right != null) { + return SubstraitUtil.or(left, right); + } + return null; + } + case NOT: { + Expression child = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema); + if (child != null) { + return SubstraitUtil.not(child); + } + return null; + } + default: + return null; + } + } else if (predicate instanceof InPredicate) { + InPredicate inExpr = (InPredicate) predicate; + if (inExpr.contains(Subquery.class)) { + return null; + } + SlotRef slotRef = convertDorisExprToSlotRef(inExpr.getChild(0)); + if (slotRef == null) { + return null; + } + String colName = slotRef.getColumnName(); + Field field = tableSchema.findField(colName); + Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field); + + colName = field.getName(); + Type type = field.getType().accept( + new SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable()) + ); + List valueList = new ArrayList<>(); + for (int i = 1; i < inExpr.getChildren().size(); ++i) { + if (!(inExpr.getChild(i) instanceof LiteralExpr)) { + return null; + } + LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i); + Object value = extractDorisLiteral(type, literalExpr); + if (value == null) { + return null; + } + valueList.add(SubstraitUtil.anyToSubstraitLiteral(type, value)); + } + if (inExpr.isNotIn()) { + // not in + return SubstraitUtil.notIn(fieldRef, valueList); + } else { + // in + return SubstraitUtil.in(fieldRef, valueList); + } + } + return convertBinaryExpr(predicate, tableSchema); + } + + private static Expression convertBinaryExpr(Expr dorisExpr, Schema tableSchema) throws IOException { + TExprOpcode opcode = dorisExpr.getOpcode(); + // Make sure the col slot is always first + SlotRef slotRef = convertDorisExprToSlotRef(dorisExpr.getChild(0)); + LiteralExpr literalExpr = convertDorisExprToLiteralExpr(dorisExpr.getChild(1)); + if (slotRef == null || literalExpr == null) { + return null; + } + String colName = slotRef.getColumnName(); + Field field = tableSchema.findField(colName); + Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field); + + Type type = field.getType().accept( + new SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable()) + ); + Object value = extractDorisLiteral(type, literalExpr); + if (value == null) { + if (opcode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) { + return SubstraitUtil.makeUnary( + fieldRef, + DefaultExtensionCatalog.FUNCTIONS_COMPARISON, + "is_null:any", + TypeCreator.NULLABLE.BOOLEAN); + } else { + return null; + } + } + Expression literal = SubstraitUtil.anyToSubstraitLiteral( + type, + value + ); + + String namespace; + String func; + switch (opcode) { + case EQ: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "equal:any_any"; + break; + case EQ_FOR_NULL: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "is_null:any"; + break; + case NE: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "not_equal:any_any"; + break; + case GE: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "gte:any_any"; + break; + case GT: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "gt:any_any"; + break; + case LE: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "lte:any_any"; + break; + case LT: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "lt:any_any"; + break; + case INVALID_OPCODE: + if (dorisExpr instanceof FunctionCallExpr) { + String name = dorisExpr.getExprName().toLowerCase(); + String s = literalExpr.getStringValue(); + if (name.equals("like") && !s.startsWith("%") && s.endsWith("%")) { + namespace = DefaultExtensionCatalog.FUNCTIONS_STRING; + func = "like:bool"; + break; + } + } else if (dorisExpr instanceof IsNullPredicate) { + if (((IsNullPredicate) dorisExpr).isNotNull()) { + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "is_not_null:any"; + + } else { + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "is_null:any"; + } + break; + } + return null; + default: + return null; + } + return SubstraitUtil.makeBinary(fieldRef, literal, namespace, func, TypeCreator.NULLABLE.BOOLEAN); + } + + public static Object extractDorisLiteral(Type type, LiteralExpr expr) { + + if (expr instanceof BoolLiteral) { + if (type instanceof Type.Bool) { + return ((BoolLiteral) expr).getValue(); + } + if (type instanceof Type.Str) { + return expr.getStringValue(); + } + } else if (expr instanceof DateLiteral) { + DateLiteral dateLiteral = (DateLiteral) expr; + if (type instanceof Type.Date) { + if (dateLiteral.getType().isDatetimeV2() || dateLiteral.getType().isDatetime()) { + return null; + } + return dateLiteral.getLongValue(); + } + if (type instanceof Type.TimestampTZ || type instanceof Type.Timestamp) { + return dateLiteral.getLongValue(); + } + if (type instanceof Type.Str) { + return expr.getStringValue(); + } + } else if (expr instanceof DecimalLiteral) { + DecimalLiteral decimalLiteral = (DecimalLiteral) expr; + if (type instanceof Type.Decimal) { + return decimalLiteral.getValue(); + } else if (type instanceof Type.FP64) { + return decimalLiteral.getDoubleValue(); + } + if (type instanceof Type.Str) { + return expr.getStringValue(); + } + } else if (expr instanceof FloatLiteral) { + FloatLiteral floatLiteral = (FloatLiteral) expr; + + if (floatLiteral.getType() == org.apache.doris.catalog.Type.FLOAT) { + return type instanceof Type.FP32 + || type instanceof Type.FP64 + || type instanceof Type.Decimal ? ((FloatLiteral) expr).getValue() : null; + } else { + return type instanceof Type.FP64 + || type instanceof Type.Decimal ? ((FloatLiteral) expr).getValue() : null; + } + } else if (expr instanceof IntLiteral) { + if (type instanceof Type.I8 + || type instanceof Type.I16 + || type instanceof Type.I32 + || type instanceof Type.I64 + || type instanceof Type.FP32 + || type instanceof Type.FP64 + || type instanceof Type.Decimal + || type instanceof Type.Date + ) { + return expr.getRealValue(); + } + if (!expr.getType().isInteger32Type()) { + if (type instanceof Type.Time || type instanceof Type.Timestamp || type instanceof Type.TimestampTZ) { + return expr.getLongValue(); + } + } + + } else if (expr instanceof StringLiteral) { + String value = expr.getStringValue(); + if (type instanceof Type.Str) { + return value; + } + if (type instanceof Type.Date) { + try { + return (int) ChronoUnit.DAYS.between( + EPOCH_DAY, + LocalDate.parse(value, DateTimeFormatter.ISO_LOCAL_DATE)); + } catch (DateTimeParseException e) { + return null; + } + } + if (type instanceof Type.Timestamp || type instanceof Type.TimestampTZ) { + try { + return ChronoUnit.MICROS.between( + EPOCH, + OffsetDateTime.parse(value, DateTimeFormatter.ISO_DATE_TIME)); + } catch (DateTimeParseException e) { + return null; + } + } + } + return null; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java index 1779aeaca106de..fd36bfd52bdfac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java @@ -17,14 +17,19 @@ package org.apache.doris.datasource.lakesoul.source; + + import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable; +import org.apache.doris.datasource.lakesoul.LakeSoulUtils; +import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; @@ -34,15 +39,26 @@ import org.apache.doris.thrift.TLakeSoulFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; +import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; import com.dmetasoul.lakesoul.meta.DBUtil; import com.dmetasoul.lakesoul.meta.DataFileInfo; import com.dmetasoul.lakesoul.meta.DataOperation; import com.dmetasoul.lakesoul.meta.LakeSoulOptions; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; import com.dmetasoul.lakesoul.meta.entity.TableInfo; import com.google.common.collect.Lists; import com.lakesoul.shaded.com.alibaba.fastjson.JSON; import com.lakesoul.shaded.com.alibaba.fastjson.JSONObject; +import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference; +import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema; +import io.substrait.proto.Plan; +import lombok.SneakyThrows; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -52,19 +68,54 @@ public class LakeSoulScanNode extends FileQueryScanNode { - protected final LakeSoulExternalTable lakeSoulExternalTable; + private static final Logger LOG = LogManager.getLogger(LakeSoulScanNode.class); + protected LakeSoulExternalTable lakeSoulExternalTable; + + String tableName; + + String location; - protected final TableInfo table; + String partitions; + + Schema tableArrowSchema; + + Schema partitionArrowSchema; + private Map tableProperties; + + String readType; public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv); + } + + @Override + protected void doInitialize() throws UserException { + super.doInitialize(); lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable(); - table = lakeSoulExternalTable.getLakeSoulTableInfo(); + TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo(); + location = tableInfo.getTablePath(); + tableName = tableInfo.getTableName(); + partitions = tableInfo.getPartitions(); + readType = LakeSoulOptions.ReadType$.MODULE$.FULL_READ(); + try { + tableProperties = new ObjectMapper().readValue( + tableInfo.getProperties(), + new TypeReference>() {} + ); + tableArrowSchema = Schema.fromJSON(tableInfo.getTableSchema()); + List partitionFields = + DBUtil.parseTableInfoPartitions(partitions) + .rangeKeys + .stream() + .map(tableArrowSchema::findField).collect(Collectors.toList()); + partitionArrowSchema = new Schema(partitionFields); + } catch (IOException e) { + throw new UserException(e); + } } @Override protected TFileType getLocationType() throws UserException { - String location = table.getTablePath(); return getLocationType(location); } @@ -81,12 +132,12 @@ protected TFileFormatType getFileFormatType() throws UserException { @Override protected List getPathPartitionKeys() throws UserException { - return new ArrayList<>(DBUtil.parseTableInfoPartitions(table.getPartitions()).rangeKeys); + return new ArrayList<>(DBUtil.parseTableInfoPartitions(partitions).rangeKeys); } @Override protected TableIf getTargetTable() throws UserException { - return lakeSoulExternalTable; + return desc.getTable(); } @Override @@ -94,13 +145,21 @@ protected Map getLocationProperties() throws UserException { return lakeSoulExternalTable.getHadoopProperties(); } + @SneakyThrows @Override protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { + if (LOG.isDebugEnabled()) { + LOG.debug("{}", rangeDesc); + } if (split instanceof LakeSoulSplit) { setLakeSoulParams(rangeDesc, (LakeSoulSplit) split); } } + public ExternalCatalog getCatalog() { + return lakeSoulExternalTable.getCatalog(); + } + public static boolean isExistHashPartition(TableInfo tif) { JSONObject tableProperties = JSON.parseObject(tif.getProperties()); if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM()) @@ -111,13 +170,47 @@ public static boolean isExistHashPartition(TableInfo tif) { } } - public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) { + public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) throws IOException { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value()); TLakeSoulFileDesc fileDesc = new TLakeSoulFileDesc(); fileDesc.setFilePaths(lakeSoulSplit.getPaths()); fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys()); fileDesc.setTableSchema(lakeSoulSplit.getTableSchema()); + + + JSONObject options = new JSONObject(); + Plan predicate = LakeSoulUtils.getPushPredicate( + conjuncts, + tableName, + tableArrowSchema, + partitionArrowSchema, + tableProperties, + readType.equals(LakeSoulOptions.ReadType$.MODULE$.INCREMENTAL_READ())); + if (predicate != null) { + options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE, SubstraitUtil.encodeBase64String(predicate)); + } + Map catalogProps = getCatalog().getProperties(); + if (LOG.isDebugEnabled()) { + LOG.debug("{}", catalogProps); + } + + if (catalogProps.get(S3Properties.Env.ENDPOINT) != null) { + options.put(LakeSoulUtils.FS_S3A_ENDPOINT, catalogProps.get(S3Properties.Env.ENDPOINT)); + options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "true"); + if (catalogProps.get(S3Properties.Env.ACCESS_KEY) != null) { + options.put(LakeSoulUtils.FS_S3A_ACCESS_KEY, catalogProps.get(S3Properties.Env.ACCESS_KEY)); + } + if (catalogProps.get(S3Properties.Env.SECRET_KEY) != null) { + options.put(LakeSoulUtils.FS_S3A_SECRET_KEY, catalogProps.get(S3Properties.Env.SECRET_KEY)); + } + if (catalogProps.get(S3Properties.Env.REGION) != null) { + options.put(LakeSoulUtils.FS_S3A_REGION, catalogProps.get(S3Properties.Env.REGION)); + } + } + + fileDesc.setOptions(JSON.toJSONString(options)); + fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc() .entrySet().stream().map(entry -> String.format("%s=%s", entry.getKey(), entry.getValue())).collect(Collectors.toList())); @@ -126,24 +219,51 @@ public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSp } public List getSplits() throws UserException { + if (LOG.isDebugEnabled()) { + LOG.debug("getSplits with columnFilters={}", columnFilters); + LOG.debug("getSplits with columnNameToRange={}", columnNameToRange); + LOG.debug("getSplits with conjuncts={}", conjuncts); + } + + List allPartitionInfo = lakeSoulExternalTable.listPartitionInfo(); + if (LOG.isDebugEnabled()) { + LOG.debug("allPartitionInfo={}", allPartitionInfo); + } + List filteredPartitionInfo = allPartitionInfo; + try { + filteredPartitionInfo = + LakeSoulUtils.applyPartitionFilters( + allPartitionInfo, + tableName, + partitionArrowSchema, + columnNameToRange + ); + } catch (IOException e) { + throw new UserException(e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("filteredPartitionInfo={}", filteredPartitionInfo); + } + DataFileInfo[] dataFileInfos = DataOperation.getTableDataInfo(filteredPartitionInfo); + List splits = new ArrayList<>(); Map>> splitByRangeAndHashPartition = new LinkedHashMap<>(); - TableInfo tif = table; - DataFileInfo[] dfinfos = DataOperation.getTableDataInfo(table.getTableId()); - for (DataFileInfo pif : dfinfos) { - if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) { - splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>()) - .computeIfAbsent(pif.file_bucket_id(), v -> new ArrayList<>()) - .add(pif.path()); + TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo(); + + for (DataFileInfo fileInfo : dataFileInfos) { + if (isExistHashPartition(tableInfo) && fileInfo.file_bucket_id() != -1) { + splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> new LinkedHashMap<>()) + .computeIfAbsent(fileInfo.file_bucket_id(), v -> new ArrayList<>()) + .add(fileInfo.path()); } else { - splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>()) + splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> new LinkedHashMap<>()) .computeIfAbsent(-1, v -> new ArrayList<>()) - .add(pif.path()); + .add(fileInfo.path()); } } List pkKeys = null; - if (!table.getPartitions().equals(";")) { - pkKeys = Lists.newArrayList(table.getPartitions().split(";")[1].split(",")); + if (!tableInfo.getPartitions().equals(";")) { + pkKeys = Lists.newArrayList(tableInfo.getPartitions().split(";")[1].split(",")); } for (Map.Entry>> entry : splitByRangeAndHashPartition.entrySet()) { @@ -161,7 +281,7 @@ public List getSplits() throws UserException { split.getValue(), pkKeys, rangeDesc, - table.getTableSchema(), + tableInfo.getTableSchema(), 0, 0, 0, new String[0], null); lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index 82945fb6963df0..64178846abf7ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -283,6 +283,7 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio case PAIMON_EXTERNAL_TABLE: case MAX_COMPUTE_EXTERNAL_TABLE: case TRINO_CONNECTOR_EXTERNAL_TABLE: + case LAKESOUl_EXTERNAL_TABLE: return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table, qualifierWithoutTableName, unboundRelation.getTableSample(), unboundRelation.getTableSnapshot()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java new file mode 100644 index 00000000000000..aebd74f5e020f5 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.lakesoul; + +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.BoolLiteral; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.CompoundPredicate.Operator; +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.DecimalLiteral; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FloatLiteral; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; + +import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.lakesoul.shaded.org.apache.arrow.vector.types.DateUnit; +import com.lakesoul.shaded.org.apache.arrow.vector.types.FloatingPointPrecision; +import com.lakesoul.shaded.org.apache.arrow.vector.types.TimeUnit; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.FieldType; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema; +import io.substrait.expression.Expression; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class LakeSoulPredicateTest { + + public static Schema schema; + + @BeforeClass + public static void before() throws AnalysisException, IOException { + schema = new Schema( + Arrays.asList( + new Field("c_int", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("c_long", FieldType.nullable(new ArrowType.Int(64, true)), null), + new Field("c_bool", FieldType.nullable(new ArrowType.Bool()), null), + new Field("c_float", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null), + new Field("c_double", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null), + new Field("c_dec", FieldType.nullable(new ArrowType.Decimal(20, 10)), null), + new Field("c_date", FieldType.nullable(new ArrowType.Date(DateUnit.DAY)), null), + new Field("c_ts", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), null), + new Field("c_str", FieldType.nullable(new ArrowType.Utf8()), null) + )); + } + + @Test + public void testBinaryPredicate() throws AnalysisException, IOException { + List literalList = new ArrayList() {{ + add(new BoolLiteral(true)); + add(new DateLiteral("2023-01-02", Type.DATEV2)); + add(new DateLiteral("2024-01-02 12:34:56.123456", Type.DATETIMEV2)); + add(new DecimalLiteral(new BigDecimal("1.23"))); + add(new FloatLiteral(1.23, Type.FLOAT)); + add(new FloatLiteral(3.456, Type.DOUBLE)); + add(new IntLiteral(1, Type.TINYINT)); + add(new IntLiteral(1, Type.SMALLINT)); + add(new IntLiteral(1, Type.INT)); + add(new IntLiteral(1, Type.BIGINT)); + add(new StringLiteral("abc")); + add(new StringLiteral("2023-01-02")); + add(new StringLiteral("2023-01-02 01:02:03.456789")); + }}; + + List slotRefs = new ArrayList() {{ + add(new SlotRef(new TableName(), "c_int")); + add(new SlotRef(new TableName(), "c_long")); + add(new SlotRef(new TableName(), "c_bool")); + add(new SlotRef(new TableName(), "c_float")); + add(new SlotRef(new TableName(), "c_double")); + add(new SlotRef(new TableName(), "c_dec")); + add(new SlotRef(new TableName(), "c_date")); + add(new SlotRef(new TableName(), "c_ts")); + add(new SlotRef(new TableName(), "c_str")); + }}; + + // true indicates support for pushdown + Boolean[][] expects = new Boolean[][] { + { // int + false, false, false, false, false, false, true, true, true, true, false, false, false + }, + { // long + false, false, false, false, false, false, true, true, true, true, false, false, false + }, + { // boolean + true, false, false, false, false, false, false, false, false, false, false, false, false + }, + { // float + false, false, false, false, true, false, true, true, true, true, false, false, false + }, + { // double + false, false, false, true, true, true, true, true, true, true, false, false, false + }, + { // decimal + false, false, false, true, true, true, true, true, true, true, false, false, false + }, + { // date + false, true, false, false, false, false, true, true, true, true, false, true, false + }, + { // timestamp + false, true, true, false, false, false, false, false, false, true, false, false, false + }, + { // string + true, true, true, true, false, false, false, false, false, false, true, true, true + } + }; + + ArrayListMultimap validPredicateMap = ArrayListMultimap.create(); + + // binary predicate + for (int i = 0; i < slotRefs.size(); i++) { + final int loc = i; + List ret = literalList.stream().map(literal -> { + BinaryPredicate expr = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRefs.get(loc), literal); + Expression expression = null; + try { + expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema); + } catch (IOException e) { + throw new RuntimeException(e); + } + validPredicateMap.put(expression != null, expr); + return expression != null; + }).collect(Collectors.toList()); + Assert.assertArrayEquals(expects[i], ret.toArray()); + } + + // in predicate + for (int i = 0; i < slotRefs.size(); i++) { + final int loc = i; + List ret = literalList.stream().map(literal -> { + InPredicate expr = new InPredicate(slotRefs.get(loc), Lists.newArrayList(literal), false); + Expression expression = null; + try { + expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema); + } catch (IOException e) { + throw new RuntimeException(e); + } + validPredicateMap.put(expression != null, expr); + return expression != null; + }).collect(Collectors.toList()); + Assert.assertArrayEquals(expects[i], ret.toArray()); + } + + // not in predicate + for (int i = 0; i < slotRefs.size(); i++) { + final int loc = i; + List ret = literalList.stream().map(literal -> { + InPredicate expr = new InPredicate(slotRefs.get(loc), Lists.newArrayList(literal), true); + Expression expression = null; + try { + expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema); + } catch (IOException e) { + throw new RuntimeException(e); + } + validPredicateMap.put(expression != null, expr); + return expression != null; + }).collect(Collectors.toList()); + Assert.assertArrayEquals(expects[i], ret.toArray()); + } + + // bool literal + + Expression trueExpr = LakeSoulUtils.convertToSubstraitExpr(new BoolLiteral(true), schema); + Expression falseExpr = LakeSoulUtils.convertToSubstraitExpr(new BoolLiteral(false), schema); + Assert.assertEquals(SubstraitUtil.CONST_TRUE, trueExpr); + Assert.assertEquals(SubstraitUtil.CONST_FALSE, falseExpr); + validPredicateMap.put(true, new BoolLiteral(true)); + validPredicateMap.put(true, new BoolLiteral(false)); + + List validExprs = validPredicateMap.get(true); + List invalidExprs = validPredicateMap.get(false); + // OR predicate + // both valid + for (int i = 0; i < validExprs.size(); i++) { + for (int j = 0; j < validExprs.size(); j++) { + CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR, + validExprs.get(i), validExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema); + Assert.assertNotNull("pred: " + orPredicate.toSql(), expression); + } + } + // both invalid + for (int i = 0; i < invalidExprs.size(); i++) { + for (int j = 0; j < invalidExprs.size(); j++) { + CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR, + invalidExprs.get(i), invalidExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema); + Assert.assertNull("pred: " + orPredicate.toSql(), expression); + } + } + // valid or invalid + for (int i = 0; i < validExprs.size(); i++) { + for (int j = 0; j < invalidExprs.size(); j++) { + CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR, + validExprs.get(i), invalidExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema); + Assert.assertNull("pred: " + orPredicate.toSql(), expression); + } + } + + // AND predicate + // both valid + for (int i = 0; i < validExprs.size(); i++) { + for (int j = 0; j < validExprs.size(); j++) { + CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND, + validExprs.get(i), validExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema); + Assert.assertNotNull("pred: " + andPredicate.toSql(), expression); + } + } + // both invalid + for (int i = 0; i < invalidExprs.size(); i++) { + for (int j = 0; j < invalidExprs.size(); j++) { + CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND, + invalidExprs.get(i), invalidExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema); + Assert.assertNull("pred: " + andPredicate.toSql(), expression); + } + } + // valid and invalid + for (int i = 0; i < validExprs.size(); i++) { + for (int j = 0; j < invalidExprs.size(); j++) { + CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND, + validExprs.get(i), invalidExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema); + Assert.assertNotNull("pred: " + andPredicate.toSql(), expression); + Assert.assertEquals(SubstraitUtil.substraitExprToProto(LakeSoulUtils.convertToSubstraitExpr(validExprs.get(i), schema), "table"), + SubstraitUtil.substraitExprToProto(expression, "table")); + } + } + + // NOT predicate + // valid + for (int i = 0; i < validExprs.size(); i++) { + CompoundPredicate notPredicate = new CompoundPredicate(Operator.NOT, + validExprs.get(i), null); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema); + Assert.assertNotNull("pred: " + notPredicate.toSql(), expression); + } + // invalid + for (int i = 0; i < invalidExprs.size(); i++) { + CompoundPredicate notPredicate = new CompoundPredicate(Operator.NOT, + invalidExprs.get(i), null); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema); + Assert.assertNull("pred: " + notPredicate.toSql(), expression); + } + } +} diff --git a/fe/pom.xml b/fe/pom.xml index 0c08bb327ae932..3d43297fe217a0 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -317,6 +317,8 @@ under the License. 0.14.1 2.7.4-11 3.0.0-8 + + 2.6.1 1.13.1 3.2.2 @@ -1831,4 +1833,4 @@ under the License. - \ No newline at end of file + diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index 08d03632c373d4..d4591bf1782c76 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -228,3 +228,14 @@ enableTrinoConnectorTest = false enableKerberosTest=false kerberosHmsPort=9883 kerberosHdfsPort=8820 + + +// LakeSoul catalog test config +enableLakesoulTest = false +lakesoulPGUser="*******" +lakesoulPGPwd="*******" +lakesoulPGUrl="*******" +lakesoulMinioAK="*******" +lakesoulMinioSK="*******" +lakesoulMinioEndpoint="*******" + diff --git a/regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out b/regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out new file mode 100644 index 00000000000000..cb1899326bd0dc --- /dev/null +++ b/regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out @@ -0,0 +1,8 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !lakesoul -- +0 AFRICA lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to +2 ASIA ges. thinly even pinto beans ca +3 EUROPE ly final courts cajole furiously final excuse +1 AMERICA hs use ironic, even requests. s +4 MIDDLE EAST uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl + diff --git a/regression-test/pipeline/external/conf/regression-conf.groovy b/regression-test/pipeline/external/conf/regression-conf.groovy index 93965b84219e97..31e2c2629ce756 100644 --- a/regression-test/pipeline/external/conf/regression-conf.groovy +++ b/regression-test/pipeline/external/conf/regression-conf.groovy @@ -160,3 +160,7 @@ enableTrinoConnectorTest = true enableKerberosTest = true kerberosHmsPort=9883 kerberosHdfsPort=8820 + + +// LakeSoul catalog test config +enableLakesoulTest = true diff --git a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy index e0b8a924c3075b..ffd95d93097dbc 100644 --- a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy +++ b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy @@ -16,20 +16,33 @@ // under the License. suite("test_lakesoul_catalog", "p0,external,doris,external_docker,external_docker_doris") { - def enabled = false; + String enabled = context.config.otherConfigs.get("enableLakesoulTest") // open it when docker image is ready to run in regression test - if (enabled) { + if (enabled != null && enabled.equalsIgnoreCase("true")) { String catalog_name = "lakesoul" String db_name = "default" + String pg_user = context.config.otherConfigs.get("lakesoulPGUser") + String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd") + String pg_url = context.config.otherConfigs.get("lakesoulPGUrl") + String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK") + String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK") + String minio_endpoint = context.config.otherConfigs.get("lakesoulMinioEndpoint") sql """drop catalog if exists ${catalog_name}""" - sql """ - create catalog lakesoul properties ('type'='lakesoul','lakesoul.pg.username'='lakesoul_test','lakesoul.pg.password'='lakesoul_test','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified');""" + sql """create catalog lakesoul properties ( + 'type'='lakesoul', + 'lakesoul.pg.username'='${pg_user}', + 'lakesoul.pg.password'='${pg_pwd}', + 'lakesoul.pg.url'='${pg_url}', + 'minio.endpoint'='${minio_endpoint}', + 'minio.access_key'='${minio_ak}', + 'minio.secret_key'='${minio_sk}' + );""" // analyze sql """use `${catalog_name}`.`${db_name}`""" - sq """show tables;""" + sql """show tables;""" // select sql """select * from nation;""" diff --git a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy new file mode 100644 index 00000000000000..799e8ba61bb2d4 --- /dev/null +++ b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_lakesoul_filter", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableLakesoulTest") + // open it when docker image is ready to run in regression test + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String catalog_name = "lakesoul" + String db_name = "default" + String pg_user = context.config.otherConfigs.get("lakesoulPGUser") + String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd") + String pg_url = context.config.otherConfigs.get("lakesoulPGUrl") + String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK") + String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK") + String minio_endpoint = context.config.otherConfigs.get("lakesoulMinioEndpoint") + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog lakesoul properties ( + 'type'='lakesoul', + 'lakesoul.pg.username'='${pg_user}', + 'lakesoul.pg.password'='${pg_pwd}', + 'lakesoul.pg.url'='${pg_url}', + 'minio.endpoint'='${minio_endpoint}', + 'minio.access_key'='${minio_ak}', + 'minio.secret_key'='${minio_sk}' + );""" + + // analyze + sql """use `${catalog_name}`.`${db_name}`""" + + sql """show tables;""" + // select + sql """select * from region;""" + + sql """select * from nation;""" + + sql """select * from nation where n_regionkey = 0 or n_nationkey > 14;""" + + sql """select * from nation where n_regionkey = 0 and n_nationkey > 0;""" + + sql """select * from nation where n_regionkey = 0;""" + } +} + diff --git a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy index 9369a28e8fe9c7..bb85dc687d7c99 100644 --- a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy +++ b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy @@ -34,13 +34,25 @@ suite("test_external_table_lakesoul", "p2,external,lakesoul,external_remote,exte if (enabled != null && enabled.equalsIgnoreCase("true")) { - String catalog_name = "lakesoul" - String db_name = "default" + String catalog_name = "lakesoul" + String db_name = "default" + String pg_user = context.config.otherConfigs.get("lakesoulPGUser") + String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd") + String pg_url = context.config.otherConfigs.get("lakesoulPGUrl") + String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK") + String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK") + String minio_endpoint = context.config.otherConfigs.get("lakesoulMinioEndpoint") - sql """drop catalog if exists ${catalog_name}""" - sql """ - create catalog lakesoul properties ('type'='lakesoul','lakesoul.pg.username'='lakesoul','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5433/lakesoul_test?stringtype=unspecified'); - """ + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog lakesoul properties ( + 'type'='lakesoul', + 'lakesoul.pg.username'='${pg_user}', + 'lakesoul.pg.password'='${pg_pwd}', + 'lakesoul.pg.url'='${pg_url}', + 'minio.endpoint'='${minio_endpoint}', + 'minio.access_key'='${minio_ak}', + 'minio.secret_key'='${minio_sk}' + );""" // analyze sql """use `${catalog_name}`.`${db_name}`"""