Skip to content

Commit

Permalink
Add DeleteFile in IcebergSplit for execution with Velox
Browse files Browse the repository at this point in the history
  • Loading branch information
agrawalreetika committed Sep 8, 2023
1 parent 97145e3 commit 80d4755
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.HivePartitionKey;
import com.facebook.presto.iceberg.delete.DeleteFile;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
Expand Down Expand Up @@ -44,7 +45,7 @@ public class IcebergSplit
private final Map<Integer, HivePartitionKey> partitionKeys;
private final NodeSelectionStrategy nodeSelectionStrategy;
private final SplitWeight splitWeight;

private final List<DeleteFile> deletes;
@JsonCreator
public IcebergSplit(
@JsonProperty("path") String path,
Expand All @@ -54,7 +55,8 @@ public IcebergSplit(
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("partitionKeys") Map<Integer, HivePartitionKey> partitionKeys,
@JsonProperty("nodeSelectionStrategy") NodeSelectionStrategy nodeSelectionStrategy,
@JsonProperty("splitWeight") SplitWeight splitWeight)
@JsonProperty("splitWeight") SplitWeight splitWeight,
@JsonProperty("deletes") List<DeleteFile> deletes)
{
requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
this.path = requireNonNull(path, "path is null");
Expand All @@ -65,6 +67,7 @@ public IcebergSplit(
this.partitionKeys = Collections.unmodifiableMap(requireNonNull(partitionKeys, "partitionKeys is null"));
this.nodeSelectionStrategy = nodeSelectionStrategy;
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
}

@JsonProperty
Expand Down Expand Up @@ -125,6 +128,12 @@ public SplitWeight getSplitWeight()
return splitWeight;
}

@JsonProperty
public List<DeleteFile> getDeletes()
{
return deletes;
}

@Override
public Object getInfo()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.HivePartitionKey;
import com.facebook.presto.iceberg.delete.DeleteFile;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
Expand Down Expand Up @@ -44,6 +45,7 @@

import static com.facebook.presto.iceberg.IcebergSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.iceberg.IcebergUtil.getIdentityPartitions;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -122,7 +124,8 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
ImmutableList.of(),
getPartitionKeys(task),
getNodeSelectionStrategy(session),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)));
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()));
}

private static Map<Integer, HivePartitionKey> getPartitionKeys(FileScanTask scanTask)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.delete;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class DeleteFile
{
private final FileContent content;
private final String path;
private final FileFormat format;
private final long recordCount;
private final long fileSizeInBytes;
private final List<Integer> equalityFieldIds;
private final Map<Integer, ByteBuffer> lowerBounds;
private final Map<Integer, ByteBuffer> upperBounds;

public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
{
return new DeleteFile(
deleteFile.content(),
deleteFile.path().toString(),
deleteFile.format(),
deleteFile.recordCount(),
deleteFile.fileSizeInBytes(),
Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of),
deleteFile.lowerBounds(),
deleteFile.upperBounds());
}

@JsonCreator
public DeleteFile(
FileContent content,
String path,
FileFormat format,
long recordCount,
long fileSizeInBytes,
List<Integer> equalityFieldIds,
Map<Integer, ByteBuffer> lowerBounds,
Map<Integer, ByteBuffer> upperBounds)
{
this.content = requireNonNull(content, "content is null");
this.path = requireNonNull(path, "path is null");
this.format = requireNonNull(format, "format is null");
this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
this.equalityFieldIds = ImmutableList.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null"));
this.lowerBounds = requireNonNull(lowerBounds, "lowerBounds is null");
this.upperBounds = requireNonNull(upperBounds, "upperBounds is null");
}

@JsonProperty
public FileContent content()
{
return content;
}

@JsonProperty
public String path()
{
return path;
}

@JsonProperty
public FileFormat format()
{
return format;
}

@JsonProperty
public long recordCount()
{
return recordCount;
}

@JsonProperty
public long fileSizeInBytes()
{
return fileSizeInBytes;
}

@JsonProperty
public List<Integer> equalityFieldIds()
{
return equalityFieldIds;
}

@JsonProperty
public Map<Integer, ByteBuffer> getLowerBounds()
{
return lowerBounds;
}

@JsonProperty
public Map<Integer, ByteBuffer> getUpperBounds()
{
return upperBounds;
}
}

0 comments on commit 80d4755

Please sign in to comment.