Skip to content

Commit

Permalink
[Kernel] Remove the ColumnVector::getStruct API (delta-io#2131)
Browse files Browse the repository at this point in the history
## Description

Removes the `getStruct` API from `ColumnVector`. We will use a wrapper to convert to rows only for the ColumnarBatch/FilteredColumnarBatch row-based processing APIs.

## How was this patch tested?

Existing tests should suffice.
  • Loading branch information
allisonport-db authored and xupefei committed Oct 31, 2023
1 parent 75d609e commit a421ec5
Show file tree
Hide file tree
Showing 24 changed files with 450 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,6 @@ default MapValue getMap(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Return the row value located at {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*
* @param rowId
* @return
*/
default Row getStruct(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Return the array value located at {@code rowId}. Returns null if the slot for {@code rowId}
* is null
Expand All @@ -191,9 +180,9 @@ default ArrayValue getArray(int rowId) {
* {@code struct} type columns.
*
* @param ordinal Ordinal of the child vector to return.
* @return
*/
default ColumnVector getChild(int ordinal) {
throw new UnsupportedOperationException("Child vectors are not available.");
throw new UnsupportedOperationException(
"Child vectors are not available for vector of type " + getDataType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.Row;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import static io.delta.kernel.utils.Utils.requireNonNull;

import io.delta.kernel.internal.deletionvectors.Base85Codec;
import io.delta.kernel.internal.fs.Path;
import static io.delta.kernel.internal.util.InternalUtils.checkArgument;
import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;

/**
* Information about a deletion vector attached to a file action.
Expand All @@ -57,6 +57,25 @@ public static DeletionVectorDescriptor fromRow(Row row) {
sizeInBytes, cardinality);
}

public static DeletionVectorDescriptor fromColumnVector(ColumnVector vector, int rowId) {
if (vector.isNullAt(rowId)) {
return null;
}

final String storageType = requireNonNull(vector.getChild(0), rowId, "storageType")
.getString(rowId);
final String pathOrInlineDv = requireNonNull(vector.getChild(1), rowId, "pathOrInlineDv")
.getString(rowId);
final Optional<Integer> offset = Optional.ofNullable(
vector.getChild(2).isNullAt(rowId) ? null : vector.getChild(2).getInt(rowId));
final int sizeInBytes = requireNonNull(vector.getChild(3), rowId, "sizeInBytes")
.getInt(rowId);
final long cardinality = requireNonNull(vector.getChild(4), rowId, "cardinality")
.getLong(rowId);
return new DeletionVectorDescriptor(storageType, pathOrInlineDv, offset,
sizeInBytes, cardinality);
}

// Markers to separate different kinds of DV storage.
public static final String PATH_DV_MARKER = "p";
public static final String INLINE_DV_MARKER = "i";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,27 @@
*/
package io.delta.kernel.internal.actions;

import io.delta.kernel.data.Row;
import java.util.Collections;
import java.util.Map;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import static io.delta.kernel.utils.Utils.requireNonNull;
import io.delta.kernel.utils.VectorUtils;
import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;

public class Format {
public static Format fromRow(Row row) {
if (row == null) {

public static Format fromColumnVector(ColumnVector vector, int rowId) {
if (vector.isNullAt(rowId)) {
return null;
}

final String provider = requireNonNull(row, 0, "provider").getString(0);
return new Format(provider);
final String provider = requireNonNull(vector.getChild(0), rowId, "provider")
.getString(rowId);
final Map<String, String> options = vector.getChild(1).isNullAt(rowId) ?
Collections.emptyMap() : VectorUtils.toJavaMap(vector.getChild(1).getMap(rowId));
return new Format(provider, options);
}

public static final StructType READ_SCHEMA = new StructType()
Expand All @@ -39,8 +46,18 @@ public static Format fromRow(Row row) {
);

private final String provider;
private final Map<String, String> options;

public Format(String provider) {
public Format(String provider, Map<String, String> options) {
this.provider = provider;
this.options = options;
}

public String getProvider() {
return provider;
}

public Map<String, String> getOptions() {
return Collections.unmodifiableMap(options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,43 @@

import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.VectorUtils;
import static io.delta.kernel.utils.Utils.requireNonNull;

import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.types.TableSchemaSerDe;
import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;

public class Metadata {
public static Metadata fromRow(Row row, TableClient tableClient) {
if (row == null) {

public static Metadata fromColumnVector(
ColumnVector vector, int rowId, TableClient tableClient) {
if (vector.isNullAt(rowId)) {
return null;
}

final String schemaJson = requireNonNull(row, 4, "schemaString").getString(4);
final String schemaJson = requireNonNull(vector.getChild(4), rowId, "schemaString")
.getString(rowId);
StructType schema = TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), schemaJson);

return new Metadata(
requireNonNull(row, 0, "id").getString(0),
Optional.ofNullable(row.isNullAt(1) ? null : row.getString(1)),
Optional.ofNullable(row.isNullAt(2) ? null : row.getString(2)),
Format.fromRow(requireNonNull(row, 0, "id").getStruct(3)),
requireNonNull(vector.getChild(0), rowId, "id").getString(rowId),
Optional.ofNullable(vector.getChild(1).isNullAt(rowId) ? null :
vector.getChild(1).getString(rowId)),
Optional.ofNullable(vector.getChild(2).isNullAt(rowId) ? null :
vector.getChild(2).getString(rowId)),
Format.fromColumnVector(requireNonNull(vector.getChild(3), rowId, "format"), rowId),
schemaJson,
schema,
row.getArray(5),
Optional.ofNullable(row.isNullAt(6) ? null : row.getLong(6)),
row.getMap(7)
vector.getChild(5).getArray(rowId),
Optional.ofNullable(vector.getChild(6).isNullAt(rowId) ? null :
vector.getChild(6).getLong(rowId)),
vector.getChild(7).getMap(rowId)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,28 @@
import java.util.Collections;
import java.util.List;

import io.delta.kernel.data.Row;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.VectorUtils;

public class Protocol {
public static Protocol fromRow(Row row) {
if (row == null) {

public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
if (vector.isNullAt(rowId)) {
return null;
}

return new Protocol(
row.getInt(0),
row.getInt(1),
row.isNullAt(2) ? Collections.emptyList() :
VectorUtils.toJavaList(row.getArray(2)),
row.isNullAt(3) ? Collections.emptyList() :
VectorUtils.toJavaList(row.getArray(3)));
vector.getChild(0).getInt(rowId),
vector.getChild(1).getInt(rowId),
vector.getChild(2).isNullAt(rowId) ? Collections.emptyList() :
VectorUtils.toJavaList(vector.getChild(2).getArray(rowId)),
vector.getChild(3).isNullAt(rowId) ? Collections.emptyList() :
VectorUtils.toJavaList(vector.getChild(3).getArray(rowId))
);
}

public static final StructType READ_SCHEMA = new StructType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.data;

import java.math.BigDecimal;

import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.types.StructType;

/**
* A {@link Row} implementation that wraps a set of child vectors for a specific {@code rowId}.
*/
public abstract class ChildVectorBasedRow implements Row {

private final int rowId;
private final StructType schema;

public ChildVectorBasedRow(int rowId, StructType schema) {
this.rowId = rowId;
this.schema = schema;
}

@Override
public StructType getSchema() {
return schema;
}

@Override
public boolean isNullAt(int ordinal) {
return getChild(ordinal).isNullAt(rowId);
}

@Override
public boolean getBoolean(int ordinal) {
return getChild(ordinal).getBoolean(rowId);
}

@Override
public byte getByte(int ordinal) {
return getChild(ordinal).getByte(rowId);
}

@Override
public short getShort(int ordinal) {
return getChild(ordinal).getShort(rowId);
}

@Override
public int getInt(int ordinal) {
return getChild(ordinal).getInt(rowId);
}

@Override
public long getLong(int ordinal) {
return getChild(ordinal).getLong(rowId);
}

@Override
public float getFloat(int ordinal) {
return getChild(ordinal).getFloat(rowId);
}

@Override
public double getDouble(int ordinal) {
return getChild(ordinal).getDouble(rowId);
}

@Override
public String getString(int ordinal) {
return getChild(ordinal).getString(rowId);
}

@Override
public BigDecimal getDecimal(int ordinal) {
return getChild(ordinal).getDecimal(rowId);
}

@Override
public byte[] getBinary(int ordinal) {
return getChild(ordinal).getBinary(rowId);
}

@Override
public Row getStruct(int ordinal) {
return StructRow.fromStructVector(getChild(ordinal), rowId);
}

@Override
public ArrayValue getArray(int ordinal) {
return getChild(ordinal).getArray(rowId);
}

@Override
public MapValue getMap(int ordinal) {
return getChild(ordinal).getMap(rowId);
}

protected abstract ColumnVector getChild(int ordinal);
}
Loading

0 comments on commit a421ec5

Please sign in to comment.