Skip to content

Commit

Permalink
[Feature][external catalog/lakesoul] LakeSoul Catalog support Filter …
Browse files Browse the repository at this point in the history
…Pushdown & Cdc data handling & S3 data access (apache#37979)

## Proposed changes

Issue Number: close apache#37978

---------

Signed-off-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 authored Jul 30, 2024
1 parent c5a1998 commit 34dd69d
Show file tree
Hide file tree
Showing 19 changed files with 1,269 additions and 111 deletions.
22 changes: 2 additions & 20 deletions fe/be-java-extensions/lakesoul-scanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-unsafe</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
<version>${arrow.version}</version>
</dependency>

<!-- scala deps -->
<dependency>
Expand All @@ -85,7 +70,8 @@ under the License.
<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-io-java</artifactId>
<version>2.5.4</version>
<version>${lakesoul.version}</version>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand All @@ -99,10 +85,6 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,30 @@

package org.apache.doris.lakesoul;

import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner;
import org.apache.doris.lakesoul.parquet.ParquetFilter;

import com.dmetasoul.lakesoul.LakeSoulArrowReader;
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference;
import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
import io.substrait.proto.Plan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
private static final Logger LOG = LoggerFactory.getLogger(LakeSoulJniScanner.class);

private final Map<String, String> params;

Expand All @@ -60,6 +65,9 @@ public void open() throws IOException {
withAllocator(nativeIOReader.getAllocator());
nativeIOReader.setBatchSize(batchSize);

if (LOG.isDebugEnabled()) {
LOG.debug("opening LakeSoulJniScanner with params={}", params);
}
// add files
for (String file : params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) {
nativeIOReader.addFile(file);
Expand All @@ -72,19 +80,39 @@ public void open() throws IOException {
Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList()));
}

Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
String options = params.getOrDefault(LakeSoulUtils.OPTIONS, "{}");
Map<String, String> optionsMap = new ObjectMapper().readValue(
options, new TypeReference<Map<String, String>>() {}
);
String base64Predicate = optionsMap.get(LakeSoulUtils.SUBSTRAIT_PREDICATE);
if (base64Predicate != null) {
Plan predicate = SubstraitUtil.decodeBase64String(base64Predicate);
if (LOG.isDebugEnabled()) {
LOG.debug("push predicate={}", predicate);
}
nativeIOReader.addFilterProto(predicate);
}

// Forward S3 / object-store options (access key, secret, endpoint, path-style
// flag) from the scan params to the native reader so it can open s3a:// paths.
// Fix: guard on the looked-up VALUE, not the loop key — `key` is an element of
// a constant list and is never null, so the original check always passed and
// absent options were forwarded as setObjectStoreOption(key, null).
for (String key : LakeSoulUtils.OBJECT_STORE_OPTIONS) {
    String value = optionsMap.get(key);
    if (value != null) {
        nativeIOReader.setObjectStoreOption(key, value);
    }
}

Schema tableSchema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
String[] requiredFieldNames = params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM);

List<Field> requiredFields = new ArrayList<>();
for (String fieldName : requiredFieldNames) {
requiredFields.add(schema.findField(fieldName));
requiredFields.add(tableSchema.findField(fieldName));
}

requiredSchema = new Schema(requiredFields);

nativeIOReader.setSchema(requiredSchema);

HashSet<String> partitionColumn = new HashSet<>();
List<Field> partitionFields = new ArrayList<>();
for (String partitionKV : params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "")
.split(LakeSoulUtils.LIST_DELIM)) {
if (partitionKV.isEmpty()) {
Expand All @@ -94,17 +122,15 @@ public void open() throws IOException {
if (kv.length != 2) {
throw new IllegalArgumentException("Invalid partition column = " + partitionKV);
}
partitionColumn.add(kv[0]);
nativeIOReader.setDefaultColumnValue(kv[0], kv[1]);
partitionFields.add(tableSchema.findField(kv[0]));
}
if (!partitionFields.isEmpty()) {
nativeIOReader.setPartitionSchema(new Schema(partitionFields));
}

initTableInfo(params);

for (ScanPredicate predicate : predicates) {
if (!partitionColumn.contains(predicate.columName)) {
nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString());
}
}

nativeIOReader.initializeReader();
lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader, awaitTimeout);
}
Expand Down Expand Up @@ -156,4 +182,28 @@ public void releaseTable() {
currentBatch.close();
}
}

/**
 * Ad-hoc local smoke test for the scanner: builds the same string-keyed
 * params map the BE would pass over JNI, opens a single local parquet file
 * and prints the first batch. Uses a developer-machine absolute path, so it
 * is only runnable by hand — NOTE(review): consider moving this into a real
 * test before release rather than shipping a main() in the scanner class.
 *
 * @param args ignored
 * @throws IOException if the native reader fails to open or read the file
 */
public static void main(String[] args) throws IOException {
    HashMap<String, String> params = new HashMap<>();
    params.put("required_fields", "r_regionkey;r_name;r_comment");
    params.put("primary_keys", "r_regionkey;r_name");
    params.put("query_id", "e9d075a6500a4cac-b94630fd4b30c171");
    params.put("file_paths",
            "file:/Users/ceng/Documents/GitHub/LakeSoul/rust/lakesoul-datafusion/"
            + "default/region/part-RzmUvDFtYV8ceb3J_0000.parquet"
    );
    params.put("options", "{}");
    params.put("table_schema",
            "{\"fields\":["
            + "{\"name\":\"r_regionkey\",\"type\":{\"name\":\"int\",\"isSigned\":true,\"bitWidth\":64},"
            + "\"nullable\":false,\"children\":[]},"
            + "{\"name\":\"r_name\",\"type\":{\"name\":\"utf8\"},\"nullable\":false,\"children\":[]},"
            + "{\"name\":\"r_comment\",\"type\":{\"name\":\"utf8\"},\"nullable\":false,\"children\":[]}"
            + "],"
            + "\"metadata\":null}");
    params.put("partition_descs", "");
    LakeSoulJniScanner scanner = new LakeSoulJniScanner(1024, params);
    // Ensure native reader/allocator resources are released even if the
    // read fails — the original leaked them by never closing the scanner.
    try {
        scanner.open();
        System.out.println(scanner.getNext());
    } finally {
        scanner.close();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,29 @@

package org.apache.doris.lakesoul;

import java.util.Arrays;
import java.util.List;

/**
 * Constant keys shared between the FE (which serializes scan parameters)
 * and the BE-side JNI scanner (which reads them back out of the params map).
 * Pure constant holder — all members are {@code static final}; the scraped
 * diff showed duplicate pre-refactor non-final declarations, removed here.
 */
public class LakeSoulUtils {
    // Keys of the string-keyed params map passed over JNI.
    public static final String FILE_NAMES = "file_paths";
    public static final String PRIMARY_KEYS = "primary_keys";
    public static final String SCHEMA_JSON = "table_schema";
    public static final String PARTITION_DESC = "partition_descs";
    public static final String REQUIRED_FIELDS = "required_fields";
    public static final String OPTIONS = "options";
    // Base64-encoded Substrait plan used for filter pushdown.
    public static final String SUBSTRAIT_PREDICATE = "substrait_predicate";
    // Delimiters used when packing lists / partition key=value pairs into one string.
    public static final String LIST_DELIM = ";";
    public static final String PARTITIONS_KV_DELIM = "=";

    // Hadoop-style S3A option names forwarded to the native object-store reader.
    public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
    public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
    public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
    public static final String FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";

    // Options looked up in the scan options map and, when present,
    // passed to the native reader for object-store access.
    public static final List<String> OBJECT_STORE_OPTIONS = Arrays.asList(
            FS_S3A_ACCESS_KEY,
            FS_S3A_SECRET_KEY,
            FS_S3A_ENDPOINT,
            FS_S3A_PATH_STYLE_ACCESS
    );

    private LakeSoulUtils() {
        // Utility class — not instantiable.
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.apache.doris.common.jni.utils.OffHeap;
import org.apache.doris.common.jni.utils.TypeNativeBytes;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
import com.lakesoul.shaded.org.apache.arrow.util.Preconditions;
import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType;
import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;

import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.VectorTable;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecTZVector;
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
import com.lakesoul.shaded.org.apache.arrow.memory.BufferAllocator;
import com.lakesoul.shaded.org.apache.arrow.vector.BitVector;
import com.lakesoul.shaded.org.apache.arrow.vector.DateDayVector;
import com.lakesoul.shaded.org.apache.arrow.vector.DecimalVector;
import com.lakesoul.shaded.org.apache.arrow.vector.FieldVector;
import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroTZVector;
import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroVector;
import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliTZVector;
import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliVector;
import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoTZVector;
import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoVector;
import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecTZVector;
import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecVector;
import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampVector;
import com.lakesoul.shaded.org.apache.arrow.vector.ValueVector;
import com.lakesoul.shaded.org.apache.arrow.vector.VarCharVector;
import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
import com.lakesoul.shaded.org.apache.arrow.vector.complex.ListVector;
import com.lakesoul.shaded.org.apache.arrow.vector.complex.StructVector;
import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
import org.apache.log4j.Logger;

import java.io.IOException;
Expand Down
44 changes: 42 additions & 2 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ under the License.
<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-common</artifactId>
<version>2.5.4</version>
<version>${lakesoul.version}</version>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
Expand All @@ -577,6 +577,46 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-io-java</artifactId>
<version>${lakesoul.version}</version>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
Expand Down Expand Up @@ -1220,4 +1260,4 @@ under the License.
</extension>
</extensions>
</build>
</project>
</project>
Loading

0 comments on commit 34dd69d

Please sign in to comment.