Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Add Variant implementation to read serialized objects #11415

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
17 changes: 17 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@

public interface CloseableIterable<T> extends Iterable<T>, Closeable {

/**
 * Wraps an {@link Iterable} as a {@link CloseableIterable}.
 *
 * <p>An input that is already a CloseableIterable is returned as-is. An input that implements
 * {@link Closeable} is passed to {@code combine} together with itself as the Closeable. Any other
 * input is passed to {@code withNoopClose}.
 *
 * @param iterable an Iterable
 * @return a CloseableIterable that closes Iterable if it is Closeable
 */
static <E> CloseableIterable<E> of(Iterable<E> iterable) {
  // already the right type: no wrapping needed
  if (iterable instanceof CloseableIterable) {
    return (CloseableIterable<E>) iterable;
  }

  // nothing to close: attach a no-op close
  if (!(iterable instanceof Closeable)) {
    return withNoopClose(iterable);
  }

  // the iterable is its own Closeable
  return combine(iterable, (Closeable) iterable);
}

/**
* Returns a closeable iterator over elements of type {@code T}.
*
Expand Down
9 changes: 8 additions & 1 deletion api/src/test/java/org/apache/iceberg/util/RandomUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public class RandomUtil {

private RandomUtil() {}

/**
 * Public entry point that delegates to the private {@code randomString(int, Random)} helper.
 *
 * @param length length passed through to the helper
 * @param random source of randomness passed through to the helper
 * @return the string produced by the helper
 */
public static String generateString(int length, Random random) {
  String generated = randomString(length, random);
  return generated;
}

/**
 * Returns true when {@code num % 2} is exactly 1.
 *
 * <p>Java's remainder keeps the sign of the dividend, so this is true only for positive odd
 * values; it is false for zero, even values, and every negative value.
 */
private static boolean negate(int num) {
  int remainder = num % 2;
  return remainder == 1;
}
Expand Down Expand Up @@ -200,7 +204,10 @@ public static Object generateDictionaryEncodablePrimitive(
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.!?";

/**
 * Generates a random string whose length is drawn from {@code random.nextInt(50)}, i.e. in the
 * range [0, 50).
 *
 * <p>The length is drawn exactly once so that the random sequence is not advanced by a discarded
 * value.
 *
 * @param random source of randomness for both the length and the content
 * @return the string produced by {@code randomString(int, Random)}
 */
private static String randomString(Random random) {
  return randomString(random.nextInt(50), random);
}

private static String randomString(int length, Random random) {
byte[] buffer = new byte[length];

for (int i = 0; i < length; i += 1) {
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SortedMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
Expand All @@ -30,6 +31,7 @@
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* An Iterable that merges the items from other Iterables in order.
Expand All @@ -39,6 +41,17 @@
* @param <T> the type of objects produced by this Iterable
*/
public class SortedMerge<T> extends CloseableGroup implements CloseableIterable<T> {
/**
 * Merges two iterables of naturally ordered items.
 *
 * <p>Convenience overload that places both inputs in a list and delegates to {@code of(List)}.
 *
 * @param left the first iterable
 * @param right the second iterable
 * @return the CloseableIterable returned by {@code of(List)}
 */
public static <C extends Comparable<C>> CloseableIterable<C> of(
    Iterable<C> left, Iterable<C> right) {
  List<Iterable<C>> pair = Arrays.asList(left, right);
  return of(pair);
}

/**
 * Merges iterables of naturally ordered items.
 *
 * <p>Each input is adapted with {@code CloseableIterable.of} and the result is handed to a new
 * {@code SortedMerge} that compares items using their natural order.
 *
 * @param iterables the iterables to merge
 * @return a SortedMerge over the adapted iterables
 */
public static <C extends Comparable<C>> CloseableIterable<C> of(List<Iterable<C>> iterables) {
  Comparator<C> naturalOrder = Comparator.naturalOrder();
  // Lists.transform returns a view; the adapter is applied when elements are accessed
  List<CloseableIterable<C>> adapted =
      Lists.transform(iterables, iterable -> CloseableIterable.of(iterable));
  return new SortedMerge<>(naturalOrder, adapted);
}

private final Comparator<T> comparator;
private final List<CloseableIterable<T>> iterables;

Expand Down
206 changes: 206 additions & 0 deletions core/src/main/java/org/apache/iceberg/variants/PrimitiveWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.variants;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.variants.Variants.Primitives;

class PrimitiveWrapper<T> implements VariantPrimitive<T> {
// Header bytes, one per primitive type id, each computed once via VariantUtil.primitiveHeader.
// writeTo puts the matching header at the start of every serialized value.
private static final byte NULL_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_NULL);
private static final byte TRUE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_TRUE);
private static final byte FALSE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_FALSE);
private static final byte INT8_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT8);
private static final byte INT16_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT16);
private static final byte INT32_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT32);
private static final byte INT64_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT64);
private static final byte FLOAT_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_FLOAT);
private static final byte DOUBLE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DOUBLE);
private static final byte DATE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DATE);
private static final byte TIMESTAMPTZ_HEADER =
VariantUtil.primitiveHeader(Primitives.TYPE_TIMESTAMPTZ);
private static final byte TIMESTAMPNTZ_HEADER =
VariantUtil.primitiveHeader(Primitives.TYPE_TIMESTAMPNTZ);
private static final byte DECIMAL4_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DECIMAL4);
private static final byte DECIMAL8_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DECIMAL8);
private static final byte DECIMAL16_HEADER =
VariantUtil.primitiveHeader(Primitives.TYPE_DECIMAL16);
private static final byte BINARY_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_BINARY);
private static final byte STRING_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_STRING);

// physical type of the wrapped value; selects the branch taken in sizeInBytes and writeTo
private final Variants.PhysicalType type;
// the wrapped value; it is cast to the Java class that matches the physical type when serialized
private final T value;
// UTF-8 bytes of a STRING value, created on first use by sizeInBytes or writeTo; otherwise null
private ByteBuffer buffer = null;

/**
 * Creates a wrapper for a primitive value.
 *
 * @param type physical type used to pick the serialized form of the value
 * @param value the value to wrap; stored by reference, not copied
 */
PrimitiveWrapper(Variants.PhysicalType type, T value) {
  this.value = value;
  this.type = type;
}

/** Returns the physical type this wrapper was created with. */
@Override
public Variants.PhysicalType type() {
  return this.type;
}

/** Returns the wrapped value exactly as it was passed to the constructor. */
@Override
public T get() {
  return this.value;
}

@Override
public int sizeInBytes() {
switch (type()) {
case NULL:
case BOOLEAN_TRUE:
case BOOLEAN_FALSE:
return 1; // 1 header only
case INT8:
return 2; // 1 header + 1 value
case INT16:
return 3; // 1 header + 2 value
case INT32:
case DATE:
case FLOAT:
return 5; // 1 header + 4 value
case INT64:
case DOUBLE:
case TIMESTAMPTZ:
case TIMESTAMPNTZ:
return 9; // 1 header + 8 value
case DECIMAL4:
return 6; // 1 header + 1 scale + 4 unscaled value
case DECIMAL8:
return 10; // 1 header + 1 scale + 8 unscaled value
case DECIMAL16:
return 18; // 1 header + 1 scale + 16 unscaled value
case BINARY:
return 5 + ((ByteBuffer) value).remaining(); // 1 header + 4 length + value length
Comment on lines +95 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not very sure about the remaining() method, but as I read the docs, would this be inaccurate if part of the buffer has already been read?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses the ByteBuffer to pass a section of the backing array. If the position or limit of that buffer were changed, it would change the value. You're right that reading part of the buffer could potentially change the position and have that effect, but this is still a valid use of ByteBuffer. We use the position and limit to track a section of the storage, not for state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say we have a binary PrimitiveWrapper, b = new PrimitiveWrapper(binary, 0x1234). Would we expect b.sizeInBytes() to always be 5 + 2, with the method contract guaranteeing that it does not change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow what you're suggesting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is what I mean: if I wrap a TRUE value in PrimitiveWrapper, then the size is always 1 (that makes sense even if the caller reads the value through PrimitiveWrapper.get()). But if I wrap a binary, the size would change if the value is read through PrimitiveWrapper.get().

I may not get "We use the position and limit to track a section of the storage, not for state.". I assume that PrimitiveWrapper accepts a value, then the size would never change?

Copy link
Contributor

@aihuaxu aihuaxu Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is what I mean with the following test:

primitive input: Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d}))

  @ParameterizedTest
  @FieldSource("PRIMITIVES")
  public void testPrimitiveValueSerialization(VariantPrimitive<?> primitive) {
       ((ByteBuffer)primitive.get()).get();   -- add this line then 

    // write the value to the middle of a large buffer
    int size = primitive.sizeInBytes();
    ByteBuffer buffer = ByteBuffer.allocate(size + 1000).order(ByteOrder.LITTLE_ENDIAN);
    primitive.writeTo(buffer, 300);
...
}

We would still expect to write the whole ByteBuffer of 9 bytes while now we are writing 8 bytes. That is not expected, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aihuaxu, if you modify the buffer then it will change the value. What you're suggesting is equivalent to other modifications, like this:

byte[] arr = new byte[] { 0x0A, 0x0B, 0x0C, 0x0D };
VariantPrimitive<ByteBuffer> primitive = Variants.of(ByteBuffer.wrap(arr));
// modify the primitive value
arr[0] = 0;

...

If you modify the value then it is going to be different. We could duplicate the incoming buffer to avoid the case where position is modified, but not the case where the backing byte array is modified. That would be okay, but I don't see a lot of value in worrying about that case. If you pass a buffer into these classes, you should not later modify that buffer.

case STRING:
if (null == buffer) {
this.buffer = ByteBuffer.wrap(((String) value).getBytes(StandardCharsets.UTF_8));
}

return 5 + buffer.remaining(); // 1 header + 4 length + value length
}

throw new UnsupportedOperationException("Unsupported primitive type: " + type());
}

/**
 * Serializes this primitive into {@code outBuffer} starting at {@code offset}.
 *
 * <p>The value is written as a one-byte header followed by the payload for its type: nothing for
 * null and booleans, the fixed-width value for numeric, date and timestamp types, a one-byte
 * scale plus the unscaled value for decimals, and a four-byte length plus the bytes for binary
 * and string values. Headers and fixed-width values are written with absolute (index-based)
 * puts.
 *
 * @param outBuffer destination buffer; must use little-endian byte order
 * @param offset index in {@code outBuffer} at which the header byte is written
 * @return the number of bytes written
 * @throws IllegalArgumentException if {@code outBuffer} is not little endian, or if a DECIMAL16
 *     unscaled value needs more than 16 bytes
 * @throws ArithmeticException if a DECIMAL4 or DECIMAL8 unscaled value does not fit in an int or
 *     a long, respectively
 * @throws UnsupportedOperationException if the physical type is not handled
 */
@Override
public int writeTo(ByteBuffer outBuffer, int offset) {
  Preconditions.checkArgument(
      outBuffer.order() == ByteOrder.LITTLE_ENDIAN, "Invalid byte order: big endian");
  switch (type()) {
    case NULL:
      outBuffer.put(offset, NULL_HEADER);
      return 1;
    case BOOLEAN_TRUE:
      outBuffer.put(offset, TRUE_HEADER);
      return 1;
    case BOOLEAN_FALSE:
      outBuffer.put(offset, FALSE_HEADER);
      return 1;
    case INT8:
      outBuffer.put(offset, INT8_HEADER);
      outBuffer.put(offset + 1, (Byte) value);
      return 2;
    case INT16:
      outBuffer.put(offset, INT16_HEADER);
      outBuffer.putShort(offset + 1, (Short) value);
      return 3;
    case INT32:
      outBuffer.put(offset, INT32_HEADER);
      outBuffer.putInt(offset + 1, (Integer) value);
      return 5;
    case INT64:
      outBuffer.put(offset, INT64_HEADER);
      outBuffer.putLong(offset + 1, (Long) value);
      return 9;
    case FLOAT:
      outBuffer.put(offset, FLOAT_HEADER);
      outBuffer.putFloat(offset + 1, (Float) value);
      return 5;
    case DOUBLE:
      outBuffer.put(offset, DOUBLE_HEADER);
      outBuffer.putDouble(offset + 1, (Double) value);
      return 9;
    case DATE:
      outBuffer.put(offset, DATE_HEADER);
      outBuffer.putInt(offset + 1, (Integer) value);
      return 5;
    case TIMESTAMPTZ:
      outBuffer.put(offset, TIMESTAMPTZ_HEADER);
      outBuffer.putLong(offset + 1, (Long) value);
      return 9;
    case TIMESTAMPNTZ:
      outBuffer.put(offset, TIMESTAMPNTZ_HEADER);
      outBuffer.putLong(offset + 1, (Long) value);
      return 9;
    case DECIMAL4:
      BigDecimal decimal4 = (BigDecimal) value;
      outBuffer.put(offset, DECIMAL4_HEADER);
      outBuffer.put(offset + 1, (byte) decimal4.scale());
      // intValueExact throws instead of silently truncating an out-of-range unscaled value
      outBuffer.putInt(offset + 2, decimal4.unscaledValue().intValueExact());
      return 6;
    case DECIMAL8:
      BigDecimal decimal8 = (BigDecimal) value;
      outBuffer.put(offset, DECIMAL8_HEADER);
      outBuffer.put(offset + 1, (byte) decimal8.scale());
      // longValueExact throws instead of silently truncating an out-of-range unscaled value
      outBuffer.putLong(offset + 2, decimal8.unscaledValue().longValueExact());
      return 10;
    case DECIMAL16:
      BigDecimal decimal16 = (BigDecimal) value;
      byte padding = (byte) (decimal16.signum() < 0 ? 0xFF : 0x00);
      // minimal big-endian two's-complement representation, including the sign bit
      byte[] bytes = decimal16.unscaledValue().toByteArray();
      // fail before writing anything rather than dropping the high-order bytes, to match the
      // exact conversions used for DECIMAL4 and DECIMAL8
      Preconditions.checkArgument(
          bytes.length <= 16,
          "Invalid decimal16 value, unscaled value exceeds 16 bytes: %s",
          decimal16);
      outBuffer.put(offset, DECIMAL16_HEADER);
      outBuffer.put(offset + 1, (byte) decimal16.scale());
      for (int i = 0; i < 16; i += 1) {
        if (i < bytes.length) {
          // copy the big endian value and convert to little endian
          outBuffer.put(offset + 2 + i, bytes[bytes.length - i - 1]);
        } else {
          // pad with 0x00 or 0xFF depending on the sign
          outBuffer.put(offset + 2 + i, padding);
        }
      }
      return 18;
    case BINARY:
      ByteBuffer binary = (ByteBuffer) value;
      outBuffer.put(offset, BINARY_HEADER);
      // the buffer's remaining bytes (position to limit) are the binary value
      outBuffer.putInt(offset + 1, binary.remaining());
      VariantUtil.writeBufferAbsolute(outBuffer, offset + 5, binary);
      return 5 + binary.remaining();
    case STRING:
      // TODO: use short string when possible
      if (null == buffer) {
        // encode once and keep the bytes for later calls
        this.buffer = ByteBuffer.wrap(((String) value).getBytes(StandardCharsets.UTF_8));
      }

      outBuffer.put(offset, STRING_HEADER);
      outBuffer.putInt(offset + 1, buffer.remaining());
      VariantUtil.writeBufferAbsolute(outBuffer, offset + 5, buffer);
      return 5 + buffer.remaining();
  }

  throw new UnsupportedOperationException("Unsupported primitive type: " + type());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.variants;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * A {@link VariantArray} that reads its elements from a serialized buffer.
 *
 * <p>Elements are decoded on first access and kept in an array so later calls to {@link
 * #get(int)} with the same index return the same instance.
 */
class SerializedArray extends Variants.SerializedValue implements VariantArray {
  // header bits 2-3: width of each entry in the offset list, stored as (width - 1)
  private static final int OFFSET_SIZE_MASK = 0b1100;
  private static final int OFFSET_SIZE_SHIFT = 2;
  // header bit 4: when set, the element count is stored in 4 bytes instead of 1
  private static final int IS_LARGE = 0b10000;

  /** Test helper that wraps {@code bytes} as a little-endian buffer and uses byte 0 as header. */
  @VisibleForTesting
  static SerializedArray from(SerializedMetadata metadata, byte[] bytes) {
    ByteBuffer littleEndian = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    return from(metadata, littleEndian, bytes[0]);
  }

  /**
   * Creates an array over a serialized value after validating the byte order and basic type.
   *
   * @param metadata metadata passed on when elements are decoded
   * @param value little-endian buffer holding the serialized array
   * @param header header of the serialized value
   * @return a new SerializedArray
   * @throws IllegalArgumentException if the buffer is not little endian or the header's basic
   *     type is not ARRAY
   */
  static SerializedArray from(SerializedMetadata metadata, ByteBuffer value, int header) {
    boolean littleEndian = value.order() == ByteOrder.LITTLE_ENDIAN;
    Preconditions.checkArgument(littleEndian, "Unsupported byte order: big endian");

    Variants.BasicType basicType = VariantUtil.basicType(header);
    boolean isArray = basicType == Variants.BasicType.ARRAY;
    Preconditions.checkArgument(isArray, "Invalid array, basic type: " + basicType);

    return new SerializedArray(metadata, value, header);
  }

  private final SerializedMetadata metadata;
  private final ByteBuffer value;
  private final int offsetSize;
  private final int offsetListOffset;
  private final int dataOffset;
  // decoded elements by index; null until the element is first requested
  private final VariantValue[] array;

  private SerializedArray(SerializedMetadata metadata, ByteBuffer value, int header) {
    this.metadata = metadata;
    this.value = value;

    int encodedOffsetSize = (header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT;
    this.offsetSize = 1 + encodedOffsetSize;

    // IS_LARGE is a single bit, so a non-zero result means the bit is set
    boolean isLarge = (header & IS_LARGE) != 0;
    int countWidth = isLarge ? 4 : 1;
    int count = VariantUtil.readLittleEndianUnsigned(value, Variants.HEADER_SIZE, countWidth);

    // the offset list follows the element count and has one more entry than there are elements
    this.offsetListOffset = Variants.HEADER_SIZE + countWidth;
    this.dataOffset = offsetListOffset + ((1 + count) * offsetSize);
    this.array = new VariantValue[count];
  }

  @VisibleForTesting
  int numElements() {
    return array.length;
  }

  /**
   * Returns the element at {@code index}, decoding it from the buffer on first access.
   *
   * @param index position of the element in the array
   * @return the element value
   */
  @Override
  public VariantValue get(int index) {
    VariantValue cached = array[index];
    if (cached != null) {
      return cached;
    }

    // consecutive entries in the offset list give the element's start and end in the data
    int entryPosition = offsetListOffset + (offsetSize * index);
    int start = VariantUtil.readLittleEndianUnsigned(value, entryPosition, offsetSize);
    int end =
        VariantUtil.readLittleEndianUnsigned(value, entryPosition + offsetSize, offsetSize);

    ByteBuffer elementBytes = VariantUtil.slice(value, dataOffset + start, end - start);
    VariantValue element = Variants.from(metadata, elementBytes);
    array[index] = element;
    return element;
  }

  /** Returns the buffer this array was created from. */
  @Override
  public ByteBuffer buffer() {
    return value;
  }
}
Loading