Core: Add Variant implementation to read serialized objects #11415

Open

wants to merge 12 commits into main
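
The three new classes read the serialized Variant encoding lazily: VariantMetadata exposes the string dictionary, while VariantObject and VariantArray resolve fields and elements on demand and cache the decoded values. A rough composition sketch (not part of the diff; the caller class, the input byte arrays, and the field name "id" are placeholders, and it has to sit in org.apache.iceberg because the readers are package-private):

package org.apache.iceberg;

// Hypothetical caller, only to show how the new readers compose.
class VariantReadSketch {
  static void read(byte[] metadataBytes, byte[] objectBytes, byte[] arrayBytes) {
    VariantMetadata metadata = VariantMetadata.from(metadataBytes);

    VariantObject object = VariantObject.from(metadata, objectBytes);
    Variants.Value id = object.get("id"); // null if the object has no "id" field

    VariantArray array = VariantArray.from(metadata, arrayBytes);
    Variants.Value first = array.get(0); // decoded on first access, then cached
  }
}
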
9 changes: 8 additions & 1 deletion api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -29,6 +29,10 @@ public class RandomUtil {

private RandomUtil() {}

public static String generateString(int length, Random random) {
return randomString(length, random);
}

private static boolean negate(int num) {
return num % 2 == 1;
}
@@ -200,7 +204,10 @@ public static Object generateDictionaryEncodablePrimitive(
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.!?";

private static String randomString(Random random) {
- int length = random.nextInt(50);
+ return randomString(random.nextInt(50), random);
}

private static String randomString(int length, Random random) {
byte[] buffer = new byte[length];

for (int i = 0; i < length; i += 1) {
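
The new public generateString(int, Random) overload simply delegates to the existing randomString character loop (truncated above), so callers, presumably the new Variant tests, can request a random string of a chosen length. A small usage sketch (the seed is arbitrary):

Random random = new Random(1);
String fieldName = RandomUtil.generateString(16, random); // 16 chars drawn from the alphabet above
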
91 changes: 91 additions & 0 deletions core/src/main/java/org/apache/iceberg/VariantArray.java
@@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class VariantArray implements Variants.Array, Variants.Serialized {
private static final int OFFSET_SIZE_MASK = 0b1100;
private static final int OFFSET_SIZE_SHIFT = 2;
private static final int IS_LARGE = 0b10000;

@VisibleForTesting
static VariantArray from(VariantMetadata metadata, byte[] bytes) {
return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
}

static VariantArray from(VariantMetadata metadata, ByteBuffer value, int header) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
int basicType = header & Variants.BASIC_TYPE_MASK;
Preconditions.checkArgument(
basicType == Variants.BASIC_TYPE_ARRAY, "Invalid array, basic type != 3: %s", basicType);
return new VariantArray(metadata, value, header);
}

private final VariantMetadata metadata;
private final ByteBuffer value;
private final int offsetSize;
private final int offsetListOffset;
private final int dataOffset;
private final Variants.Value[] array;

private VariantArray(VariantMetadata metadata, ByteBuffer value, int header) {
this.metadata = metadata;
this.value = value;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
int numElements =
VariantUtil.readLittleEndianUnsigned(value, Variants.HEADER_SIZE, numElementsSize);
this.offsetListOffset = Variants.HEADER_SIZE + numElementsSize;
this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
this.array = new Variants.Value[numElements];
}

@VisibleForTesting
int numElements() {
return array.length;
}

@Override
public Variants.Value get(int index) {
if (null == array[index]) {
int offset =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + (offsetSize * index), offsetSize);
int next =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
array[index] =
Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
}
return array[index];
}

@Override
public ByteBuffer buffer() {
return value;
}
}
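
For reference, a standalone sketch (not part of the PR) of how the constructor above unpacks the array header byte; it assumes the basic type occupies the low two bits, as the "basic type != 3" check suggests.

public class ArrayHeaderDemo {
  public static void main(String[] args) {
    int header = 0b0000_0111; // example array header byte

    int basicType = header & 0b11;                 // 3 = array (assumed low-bit mask)
    int offsetSize = 1 + ((header & 0b1100) >> 2); // 2: element offsets use 2 bytes
    boolean isLarge = (header & 0b10000) != 0;     // false: element count uses 1 byte

    System.out.printf("basicType=%d offsetSize=%d isLarge=%b%n", basicType, offsetSize, isLarge);
  }
}
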
117 changes: 117 additions & 0 deletions core/src/main/java/org/apache/iceberg/VariantMetadata.java
@@ -0,0 +1,117 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class VariantMetadata implements Variants.Metadata, Variants.Serialized {
private static final int SUPPORTED_VERSION = 1;
private static final int VERSION_MASK = 0b1111;
private static final int SORTED_STRINGS = 0b10000;
private static final int RESERVED = 0b100000;
private static final int OFFSET_SIZE_MASK = 0b11000000;
private static final int OFFSET_SIZE_SHIFT = 6;

static final ByteBuffer EMPTY_V1_BUFFER =
ByteBuffer.wrap(new byte[] {0x01, 0x00}).order(ByteOrder.LITTLE_ENDIAN);

static VariantMetadata from(byte[] bytes) {
return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN));
}

static VariantMetadata from(ByteBuffer metadata) {
Preconditions.checkArgument(
metadata.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
int header = VariantUtil.readByte(metadata, 0);
int version = header & VERSION_MASK;
Preconditions.checkArgument(SUPPORTED_VERSION == version, "Unsupported version: %s", version);
return new VariantMetadata(metadata, header);
}

private final ByteBuffer metadata;
private final boolean isSorted;
private final int offsetSize;
private final int offsetListOffset;
private final int dataOffset;
private final String[] dict;

private VariantMetadata(ByteBuffer metadata, int header) {
this.metadata = metadata;
this.isSorted = (header & SORTED_STRINGS) == SORTED_STRINGS;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
int dictSize = VariantUtil.readLittleEndianUnsigned(metadata, Variants.HEADER_SIZE, offsetSize);
this.dict = new String[dictSize];
this.offsetListOffset = Variants.HEADER_SIZE + offsetSize;
this.dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize);
}

@VisibleForTesting
int dictionarySize() {
return dict.length;
}

@VisibleForTesting
boolean isSorted() {
return isSorted;
}

/** Returns the position of the string in the metadata, or -1 if the string is not found. */
@Override
public int id(String name) {
if (name != null) {
if (isSorted) {
return VariantUtil.find(dict.length, name, this::get);
} else {
for (int id = 0; id < dict.length; id += 1) {
if (name.equals(get(id))) {
return id;
}
}
}
}

return -1;
}

/** Returns the string for the given dictionary id. */
@Override
public String get(int index) {
if (null == dict[index]) {
int offset =
VariantUtil.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * index), offsetSize);
int next =
VariantUtil.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
dict[index] = VariantUtil.readString(metadata, dataOffset + offset, next - offset);
}
return dict[index];
}

@Override
public ByteBuffer buffer() {
return metadata;
}
}
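
To make the layout concrete, here is a standalone sketch (not from the PR) that decodes a hand-built metadata buffer the same way the constructor above does; it assumes Variants.HEADER_SIZE is 1. The buffer is: header 0x11 (version 1, sorted strings, 1-byte offsets), dictionary size 1, offsets [0, 3], then the UTF-8 bytes of "key".

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class MetadataLayoutDemo {
  public static void main(String[] args) {
    byte[] bytes = {0x11, 0x01, 0x00, 0x03, 'k', 'e', 'y'};
    ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);

    int header = buf.get(0) & 0xFF;
    int version = header & 0b1111;                      // 1
    boolean sorted = (header & 0b10000) != 0;           // true
    int offsetSize = 1 + ((header & 0b11000000) >> 6);  // 1 byte per offset

    int dictSize = buf.get(1) & 0xFF;                   // 1 dictionary string
    int dataOffset = 2 + (dictSize + 1) * offsetSize;   // strings start at byte 4
    int start = buf.get(2) & 0xFF;                      // 0
    int end = buf.get(3) & 0xFF;                        // 3
    String name = new String(bytes, dataOffset + start, end - start, StandardCharsets.UTF_8);

    System.out.println(version + " " + sorted + " " + offsetSize + " " + name); // 1 true 1 key
  }
}
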
114 changes: 114 additions & 0 deletions core/src/main/java/org/apache/iceberg/VariantObject.java
@@ -0,0 +1,114 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class VariantObject implements Variants.Object, Variants.Serialized {
private static final int OFFSET_SIZE_MASK = 0b1100;
private static final int OFFSET_SIZE_SHIFT = 2;
private static final int FIELD_ID_SIZE_MASK = 0b110000;
private static final int FIELD_ID_SIZE_SHIFT = 4;
private static final int IS_LARGE = 0b1000000;

static VariantObject from(VariantMetadata metadata, byte[] bytes) {
return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
}

static VariantObject from(VariantMetadata metadata, ByteBuffer value, int header) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
int basicType = header & Variants.BASIC_TYPE_MASK;
Preconditions.checkArgument(
basicType == Variants.BASIC_TYPE_OBJECT, "Invalid object, basic type != 2: %s", basicType);
return new VariantObject(metadata, value, header);
}

private final VariantMetadata metadata;
private final ByteBuffer value;
private final int fieldIdSize;
private final int fieldIdListOffset;
private final int[] fieldIds;
private final int offsetSize;
private final int offsetListOffset;
private final int dataOffset;
private final Variants.Value[] values;

private VariantObject(VariantMetadata metadata, ByteBuffer value, int header) {
this.metadata = metadata;
this.value = value;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
this.fieldIdSize = 1 + ((header & FIELD_ID_SIZE_MASK) >> FIELD_ID_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
int numElements =
VariantUtil.readLittleEndianUnsigned(value, Variants.HEADER_SIZE, numElementsSize);
this.fieldIdListOffset = Variants.HEADER_SIZE + numElementsSize;
this.fieldIds = new int[numElements];
this.offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize);
this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
this.values = new Variants.Value[numElements];
}

@VisibleForTesting
int numElements() {
return fieldIds.length;
}

// object keys are ordered lexicographically by field name, so get(name) can search by name
@Override
public Variants.Value get(String name) {
int index =
VariantUtil.find(
fieldIds.length,
name,
pos -> {
int id =
VariantUtil.readLittleEndianUnsigned(
value, fieldIdListOffset + (pos * fieldIdSize), fieldIdSize);
return metadata.get(id);
});

if (index < 0) {
return null;
}

if (null == values[index]) {
int offset =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + (index * offsetSize), offsetSize);
int next =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + ((1 + index) * offsetSize), offsetSize);
values[index] = Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
}

return values[index];
}

@Override
public ByteBuffer buffer() {
return value;
}
}
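
And a standalone sketch (not from the PR) of the section offsets the constructor above derives from the object header, assuming Variants.HEADER_SIZE is 1; the element count would normally be read from the value buffer, so it is hard-coded here.

public class ObjectLayoutDemo {
  public static void main(String[] args) {
    int header = 0b0000_0010; // basic type 2 = object; 1-byte ids, offsets, and count

    int offsetSize = 1 + ((header & 0b1100) >> 2);             // 1
    int fieldIdSize = 1 + ((header & 0b110000) >> 4);          // 1
    int numElementsSize = ((header & 0b1000000) != 0) ? 4 : 1; // 1 (IS_LARGE unset)
    int numElements = 2; // hard-coded; normally read right after the header

    int fieldIdListOffset = 1 + numElementsSize;                          // 2
    int offsetListOffset = fieldIdListOffset + numElements * fieldIdSize; // 4
    int dataOffset = offsetListOffset + (1 + numElements) * offsetSize;   // 7

    System.out.printf("fieldIds@%d offsets@%d data@%d%n", fieldIdListOffset, offsetListOffset, dataOffset);
  }
}
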