Skip to content

Commit

Permalink
[improvement](external)add some improvements for external scan (apach…
Browse files Browse the repository at this point in the history
…e#38946) (apache#43156)

bp apache#38946

Co-authored-by: wuwenchi <[email protected]>
  • Loading branch information
morningman and wuwenchi authored Nov 4, 2024
1 parent e3170d6 commit d1e63c5
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private void initReader() throws IOException {
int[] projected = getProjected();
readBuilder.withProjection(projected);
readBuilder.withFilter(getPredicates());
reader = readBuilder.newRead().createReader(getSplit());
reader = readBuilder.newRead().executeFilter().createReader(getSplit());
paimonDataTypeList =
Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ public abstract class ExternalScanNode extends ScanNode {
protected boolean needCheckColumnPriv;

protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableFileCache)
? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new FederationBackendPolicy();
&& (ConnectContext.get().getSessionVariable().enableFileCache
|| ConnectContext.get().getSessionVariable().getUseConsistentHashForExternalScan()))
? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
: new FederationBackendPolicy();

public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public void funnel(Backend backend, PrimitiveSink primitiveSink) {
private static class SplitHash implements Funnel<Split> {
@Override
public void funnel(Split split, PrimitiveSink primitiveSink) {
primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8));
primitiveSink.putBytes(split.getConsistentHashString().getBytes(StandardCharsets.UTF_8));
primitiveSink.putLong(split.getStart());
primitiveSink.putLong(split.getLength());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class FileSplit implements Split {
// the location type for BE, eg: HDFS, LOCAL, S3
protected TFileType locationType;

public Long selfSplitWeight;
public Long targetSplitSize;

public FileSplit(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues) {
this.path = path;
Expand Down Expand Up @@ -89,4 +92,20 @@ public Split create(LocationPath path, long start, long length, long fileLength,
return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
}
}

@Override
public void setTargetSplitSize(Long targetSplitSize) {
this.targetSplitSize = targetSplitSize;
}

@Override
public SplitWeight getSplitWeight() {
if (selfSplitWeight != null && targetSplitSize != null) {
double computedWeight = selfSplitWeight * 1.0 / targetSplitSize;
// Clamp the value be between the minimum weight and 1.0 (standard weight)
return SplitWeight.fromProportion(Math.min(Math.max(computedWeight, 0.01), 1.0));
} else {
return SplitWeight.standard();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,34 @@
@Data
public class IcebergDeleteFileFilter {
private String deleteFilePath;
private long filesize;

public IcebergDeleteFileFilter(String deleteFilePath) {
public IcebergDeleteFileFilter(String deleteFilePath, long filesize) {
this.deleteFilePath = deleteFilePath;
this.filesize = filesize;
}

public static PositionDelete createPositionDelete(String deleteFilePath, Long positionLowerBound,
Long positionUpperBound) {
return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound);
Long positionUpperBound, long filesize) {
return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound, filesize);
}

public static EqualityDelete createEqualityDelete(String deleteFilePath, List<Integer> fieldIds) {
public static EqualityDelete createEqualityDelete(String deleteFilePath, List<Integer> fieldIds, long fileSize) {
// todo:
// Schema deleteSchema = TypeUtil.select(scan.schema(), new HashSet<>(fieldIds));
// StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct());
// pass deleteSet to BE
// compare two StructLike value, if equals, filtered
return new EqualityDelete(deleteFilePath, fieldIds);
return new EqualityDelete(deleteFilePath, fieldIds, fileSize);
}

static class PositionDelete extends IcebergDeleteFileFilter {
private final Long positionLowerBound;
private final Long positionUpperBound;

public PositionDelete(String deleteFilePath, Long positionLowerBound,
Long positionUpperBound) {
super(deleteFilePath);
Long positionUpperBound, long fileSize) {
super(deleteFilePath, fileSize);
this.positionLowerBound = positionLowerBound;
this.positionUpperBound = positionUpperBound;
}
Expand All @@ -67,8 +69,8 @@ public OptionalLong getPositionUpperBound() {
static class EqualityDelete extends IcebergDeleteFileFilter {
private List<Integer> fieldIds;

public EqualityDelete(String deleteFilePath, List<Integer> fieldIds) {
super(deleteFilePath);
public EqualityDelete(String deleteFilePath, List<Integer> fieldIds, long fileSize) {
super(deleteFilePath, fileSize);
this.fieldIds = fieldIds;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ private List<Split> doGetSplits() throws UserException {
}

selectedPartitionNum = partitionPathSet.size();

splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}

Expand Down Expand Up @@ -315,10 +315,11 @@ private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask
.map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId()))
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L)));
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L),
delete.fileSizeInBytes()));
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
filters.add(IcebergDeleteFileFilter.createEqualityDelete(
delete.path().toString(), delete.equalityFieldIds()));
delete.path().toString(), delete.equalityFieldIds(), delete.fileSizeInBytes()));
} else {
throw new IllegalStateException("Unknown delete content: " + delete.content());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public IcebergSplit(LocationPath file, long start, long length, long fileLength,
this.formatVersion = formatVersion;
this.config = config;
this.originalPath = originalPath;
this.selfSplitWeight = length;
}

public long getRowCount() {
Expand All @@ -56,4 +57,9 @@ public long getRowCount() {
public void setRowCount(long rowCount) {
this.rowCount = rowCount;
}

public void setDeleteFileFilters(List<IcebergDeleteFileFilter> deleteFileFilters) {
this.deleteFileFilters = deleteFileFilters;
this.selfSplitWeight += deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
Expand Down Expand Up @@ -101,9 +101,14 @@ public String toString() {
private int rawFileSplitNum = 0;
private int paimonSplitNum = 0;
private List<SplitStat> splitStats = new ArrayList<>();
private SessionVariable sessionVariable;

public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
public PaimonScanNode(PlanNodeId id,
TupleDescriptor desc,
boolean needCheckColumnPriv,
SessionVariable sessionVariable) {
super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
this.sessionVariable = sessionVariable;
}

@Override
Expand Down Expand Up @@ -176,7 +181,9 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)

@Override
public List<Split> getSplits() throws UserException {
boolean forceJniScanner = ConnectContext.get().getSessionVariable().isForceJniScanner();
boolean forceJniScanner = sessionVariable.isForceJniScanner();
SessionVariable.IgnoreSplitType ignoreSplitType =
SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType());
List<Split> splits = new ArrayList<>();
int[] projected = desc.getSlots().stream().mapToInt(
slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
Expand All @@ -196,7 +203,11 @@ public List<Split> getSplits() throws UserException {
selectedPartitionValues.add(partitionValue);
Optional<List<RawFile>> optRawFiles = dataSplit.convertToRawFiles();
Optional<List<DeletionFile>> optDeletionFiles = dataSplit.deletionFiles();

if (supportNativeReader(optRawFiles)) {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_NATIVE) {
continue;
}
splitStat.setType(SplitReadType.NATIVE);
splitStat.setRawFileConvertable(true);
List<RawFile> rawFiles = optRawFiles.get();
Expand Down Expand Up @@ -252,17 +263,25 @@ public List<Split> getSplits() throws UserException {
}
}
} else {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
continue;
}
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
} else {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
continue;
}
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
splitStats.add(splitStat);
}
this.selectedPartitionNum = selectedPartitionValues.size();
// TODO: get total partition number
// We should set fileSplitSize at the end because fileSplitSize may be modified in splitFile.
splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,51 @@
import org.apache.doris.datasource.TableFormatType;

import com.google.common.collect.Maps;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.Split;

import java.util.List;
import java.util.Optional;
import java.util.UUID;

public class PaimonSplit extends FileSplit {
private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap());
private Split split;
private TableFormatType tableFormatType;
private Optional<DeletionFile> optDeletionFile;


public PaimonSplit(Split split) {
super(DUMMY_PATH, 0, 0, 0, 0, null, null);
this.split = split;
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();

if (split instanceof DataSplit) {
List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
this.path = new LocationPath("/" + dataFileMetas.get(0).fileName());
this.selfSplitWeight = dataFileMetas.stream().mapToLong(DataFileMeta::fileSize).sum();
} else {
this.selfSplitWeight = split.rowCount();
}
}

private PaimonSplit(LocationPath file, long start, long length, long fileLength, long modificationTime,
String[] hosts, List<String> partitionList) {
super(file, start, length, fileLength, modificationTime, hosts, partitionList);
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();
this.selfSplitWeight = length;
}

@Override
public String getConsistentHashString() {
if (this.path == DUMMY_PATH) {
return UUID.randomUUID().toString();
}
return getPathString();
}

public Split getSplit() {
Expand All @@ -66,6 +87,7 @@ public Optional<DeletionFile> getDeletionFile() {
}

public void setDeletionFile(DeletionFile deletionFile) {
this.selfSplitWeight += deletionFile.length();
this.optDeletionFile = Optional.of(deletionFile);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
} else if (table instanceof IcebergExternalTable) {
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
} else if (table instanceof PaimonExternalTable) {
scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
ConnectContext.get().getSessionVariable());
} else if (table instanceof MaxComputeExternalTable) {
scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1987,7 +1987,8 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
case PAIMON_EXTERNAL_TABLE:
scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
ConnectContext.get().getSessionVariable());
break;
case MAX_COMPUTE_EXTERNAL_TABLE:
// TODO: support max compute scan node
Expand Down
36 changes: 36 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,26 @@ public class SessionVariable implements Serializable, Writable {
setter = "setPipelineTaskNum")
public int parallelPipelineTaskNum = 0;


public enum IgnoreSplitType {
NONE,
IGNORE_JNI,
IGNORE_NATIVE
}

public static final String IGNORE_SPLIT_TYPE = "ignore_split_type";
@VariableMgr.VarAttr(name = IGNORE_SPLIT_TYPE,
checker = "checkIgnoreSplitType",
options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE"},
description = {"忽略指定类型的split", "Ignore splits of the specified type"})
public String ignoreSplitType = IgnoreSplitType.NONE.toString();

public static final String USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN = "use_consistent_hash_for_external_scan";
@VariableMgr.VarAttr(name = USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN,
description = {"对外表采用一致性hash的方式做split的分发",
"Use consistent hashing to split the appearance for external scan"})
public boolean useConsistentHashForExternalScan = false;

@VariableMgr.VarAttr(name = PROFILE_LEVEL, fuzzy = true)
public int profileLevel = 1;

Expand Down Expand Up @@ -4226,6 +4246,22 @@ public boolean isForceJniScanner() {
return forceJniScanner;
}

public String getIgnoreSplitType() {
return ignoreSplitType;
}

public void checkIgnoreSplitType(String value) {
try {
IgnoreSplitType.valueOf(value);
} catch (Exception e) {
throw new UnsupportedOperationException("We only support `NONE`, `IGNORE_JNI` and `IGNORE_NATIVE`");
}
}

public boolean getUseConsistentHashForExternalScan() {
return useConsistentHashForExternalScan;
}

public void setForceJniScanner(boolean force) {
forceJniScanner = force;
}
Expand Down
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ default boolean isRemotelyAccessible() {

void setAlternativeHosts(List<String> alternativeHosts);

default String getConsistentHashString() {
return getPathString();
}

void setTargetSplitSize(Long targetSplitSize);
}
Loading

0 comments on commit d1e63c5

Please sign in to comment.