diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
index 06323612a178..34c561bc373d 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
@@ -32,6 +32,23 @@ public interface CloseableIterable<T> extends Iterable<T>, Closeable {
 
+  /**
+   * Adapts an Iterable to CloseableIterable using a no-op close if it is not Closeable.
+   *
+   * @param iterable an Iterable
+   * @return a CloseableIterable that closes Iterable if it is Closeable
+   */
+  static <E> CloseableIterable<E> of(Iterable<E> iterable) {
+    if (iterable instanceof CloseableIterable) {
+      return (CloseableIterable<E>) iterable;
+    } else if (iterable instanceof Closeable) {
+      Closeable asCloseable = (Closeable) iterable;
+      return combine(iterable, asCloseable);
+    } else {
+      return withNoopClose(iterable);
+    }
+  }
+
   /**
    * Returns a closeable iterator over elements of type {@code T}.
    *
diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
index 9131e6166133..bf281835a237 100644
--- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
+++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -29,6 +29,10 @@ public class RandomUtil {
 
   private RandomUtil() {}
 
+  public static String generateString(int length, Random random) {
+    return randomString(length, random);
+  }
+
   private static boolean negate(int num) {
     return num % 2 == 1;
   }
@@ -200,7 +204,10 @@ public static Object generateDictionaryEncodablePrimitive(
       "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.!?";
 
   private static String randomString(Random random) {
-    int length = random.nextInt(50);
+    return randomString(random.nextInt(50), random);
+  }
+
+  private static String randomString(int length, Random random) {
     byte[] buffer = new byte[length];
 
     for (int i = 0; i < length; i += 1) {
diff --git a/core/src/main/java/org/apache/iceberg/util/SortedMerge.java b/core/src/main/java/org/apache/iceberg/util/SortedMerge.java
index d93116852eb9..62bc89bae96f 100644
--- a/core/src/main/java/org/apache/iceberg/util/SortedMerge.java
+++ b/core/src/main/java/org/apache/iceberg/util/SortedMerge.java
@@ -21,6 +21,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -30,6 +31,7 @@
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 /**
  * An Iterable that merges the items from other Iterables in order.
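// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: how the two factories added in
// the hunks above and below fit together. CloseableIterable.of adapts a plain
// Iterable (here a List, so a no-op close is used) and SortedMerge.of merges
// pre-sorted inputs by natural order. The class and method names in the sketch
// are hypothetical.
// ---------------------------------------------------------------------------
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.SortedMerge;

class SortedMergeExample {
  static void printMerged() throws IOException {
    // CloseableIterable.of wraps a plain List with a no-op close
    CloseableIterable<Integer> left = CloseableIterable.of(List.of(1, 3, 5));
    CloseableIterable<Integer> right = CloseableIterable.of(List.of(2, 4, 6));

    // SortedMerge.of merges the two pre-sorted inputs by natural order
    try (CloseableIterable<Integer> merged = SortedMerge.of(left, right)) {
      merged.forEach(System.out::println); // prints 1, 2, 3, 4, 5, 6
    }
  }
}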
@@ -39,6 +41,17 @@
  * @param <T> the type of objects produced by this Iterable
  */
 public class SortedMerge<T> extends CloseableGroup implements CloseableIterable<T> {
+  public static <T extends Comparable<T>> CloseableIterable<T> of(
+      Iterable<T> left, Iterable<T> right) {
+    return of(Arrays.asList(left, right));
+  }
+
+  public static <T extends Comparable<T>> CloseableIterable<T> of(List<Iterable<T>> iterables) {
+    List<CloseableIterable<T>> closeableIterables =
+        Lists.transform(iterables, CloseableIterable::of);
+    return new SortedMerge<>(Comparator.naturalOrder(), closeableIterables);
+  }
+
   private final Comparator<T> comparator;
   private final List<CloseableIterable<T>> iterables;
diff --git a/core/src/main/java/org/apache/iceberg/variants/PrimitiveWrapper.java b/core/src/main/java/org/apache/iceberg/variants/PrimitiveWrapper.java
new file mode 100644
index 000000000000..96d6229cbd27
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/variants/PrimitiveWrapper.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.variants;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.variants.Variants.Primitives;
+
+class PrimitiveWrapper<T> implements VariantPrimitive<T> {
+  private static final byte NULL_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_NULL);
+  private static final byte TRUE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_TRUE);
+  private static final byte FALSE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_FALSE);
+  private static final byte INT8_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT8);
+  private static final byte INT16_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT16);
+  private static final byte INT32_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT32);
+  private static final byte INT64_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT64);
+  private static final byte FLOAT_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_FLOAT);
+  private static final byte DOUBLE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DOUBLE);
+  private static final byte DATE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DATE);
+  private static final byte TIMESTAMPTZ_HEADER =
+      VariantUtil.primitiveHeader(Primitives.TYPE_TIMESTAMPTZ);
+  private static final byte TIMESTAMPNTZ_HEADER =
+      VariantUtil.primitiveHeader(Primitives.TYPE_TIMESTAMPNTZ);
+  private static final byte DECIMAL4_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DECIMAL4);
+  private static final byte DECIMAL8_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DECIMAL8);
+  private static final byte DECIMAL16_HEADER =
+      VariantUtil.primitiveHeader(Primitives.TYPE_DECIMAL16);
+  private static final
byte BINARY_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_BINARY); + private static final byte STRING_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_STRING); + + private final Variants.PhysicalType type; + private final T value; + private ByteBuffer buffer = null; + + PrimitiveWrapper(Variants.PhysicalType type, T value) { + this.type = type; + this.value = value; + } + + @Override + public Variants.PhysicalType type() { + return type; + } + + @Override + public T get() { + return value; + } + + @Override + public int sizeInBytes() { + switch (type()) { + case NULL: + case BOOLEAN_TRUE: + case BOOLEAN_FALSE: + return 1; // 1 header only + case INT8: + return 2; // 1 header + 1 value + case INT16: + return 3; // 1 header + 2 value + case INT32: + case DATE: + case FLOAT: + return 5; // 1 header + 4 value + case INT64: + case DOUBLE: + case TIMESTAMPTZ: + case TIMESTAMPNTZ: + return 9; // 1 header + 8 value + case DECIMAL4: + return 6; // 1 header + 1 scale + 4 unscaled value + case DECIMAL8: + return 10; // 1 header + 1 scale + 8 unscaled value + case DECIMAL16: + return 18; // 1 header + 1 scale + 16 unscaled value + case BINARY: + return 5 + ((ByteBuffer) value).remaining(); // 1 header + 4 length + value length + case STRING: + if (null == buffer) { + this.buffer = ByteBuffer.wrap(((String) value).getBytes(StandardCharsets.UTF_8)); + } + + return 5 + buffer.remaining(); // 1 header + 4 length + value length + } + + throw new UnsupportedOperationException("Unsupported primitive type: " + type()); + } + + @Override + public int writeTo(ByteBuffer outBuffer, int offset) { + Preconditions.checkArgument( + outBuffer.order() == ByteOrder.LITTLE_ENDIAN, "Invalid byte order: big endian"); + switch (type()) { + case NULL: + outBuffer.put(offset, NULL_HEADER); + return 1; + case BOOLEAN_TRUE: + outBuffer.put(offset, TRUE_HEADER); + return 1; + case BOOLEAN_FALSE: + outBuffer.put(offset, FALSE_HEADER); + return 1; + case INT8: + outBuffer.put(offset, INT8_HEADER); + outBuffer.put(offset + 1, (Byte) value); + return 2; + case INT16: + outBuffer.put(offset, INT16_HEADER); + outBuffer.putShort(offset + 1, (Short) value); + return 3; + case INT32: + outBuffer.put(offset, INT32_HEADER); + outBuffer.putInt(offset + 1, (Integer) value); + return 5; + case INT64: + outBuffer.put(offset, INT64_HEADER); + outBuffer.putLong(offset + 1, (Long) value); + return 9; + case FLOAT: + outBuffer.put(offset, FLOAT_HEADER); + outBuffer.putFloat(offset + 1, (Float) value); + return 5; + case DOUBLE: + outBuffer.put(offset, DOUBLE_HEADER); + outBuffer.putDouble(offset + 1, (Double) value); + return 9; + case DATE: + outBuffer.put(offset, DATE_HEADER); + outBuffer.putInt(offset + 1, (Integer) value); + return 5; + case TIMESTAMPTZ: + outBuffer.put(offset, TIMESTAMPTZ_HEADER); + outBuffer.putLong(offset + 1, (Long) value); + return 9; + case TIMESTAMPNTZ: + outBuffer.put(offset, TIMESTAMPNTZ_HEADER); + outBuffer.putLong(offset + 1, (Long) value); + return 9; + case DECIMAL4: + BigDecimal decimal4 = (BigDecimal) value; + outBuffer.put(offset, DECIMAL4_HEADER); + outBuffer.put(offset + 1, (byte) decimal4.scale()); + outBuffer.putInt(offset + 2, decimal4.unscaledValue().intValueExact()); + return 6; + case DECIMAL8: + BigDecimal decimal8 = (BigDecimal) value; + outBuffer.put(offset, DECIMAL8_HEADER); + outBuffer.put(offset + 1, (byte) decimal8.scale()); + outBuffer.putLong(offset + 2, decimal8.unscaledValue().longValueExact()); + return 10; + case DECIMAL16: + BigDecimal decimal16 = (BigDecimal) value; + byte 
padding = (byte) (decimal16.signum() < 0 ? 0xFF : 0x00); + byte[] bytes = decimal16.unscaledValue().toByteArray(); + outBuffer.put(offset, DECIMAL16_HEADER); + outBuffer.put(offset + 1, (byte) decimal16.scale()); + for (int i = 0; i < 16; i += 1) { + if (i < bytes.length) { + // copy the big endian value and convert to little endian + outBuffer.put(offset + 2 + i, bytes[bytes.length - i - 1]); + } else { + // pad with 0x00 or 0xFF depending on the sign + outBuffer.put(offset + 2 + i, padding); + } + } + return 18; + case BINARY: + ByteBuffer binary = (ByteBuffer) value; + outBuffer.put(offset, BINARY_HEADER); + outBuffer.putInt(offset + 1, binary.remaining()); + VariantUtil.writeBufferAbsolute(outBuffer, offset + 5, binary); + return 5 + binary.remaining(); + case STRING: + // TODO: use short string when possible + if (null == buffer) { + this.buffer = ByteBuffer.wrap(((String) value).getBytes(StandardCharsets.UTF_8)); + } + + outBuffer.put(offset, STRING_HEADER); + outBuffer.putInt(offset + 1, buffer.remaining()); + VariantUtil.writeBufferAbsolute(outBuffer, offset + 5, buffer); + return 5 + buffer.remaining(); + } + + throw new UnsupportedOperationException("Unsupported primitive type: " + type()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java b/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java new file mode 100644 index 000000000000..774553cbb4a3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class SerializedArray extends Variants.SerializedValue implements VariantArray { + private static final int OFFSET_SIZE_MASK = 0b1100; + private static final int OFFSET_SIZE_SHIFT = 2; + private static final int IS_LARGE = 0b10000; + + @VisibleForTesting + static SerializedArray from(SerializedMetadata metadata, byte[] bytes) { + return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); + } + + static SerializedArray from(SerializedMetadata metadata, ByteBuffer value, int header) { + Preconditions.checkArgument( + value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian"); + Variants.BasicType basicType = VariantUtil.basicType(header); + Preconditions.checkArgument( + basicType == Variants.BasicType.ARRAY, "Invalid array, basic type: " + basicType); + return new SerializedArray(metadata, value, header); + } + + private final SerializedMetadata metadata; + private final ByteBuffer value; + private final int offsetSize; + private final int offsetListOffset; + private final int dataOffset; + private final VariantValue[] array; + + private SerializedArray(SerializedMetadata metadata, ByteBuffer value, int header) { + this.metadata = metadata; + this.value = value; + this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT); + int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1; + int numElements = + VariantUtil.readLittleEndianUnsigned(value, Variants.HEADER_SIZE, numElementsSize); + this.offsetListOffset = Variants.HEADER_SIZE + numElementsSize; + this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize); + this.array = new VariantValue[numElements]; + } + + @VisibleForTesting + int numElements() { + return array.length; + } + + @Override + public VariantValue get(int index) { + if (null == array[index]) { + int offset = + VariantUtil.readLittleEndianUnsigned( + value, offsetListOffset + (offsetSize * index), offsetSize); + int next = + VariantUtil.readLittleEndianUnsigned( + value, offsetListOffset + (offsetSize * (1 + index)), offsetSize); + array[index] = + Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset)); + } + return array[index]; + } + + @Override + public ByteBuffer buffer() { + return value; + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedMetadata.java b/core/src/main/java/org/apache/iceberg/variants/SerializedMetadata.java new file mode 100644 index 000000000000..30f4903db281 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedMetadata.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class SerializedMetadata implements VariantMetadata, Variants.Serialized { + private static final int SUPPORTED_VERSION = 1; + private static final int VERSION_MASK = 0b1111; + private static final int SORTED_STRINGS = 0b10000; + private static final int OFFSET_SIZE_MASK = 0b11000000; + private static final int OFFSET_SIZE_SHIFT = 6; + + static final ByteBuffer EMPTY_V1_BUFFER = + ByteBuffer.wrap(new byte[] {0x01, 0x00}).order(ByteOrder.LITTLE_ENDIAN); + + static SerializedMetadata from(byte[] bytes) { + return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)); + } + + static SerializedMetadata from(ByteBuffer metadata) { + Preconditions.checkArgument( + metadata.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian"); + int header = VariantUtil.readByte(metadata, 0); + int version = header & VERSION_MASK; + Preconditions.checkArgument(SUPPORTED_VERSION == version, "Unsupported version: %s", version); + return new SerializedMetadata(metadata, header); + } + + private final ByteBuffer metadata; + private final boolean isSorted; + private final int offsetSize; + private final int offsetListOffset; + private final int dataOffset; + private final String[] dict; + + private SerializedMetadata(ByteBuffer metadata, int header) { + this.metadata = metadata; + this.isSorted = (header & SORTED_STRINGS) == SORTED_STRINGS; + this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT); + int dictSize = VariantUtil.readLittleEndianUnsigned(metadata, Variants.HEADER_SIZE, offsetSize); + this.dict = new String[dictSize]; + this.offsetListOffset = Variants.HEADER_SIZE + offsetSize; + this.dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize); + } + + @VisibleForTesting + int dictionarySize() { + return dict.length; + } + + @VisibleForTesting + boolean isSorted() { + return isSorted; + } + + /** Returns the position of the string in the metadata, or -1 if the string is not found. */ + @Override + public int id(String name) { + if (name != null) { + if (isSorted) { + return VariantUtil.find(dict.length, name, this::get); + } else { + for (int id = 0; id < dict.length; id += 1) { + if (name.equals(get(id))) { + return id; + } + } + } + } + + return -1; + } + + /** Returns the string for the given dictionary id. 
*/ + @Override + public String get(int index) { + if (null == dict[index]) { + int offset = + VariantUtil.readLittleEndianUnsigned( + metadata, offsetListOffset + (offsetSize * index), offsetSize); + int next = + VariantUtil.readLittleEndianUnsigned( + metadata, offsetListOffset + (offsetSize * (1 + index)), offsetSize); + dict[index] = VariantUtil.readString(metadata, dataOffset + offset, next - offset); + } + return dict[index]; + } + + @Override + public ByteBuffer buffer() { + return metadata; + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java b/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java new file mode 100644 index 000000000000..f941b62731d7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Iterator; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Pair; + +class SerializedObject extends Variants.SerializedValue implements VariantObject { + private static final int OFFSET_SIZE_MASK = 0b1100; + private static final int OFFSET_SIZE_SHIFT = 2; + private static final int FIELD_ID_SIZE_MASK = 0b110000; + private static final int FIELD_ID_SIZE_SHIFT = 4; + private static final int IS_LARGE = 0b1000000; + + static SerializedObject from(SerializedMetadata metadata, byte[] bytes) { + return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); + } + + static SerializedObject from(SerializedMetadata metadata, ByteBuffer value, int header) { + Preconditions.checkArgument( + value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian"); + Variants.BasicType basicType = VariantUtil.basicType(header); + Preconditions.checkArgument( + basicType == Variants.BasicType.OBJECT, "Invalid object, basic type: " + basicType); + return new SerializedObject(metadata, value, header); + } + + private final SerializedMetadata metadata; + private final ByteBuffer value; + private final int fieldIdSize; + private final int fieldIdListOffset; + private final Integer[] fieldIds; + private final int offsetSize; + private final int offsetListOffset; + private final int dataOffset; + private final VariantValue[] values; + + private SerializedObject(SerializedMetadata metadata, ByteBuffer value, int header) { + this.metadata = metadata; + this.value = value; + this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT); + this.fieldIdSize = 1 + ((header & FIELD_ID_SIZE_MASK) >> 
FIELD_ID_SIZE_SHIFT); + int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1; + int numElements = + VariantUtil.readLittleEndianUnsigned(value, Variants.HEADER_SIZE, numElementsSize); + this.fieldIdListOffset = Variants.HEADER_SIZE + numElementsSize; + this.fieldIds = new Integer[numElements]; + this.offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize); + this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize); + this.values = new VariantValue[numElements]; + } + + @VisibleForTesting + int numElements() { + return fieldIds.length; + } + + SerializedMetadata metadata() { + return metadata; + } + + Iterable> fields() { + return () -> + new Iterator<>() { + private int index = 0; + + @Override + public boolean hasNext() { + return index < fieldIds.length; + } + + @Override + public Pair next() { + Pair next = Pair.of(metadata.get(id(index)), index); + index += 1; + return next; + } + }; + } + + public Iterable fieldNames() { + return () -> + new Iterator<>() { + private int index = 0; + + @Override + public boolean hasNext() { + return index < fieldIds.length; + } + + @Override + public String next() { + int id = id(index); + index += 1; + return metadata.get(id); + } + }; + } + + private int id(int index) { + if (null == fieldIds[index]) { + fieldIds[index] = + VariantUtil.readLittleEndianUnsigned( + value, fieldIdListOffset + (index * fieldIdSize), fieldIdSize); + } + return fieldIds[index]; + } + + @Override + public VariantValue get(String name) { + // keys are ordered lexicographically by the name + int index = VariantUtil.find(fieldIds.length, name, pos -> metadata.get(id(pos))); + + if (index < 0) { + return null; + } + + if (null == values[index]) { + int offset = + VariantUtil.readLittleEndianUnsigned( + value, offsetListOffset + (index * offsetSize), offsetSize); + int next = + VariantUtil.readLittleEndianUnsigned( + value, offsetListOffset + ((1 + index) * offsetSize), offsetSize); + values[index] = + Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset)); + } + + return values[index]; + } + + /** + * Retrieve a field value as a ByteBuffer. + * + * @param name field name + * @return the field value as a ByteBuffer + */ + ByteBuffer sliceValue(String name) { + int index = VariantUtil.find(fieldIds.length, name, pos -> metadata.get(id(pos))); + + if (index < 0) { + return null; + } + + return sliceValue(index); + } + + /** + * Retrieve a field value as a ByteBuffer. 
+ * + * @param index field index within the object + * @return the field value as a ByteBuffer + */ + ByteBuffer sliceValue(int index) { + if (values[index] != null) { + return ((Variants.Serialized) values[index]).buffer(); + } + + int offset = + VariantUtil.readLittleEndianUnsigned( + value, offsetListOffset + (index * offsetSize), offsetSize); + int next = + VariantUtil.readLittleEndianUnsigned( + value, offsetListOffset + ((1 + index) * offsetSize), offsetSize); + + return VariantUtil.slice(value, dataOffset + offset, next - offset); + } + + @Override + public ByteBuffer buffer() { + return value; + } + + @Override + public int sizeInBytes() { + return value.remaining(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java b/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java new file mode 100644 index 000000000000..1a6bd37a4ff3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class SerializedPrimitive extends Variants.SerializedValue implements VariantPrimitive { + private static final int PRIMITIVE_TYPE_SHIFT = 2; + private static final int PRIMITIVE_OFFSET = Variants.HEADER_SIZE; + + static SerializedPrimitive from(byte[] bytes) { + return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); + } + + static SerializedPrimitive from(ByteBuffer value, int header) { + Preconditions.checkArgument( + value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian"); + Variants.BasicType basicType = VariantUtil.basicType(header); + Preconditions.checkArgument( + basicType == Variants.BasicType.PRIMITIVE, + "Invalid primitive, basic type != PRIMITIVE: " + basicType); + return new SerializedPrimitive(value, header); + } + + private final ByteBuffer value; + private final Variants.PhysicalType type; + private Object primitive = null; + + private SerializedPrimitive(ByteBuffer value, int header) { + this.value = value; + this.type = Variants.PhysicalType.from(header >> PRIMITIVE_TYPE_SHIFT); + } + + private Object read() { + switch (type) { + case NULL: + return null; + case BOOLEAN_TRUE: + return true; + case BOOLEAN_FALSE: + return false; + case INT8: + return VariantUtil.readLittleEndianInt8(value, PRIMITIVE_OFFSET); + case INT16: + return VariantUtil.readLittleEndianInt16(value, PRIMITIVE_OFFSET); + case INT32: + case DATE: + return VariantUtil.readLittleEndianInt32(value, PRIMITIVE_OFFSET); + case INT64: + case TIMESTAMPTZ: + case TIMESTAMPNTZ: + return VariantUtil.readLittleEndianInt64(value, PRIMITIVE_OFFSET); + case FLOAT: + return VariantUtil.readFloat(value, PRIMITIVE_OFFSET); + case DOUBLE: + return VariantUtil.readDouble(value, PRIMITIVE_OFFSET); + case DECIMAL4: + { + int scale = VariantUtil.readByte(value, PRIMITIVE_OFFSET); + int unscaled = VariantUtil.readLittleEndianInt32(value, PRIMITIVE_OFFSET + 1); + return new BigDecimal(BigInteger.valueOf(unscaled), scale); + } + case DECIMAL8: + { + int scale = VariantUtil.readByte(value, PRIMITIVE_OFFSET); + long unscaled = VariantUtil.readLittleEndianInt64(value, PRIMITIVE_OFFSET + 1); + return new BigDecimal(BigInteger.valueOf(unscaled), scale); + } + case DECIMAL16: + { + int scale = VariantUtil.readByte(value, PRIMITIVE_OFFSET); + byte[] unscaled = new byte[16]; + for (int i = 0; i < 16; i += 1) { + unscaled[i] = (byte) VariantUtil.readByte(value, PRIMITIVE_OFFSET + 16 - i); + } + return new BigDecimal(new BigInteger(unscaled), scale); + } + case BINARY: + { + int size = VariantUtil.readLittleEndianInt32(value, PRIMITIVE_OFFSET); + return VariantUtil.slice(value, PRIMITIVE_OFFSET + 4, size); + } + case STRING: + { + int size = VariantUtil.readLittleEndianInt32(value, PRIMITIVE_OFFSET); + return VariantUtil.readString(value, PRIMITIVE_OFFSET + 4, size); + } + } + + throw new UnsupportedOperationException("Unsupported primitive type: " + type); + } + + @Override + public Variants.PhysicalType type() { + return type; + } + + @Override + public Object get() { + if (null == primitive) { + this.primitive = read(); + } + return primitive; + } + + @Override + public ByteBuffer buffer() { + return value; + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java 
b/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java new file mode 100644 index 000000000000..3004a075def1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class SerializedShortString extends Variants.SerializedValue implements VariantPrimitive { + private static final int LENGTH_MASK = 0b11111100; + private static final int LENGTH_SHIFT = 2; + + static SerializedShortString from(byte[] bytes) { + return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); + } + + static SerializedShortString from(ByteBuffer value, int header) { + Preconditions.checkArgument( + value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian"); + Variants.BasicType basicType = VariantUtil.basicType(header); + Preconditions.checkArgument( + basicType == Variants.BasicType.SHORT_STRING, + "Invalid short string, basic type: " + basicType); + return new SerializedShortString(value, header); + } + + private final ByteBuffer value; + private final int length; + private String string = null; + + private SerializedShortString(ByteBuffer value, int header) { + this.value = value; + this.length = ((header & LENGTH_MASK) >> LENGTH_SHIFT); + } + + @Override + public Variants.PhysicalType type() { + return Variants.PhysicalType.STRING; + } + + @Override + public String get() { + if (null == string) { + this.string = VariantUtil.readString(value, Variants.HEADER_SIZE, length); + } + return string; + } + + @Override + public ByteBuffer buffer() { + return value; + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java new file mode 100644 index 000000000000..e9e734fce0dd --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.SortedMerge; + +/** + * A variant Object that handles full or partial shredding. + * + *

Metadata stored for an object must be the same regardless of whether the object is shredded. + * This class assumes that the metadata from the unshredded object can be used for the shredded + * fields. This also does not allow updating or replacing the metadata for the unshredded object, + * which could require recursively rewriting field IDs. + */ +class ShreddedObject implements VariantObject { + private final SerializedMetadata metadata; + private final SerializedObject unshredded; + private final Map shreddedFields = Maps.newHashMap(); + private SerializationState serializationState = null; + + ShreddedObject(SerializedMetadata metadata) { + this.metadata = metadata; + this.unshredded = null; + } + + ShreddedObject(SerializedObject unshredded) { + this.metadata = unshredded.metadata(); + this.unshredded = unshredded; + } + + public void put(String field, VariantValue value) { + Preconditions.checkArgument( + metadata.id(field) >= 0, "Cannot find field name in metadata: %s", field); + + // allow setting fields that are contained in unshredded. this avoids read-time failures and + // simplifies replacing field values. + shreddedFields.put(field, value); + this.serializationState = null; + } + + @Override + public VariantValue get(String field) { + // the shredded value takes precedence if there is a conflict + VariantValue value = shreddedFields.get(field); + if (value != null) { + return value; + } + + if (unshredded != null) { + return unshredded.get(field); + } + + return null; + } + + @Override + public int sizeInBytes() { + if (null == serializationState) { + this.serializationState = new SerializationState(metadata, unshredded, shreddedFields); + } + + return serializationState.size(); + } + + @Override + public int writeTo(ByteBuffer buffer, int offset) { + Preconditions.checkArgument( + buffer.order() == ByteOrder.LITTLE_ENDIAN, "Invalid byte order: big endian"); + + if (null == serializationState) { + this.serializationState = new SerializationState(metadata, unshredded, shreddedFields); + } + + return serializationState.writeTo(buffer, offset); + } + + /** Common state for {@link #size()} and {@link #writeTo(ByteBuffer, int)} */ + private static class SerializationState { + private final SerializedMetadata metadata; + private final Map unshreddedFields; + private final Map shreddedFields; + private final int dataSize; + private final int numElements; + private final boolean isLarge; + private final int fieldIdSize; + private final int offsetSize; + + private SerializationState( + SerializedMetadata metadata, + SerializedObject unshredded, + Map shreddedFields) { + this.metadata = metadata; + // field ID size is the size needed to store the largest field ID in the data + this.fieldIdSize = VariantUtil.sizeOf(metadata.dictionarySize()); + this.shreddedFields = shreddedFields; + + int totalDataSize = 0; + // get the unshredded field names and values as byte buffers + ImmutableMap.Builder unshreddedBuilder = ImmutableMap.builder(); + if (unshredded != null) { + for (Pair field : unshredded.fields()) { + // if the value is replaced by an unshredded field, don't include it + String name = field.first(); + boolean replaced = shreddedFields.containsKey(name); + if (!replaced) { + ByteBuffer value = unshredded.sliceValue(field.second()); + unshreddedBuilder.put(name, value); + totalDataSize += value.remaining(); + } + } + } + + this.unshreddedFields = unshreddedBuilder.build(); + // duplicates are suppressed when creating unshreddedFields + this.numElements = unshreddedFields.size() + 
shreddedFields.size(); + // object is large if the number of elements can't be stored in 1 byte + this.isLarge = numElements > 0xFF; + + for (VariantValue value : shreddedFields.values()) { + totalDataSize += value.sizeInBytes(); + } + + this.dataSize = totalDataSize; + // offset size is the size needed to store the length of the data section + this.offsetSize = VariantUtil.sizeOf(totalDataSize); + } + + private int size() { + return 1 /* header */ + + (isLarge ? 4 : 1) /* num elements size */ + + numElements * fieldIdSize /* field ID list size */ + + (1 + numElements) * offsetSize /* offset list size */ + + dataSize; + } + + private int writeTo(ByteBuffer buffer, int offset) { + int fieldIdListOffset = + offset + 1 /* header size */ + (isLarge ? 4 : 1) /* num elements size */; + int offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize); + int dataOffset = offsetListOffset + ((1 + numElements) * offsetSize); + byte header = VariantUtil.objectHeader(isLarge, fieldIdSize, offsetSize); + + VariantUtil.writeByte(buffer, header, offset); + VariantUtil.writeLittleEndianUnsigned(buffer, numElements, offset + 1, isLarge ? 4 : 1); + + // neither iterable is closeable, so it is okay to use Iterable + Iterable fields = + SortedMerge.of( + () -> unshreddedFields.keySet().stream().sorted().iterator(), + () -> shreddedFields.keySet().stream().sorted().iterator()); + + int nextValueOffset = 0; + int index = 0; + for (String field : fields) { + // write the field ID from the metadata dictionary + int id = metadata.id(field); + Preconditions.checkState(id >= 0, "Invalid metadata, missing: %s", field); + VariantUtil.writeLittleEndianUnsigned( + buffer, id, fieldIdListOffset + (index * fieldIdSize), fieldIdSize); + // write the data offset + VariantUtil.writeLittleEndianUnsigned( + buffer, nextValueOffset, offsetListOffset + (index * offsetSize), offsetSize); + + // copy or serialize the value into the data section + int valueSize; + VariantValue shreddedValue = shreddedFields.get(field); + if (shreddedValue != null) { + valueSize = shreddedValue.writeTo(buffer, dataOffset + nextValueOffset); + } else { + valueSize = + VariantUtil.writeBufferAbsolute( + buffer, dataOffset + nextValueOffset, unshreddedFields.get(field)); + } + + // update tracking + nextValueOffset += valueSize; + index += 1; + } + + // write the final size of the data section + VariantUtil.writeLittleEndianUnsigned( + buffer, nextValueOffset, offsetListOffset + (index * offsetSize), offsetSize); + + // return the total size + return (dataOffset - offset) + dataSize; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/Variant.java b/core/src/main/java/org/apache/iceberg/variants/Variant.java new file mode 100644 index 000000000000..b5606fa094b6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/Variant.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +/** A variant metadata and value pair. */ +public interface Variant { + /** Returns the metadata for all values in the variant. */ + VariantMetadata metadata(); + + /** Returns the variant value. */ + VariantValue value(); +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantArray.java b/core/src/main/java/org/apache/iceberg/variants/VariantArray.java new file mode 100644 index 000000000000..55dbc071f15b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantArray.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +/** An variant array value. */ +public interface VariantArray extends VariantValue { + /** Returns the {@link VariantValue} at {@code index} in this array. */ + VariantValue get(int index); + + @Override + default Variants.PhysicalType type() { + return Variants.PhysicalType.ARRAY; + } + + @Override + default VariantArray asArray() { + return this; + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantMetadata.java b/core/src/main/java/org/apache/iceberg/variants/VariantMetadata.java new file mode 100644 index 000000000000..91dc591c64e7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantMetadata.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +/** A variant metadata dictionary. 
*/ +public interface VariantMetadata extends Variants.Serialized { + /** Returns the ID for a {@code name} in the dictionary, or -1 if not present. */ + int id(String name); + + /** + * Returns the field name for an ID in metadata. + * + * @throws ArrayIndexOutOfBoundsException if the dictionary does not contain the ID + */ + String get(int id); +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantObject.java b/core/src/main/java/org/apache/iceberg/variants/VariantObject.java new file mode 100644 index 000000000000..7bb82f94a467 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantObject.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +/** An variant object value. */ +public interface VariantObject extends VariantValue { + /** Returns the {@link VariantValue} for the field named {@code name} in this object. */ + VariantValue get(String name); + + @Override + default Variants.PhysicalType type() { + return Variants.PhysicalType.OBJECT; + } + + @Override + default VariantObject asObject() { + return this; + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantPrimitive.java b/core/src/main/java/org/apache/iceberg/variants/VariantPrimitive.java new file mode 100644 index 000000000000..73efb45ae91b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantPrimitive.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +/** A primitive variant value. 
*/ +public interface VariantPrimitive extends VariantValue { + T get(); + + @Override + default VariantPrimitive asPrimitive() { + return this; + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantUtil.java b/core/src/main/java/org/apache/iceberg/variants/VariantUtil.java new file mode 100644 index 000000000000..d6b78fe899e6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantUtil.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.function.Function; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class VariantUtil { + private static final int BASIC_TYPE_MASK = 0b11; + private static final int BASIC_TYPE_PRIMITIVE = 0; + private static final int BASIC_TYPE_SHORT_STRING = 1; + private static final int BASIC_TYPE_OBJECT = 2; + private static final int BASIC_TYPE_ARRAY = 3; + + private VariantUtil() {} + + /** A hacky absolute put for ByteBuffer */ + static int writeBufferAbsolute(ByteBuffer buffer, int offset, ByteBuffer toCopy) { + int originalPosition = buffer.position(); + buffer.position(offset); + ByteBuffer copy = toCopy.duplicate(); + buffer.put(copy); // duplicate so toCopy is not modified + buffer.position(originalPosition); + Preconditions.checkArgument(copy.remaining() <= 0, "Not fully written"); + return toCopy.remaining(); + } + + static void writeByte(ByteBuffer buffer, int value, int offset) { + buffer.put(buffer.position() + offset, (byte) (value & 0xFF)); + } + + static void writeLittleEndianUnsigned(ByteBuffer buffer, int value, int offset, int size) { + int base = buffer.position() + offset; + switch (size) { + case 4: + buffer.putInt(base, value); + return; + case 3: + buffer.putShort(base, (short) (value & 0xFFFF)); + buffer.put(base + 2, (byte) ((value >> 16) & 0xFF)); + return; + case 2: + buffer.putShort(base, (short) (value & 0xFFFF)); + return; + case 1: + buffer.put(base, (byte) (value & 0xFF)); + return; + } + + throw new IllegalArgumentException("Invalid size: " + size); + } + + static byte readLittleEndianInt8(ByteBuffer buffer, int offset) { + return buffer.get(buffer.position() + offset); + } + + static short readLittleEndianInt16(ByteBuffer buffer, int offset) { + return buffer.getShort(buffer.position() + offset); + } + + static int readByte(ByteBuffer buffer, int offset) { + return buffer.get(buffer.position() + offset) & 0xFF; + } + + static int readLittleEndianUnsigned(ByteBuffer buffer, int offset, int size) { + int base = buffer.position() + offset; + switch (size) { + case 4: + return buffer.getInt(base); + case 3: + return (((int) 
buffer.getShort(base)) & 0xFFFF) | ((buffer.get(base + 2) & 0xFF) << 16); + case 2: + return ((int) buffer.getShort(base)) & 0xFFFF; + case 1: + return buffer.get(base) & 0xFF; + } + + throw new IllegalArgumentException("Invalid size: " + size); + } + + static int readLittleEndianInt32(ByteBuffer buffer, int offset) { + return buffer.getInt(buffer.position() + offset); + } + + static long readLittleEndianInt64(ByteBuffer buffer, int offset) { + return buffer.getLong(buffer.position() + offset); + } + + static float readFloat(ByteBuffer buffer, int offset) { + return buffer.getFloat(buffer.position() + offset); + } + + static double readDouble(ByteBuffer buffer, int offset) { + return buffer.getDouble(buffer.position() + offset); + } + + static ByteBuffer slice(ByteBuffer buffer, int offset, int length) { + ByteBuffer slice = buffer.duplicate(); + slice.order(ByteOrder.LITTLE_ENDIAN); + slice.position(buffer.position() + offset); + slice.limit(buffer.position() + offset + length); + return slice; + } + + static String readString(ByteBuffer buffer, int offset, int length) { + if (buffer.hasArray()) { + return new String( + buffer.array(), + buffer.arrayOffset() + buffer.position() + offset, + length, + StandardCharsets.UTF_8); + } else { + return StandardCharsets.UTF_8.decode(slice(buffer, offset, length)).toString(); + } + } + + static > int find(int size, T key, Function resolve) { + int low = 0; + int high = size - 1; + while (low <= high) { + int mid = (low + high) >>> 1; + T value = resolve.apply(mid); + int cmp = key.compareTo(value); + if (cmp == 0) { + return mid; + } else if (cmp < 0) { + high = mid - 1; + } else { + low = mid + 1; + } + } + + return -1; + } + + static int sizeOf(int maxValue) { + if (maxValue <= 0xFF) { + return 1; + } else if (maxValue <= 0xFFFF) { + return 2; + } else if (maxValue <= 0xFFFFFF) { + return 3; + } else { + return 4; + } + } + + static byte primitiveHeader(int primitiveType) { + return (byte) (primitiveType << Variants.Primitives.PRIMITIVE_TYPE_SHIFT); + } + + static byte objectHeader(boolean isLarge, int fieldIdSize, int offsetSize) { + return (byte) + ((isLarge ? 0b1000000 : 0) | ((fieldIdSize - 1) << 4) | ((offsetSize - 1) << 2) | 0b10); + } + + static byte arrayHeader(boolean isLarge, int offsetSize) { + return (byte) ((isLarge ? 0b10000 : 0) | (offsetSize - 1) << 2 | 0b11); + } + + static Variants.BasicType basicType(int header) { + int basicType = header & BASIC_TYPE_MASK; + switch (basicType) { + case BASIC_TYPE_PRIMITIVE: + return Variants.BasicType.PRIMITIVE; + case BASIC_TYPE_SHORT_STRING: + return Variants.BasicType.SHORT_STRING; + case BASIC_TYPE_OBJECT: + return Variants.BasicType.OBJECT; + case BASIC_TYPE_ARRAY: + return Variants.BasicType.ARRAY; + } + + throw new UnsupportedOperationException("Unsupported basic type: " + basicType); + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantValue.java b/core/src/main/java/org/apache/iceberg/variants/VariantValue.java new file mode 100644 index 000000000000..26a43795f778 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantValue.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.nio.ByteBuffer; +import org.apache.iceberg.variants.Variants.PhysicalType; + +/** A variant value. */ +public interface VariantValue { + /** Returns the {@link PhysicalType} of this value. */ + PhysicalType type(); + + /** Returns the serialized size in bytes of this value. */ + int sizeInBytes(); + + /** + * Writes this value to the buffer at the given offset, ignoring the buffer's position and limit. + */ + int writeTo(ByteBuffer buffer, int offset); + + /** + * Returns this value as a {@link VariantPrimitive}. + * + * @throws IllegalArgumentException if the value is not a primitive + */ + default VariantPrimitive asPrimitive() { + throw new IllegalArgumentException("Not a primitive: " + this); + } + + /** + * Returns this value as a {@link VariantObject}. + * + * @throws IllegalArgumentException if the value is not an object + */ + default VariantObject asObject() { + throw new IllegalArgumentException("Not an object: " + this); + } + + /** + * Returns this value as a {@link VariantArray}. + * + * @throws IllegalArgumentException if the value is not an array + */ + default VariantArray asArray() { + throw new IllegalArgumentException("Not an array: " + this); + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/Variants.java b/core/src/main/java/org/apache/iceberg/variants/Variants.java new file mode 100644 index 000000000000..e8ea3d93ab77 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/Variants.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.util.DateTimeUtil; + +public class Variants { + private Variants() {} + + enum LogicalType { + NULL, + BOOLEAN, + EXACT_NUMERIC, + FLOAT, + DOUBLE, + DATE, + TIMESTAMPTZ, + TIMESTAMPNTZ, + BINARY, + STRING, + ARRAY, + OBJECT + } + + public enum PhysicalType { + NULL(LogicalType.NULL, Void.class), + BOOLEAN_TRUE(LogicalType.BOOLEAN, Boolean.class), + BOOLEAN_FALSE(LogicalType.BOOLEAN, Boolean.class), + INT8(LogicalType.EXACT_NUMERIC, Byte.class), + INT16(LogicalType.EXACT_NUMERIC, Short.class), + INT32(LogicalType.EXACT_NUMERIC, Integer.class), + INT64(LogicalType.EXACT_NUMERIC, Long.class), + DOUBLE(LogicalType.DOUBLE, Double.class), + DECIMAL4(LogicalType.EXACT_NUMERIC, BigDecimal.class), + DECIMAL8(LogicalType.EXACT_NUMERIC, BigDecimal.class), + DECIMAL16(LogicalType.EXACT_NUMERIC, BigDecimal.class), + DATE(LogicalType.DATE, Integer.class), + TIMESTAMPTZ(LogicalType.TIMESTAMPTZ, Long.class), + TIMESTAMPNTZ(LogicalType.TIMESTAMPNTZ, Long.class), + FLOAT(LogicalType.FLOAT, Float.class), + BINARY(LogicalType.BINARY, ByteBuffer.class), + STRING(LogicalType.STRING, String.class), + ARRAY(LogicalType.ARRAY, List.class), + OBJECT(LogicalType.OBJECT, Map.class); + + private final LogicalType logicalType; + private final Class javaClass; + + PhysicalType(LogicalType logicalType, Class javaClass) { + this.logicalType = logicalType; + this.javaClass = javaClass; + } + + LogicalType toLogicalType() { + return logicalType; + } + + public Class javaClass() { + return javaClass; + } + + public static PhysicalType from(int primitiveType) { + switch (primitiveType) { + case Primitives.TYPE_NULL: + return NULL; + case Primitives.TYPE_TRUE: + return BOOLEAN_TRUE; + case Primitives.TYPE_FALSE: + return BOOLEAN_FALSE; + case Primitives.TYPE_INT8: + return INT8; + case Primitives.TYPE_INT16: + return INT16; + case Primitives.TYPE_INT32: + return INT32; + case Primitives.TYPE_INT64: + return INT64; + case Primitives.TYPE_DATE: + return DATE; + case Primitives.TYPE_TIMESTAMPTZ: + return TIMESTAMPTZ; + case Primitives.TYPE_TIMESTAMPNTZ: + return TIMESTAMPNTZ; + case Primitives.TYPE_FLOAT: + return FLOAT; + case Primitives.TYPE_DOUBLE: + return DOUBLE; + case Primitives.TYPE_DECIMAL4: + return DECIMAL4; + case Primitives.TYPE_DECIMAL8: + return DECIMAL8; + case Primitives.TYPE_DECIMAL16: + return DECIMAL16; + case Primitives.TYPE_BINARY: + return BINARY; + case Primitives.TYPE_STRING: + return STRING; + } + + throw new UnsupportedOperationException("Unknown primitive physical type: " + primitiveType); + } + } + + interface Serialized { + ByteBuffer buffer(); + } + + abstract static class SerializedValue implements VariantValue, Serialized { + @Override + public int sizeInBytes() { + return buffer().remaining(); + } + + @Override + public int writeTo(ByteBuffer buffer, int offset) { + ByteBuffer value = buffer(); + VariantUtil.writeBufferAbsolute(buffer, offset, value); + return value.remaining(); + } + } + + static class Primitives { + static final int TYPE_NULL = 0; + static final int TYPE_TRUE = 1; + static final int TYPE_FALSE = 2; + static final int TYPE_INT8 = 3; + static final int TYPE_INT16 = 4; + static final int TYPE_INT32 = 5; + static final int TYPE_INT64 = 6; + static final int TYPE_DOUBLE = 7; + static final int TYPE_DECIMAL4 = 8; + static final int TYPE_DECIMAL8 = 9; + static final int TYPE_DECIMAL16 = 10; + static final int 
TYPE_DATE = 11; + static final int TYPE_TIMESTAMPTZ = 12; // equivalent to timestamptz + static final int TYPE_TIMESTAMPNTZ = 13; // equivalent to timestamp + static final int TYPE_FLOAT = 14; + static final int TYPE_BINARY = 15; + static final int TYPE_STRING = 16; + + static final int PRIMITIVE_TYPE_SHIFT = 2; + + private Primitives() {} + } + + static final int HEADER_SIZE = 1; + + enum BasicType { + PRIMITIVE, + SHORT_STRING, + OBJECT, + ARRAY + } + + public static VariantValue from(ByteBuffer metadata, ByteBuffer value) { + return from(SerializedMetadata.from(metadata), value); + } + + static VariantValue from(SerializedMetadata metadata, ByteBuffer value) { + int header = VariantUtil.readByte(value, 0); + BasicType basicType = VariantUtil.basicType(header); + switch (basicType) { + case PRIMITIVE: + return SerializedPrimitive.from(value, header); + case SHORT_STRING: + return SerializedShortString.from(value, header); + case OBJECT: + return SerializedObject.from(metadata, value, header); + case ARRAY: + return SerializedArray.from(metadata, value, header); + } + + throw new UnsupportedOperationException("Unsupported basic type: " + basicType); + } + + static VariantPrimitive ofNull() { + return new PrimitiveWrapper<>(PhysicalType.NULL, null); + } + + static VariantPrimitive of(boolean value) { + if (value) { + return new PrimitiveWrapper<>(PhysicalType.BOOLEAN_TRUE, true); + } else { + return new PrimitiveWrapper<>(PhysicalType.BOOLEAN_FALSE, false); + } + } + + static VariantPrimitive of(byte value) { + return new PrimitiveWrapper<>(PhysicalType.INT8, value); + } + + static VariantPrimitive of(short value) { + return new PrimitiveWrapper<>(PhysicalType.INT16, value); + } + + static VariantPrimitive of(int value) { + return new PrimitiveWrapper<>(PhysicalType.INT32, value); + } + + static VariantPrimitive of(long value) { + return new PrimitiveWrapper<>(PhysicalType.INT64, value); + } + + static VariantPrimitive of(float value) { + return new PrimitiveWrapper<>(PhysicalType.FLOAT, value); + } + + static VariantPrimitive of(double value) { + return new PrimitiveWrapper<>(PhysicalType.DOUBLE, value); + } + + static VariantPrimitive ofDate(int value) { + return new PrimitiveWrapper<>(PhysicalType.DATE, value); + } + + static VariantPrimitive ofIsoDate(String value) { + return ofDate(DateTimeUtil.isoDateToDays(value)); + } + + static VariantPrimitive ofTimestamptz(long value) { + return new PrimitiveWrapper<>(PhysicalType.TIMESTAMPTZ, value); + } + + static VariantPrimitive ofIsoTimestamptz(String value) { + return ofTimestamptz(DateTimeUtil.isoTimestamptzToMicros(value)); + } + + static VariantPrimitive ofTimestampntz(long value) { + return new PrimitiveWrapper<>(PhysicalType.TIMESTAMPNTZ, value); + } + + static VariantPrimitive ofIsoTimestampntz(String value) { + return ofTimestampntz(DateTimeUtil.isoTimestampToMicros(value)); + } + + static VariantPrimitive of(BigDecimal value) { + int bitLength = value.unscaledValue().bitLength(); + if (bitLength < 32) { + return new PrimitiveWrapper<>(PhysicalType.DECIMAL4, value); + } else if (bitLength < 64) { + return new PrimitiveWrapper<>(PhysicalType.DECIMAL8, value); + } else if (bitLength < 128) { + return new PrimitiveWrapper<>(PhysicalType.DECIMAL16, value); + } + + throw new UnsupportedOperationException("Unsupported decimal precision: " + value.precision()); + } + + static VariantPrimitive of(ByteBuffer value) { + return new PrimitiveWrapper<>(PhysicalType.BINARY, value); + } + + static VariantPrimitive of(String value) { + return new 
PrimitiveWrapper<>(PhysicalType.STRING, value); + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestPrimitiveWrapper.java b/core/src/test/java/org/apache/iceberg/variants/TestPrimitiveWrapper.java new file mode 100644 index 000000000000..fd113f9cece3 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestPrimitiveWrapper.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; + +public class TestPrimitiveWrapper { + private static final VariantPrimitive[] PRIMITIVES = + new VariantPrimitive[] { + Variants.ofNull(), + Variants.of(true), + Variants.of(false), + Variants.of((byte) 34), + Variants.of((byte) -34), + Variants.of((short) 1234), + Variants.of((short) -1234), + Variants.of(12345), + Variants.of(-12345), + Variants.of(9876543210L), + Variants.of(-9876543210L), + Variants.of(10.11F), + Variants.of(-10.11F), + Variants.of(14.3D), + Variants.of(-14.3D), + Variants.ofIsoDate("2024-11-07"), + Variants.ofIsoDate("1957-11-07"), + Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00"), + Variants.ofIsoTimestamptz("1957-11-07T12:33:54.123456+00:00"), + Variants.ofIsoTimestampntz("2024-11-07T12:33:54.123456"), + Variants.ofIsoTimestampntz("1957-11-07T12:33:54.123456"), + Variants.of(new BigDecimal("123456.7890")), // decimal4 + Variants.of(new BigDecimal("-123456.7890")), // decimal4 + Variants.of(new BigDecimal("1234567890.987654321")), // decimal8 + Variants.of(new BigDecimal("-1234567890.987654321")), // decimal8 + Variants.of(new BigDecimal("9876543210.123456789")), // decimal16 + Variants.of(new BigDecimal("-9876543210.123456789")), // decimal16 + Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d})), + Variants.of("iceberg"), + }; + + @ParameterizedTest + @FieldSource("PRIMITIVES") + public void testPrimitiveValueSerialization(VariantPrimitive primitive) { + // write the value to the middle of a large buffer + int size = primitive.sizeInBytes(); + ByteBuffer buffer = ByteBuffer.allocate(size + 1000).order(ByteOrder.LITTLE_ENDIAN); + primitive.writeTo(buffer, 300); + + // create a copy that is limited to the value range + ByteBuffer readBuffer = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN); + readBuffer.position(300); + readBuffer.limit(300 + size); + + // read and validate the serialized bytes + VariantValue actual = Variants.from(SerializedMetadata.EMPTY_V1_BUFFER, readBuffer); + assertThat(actual.type()).isEqualTo(primitive.type()); + 
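// (descriptive comment, added for clarity) the round-tripped value should be a primitive carrying the same type and unwrapped value as the original +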
assertThat(actual).isInstanceOf(VariantPrimitive.class); + assertThat(actual.asPrimitive().get()).isEqualTo(primitive.get()); + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestSerializedArray.java b/core/src/test/java/org/apache/iceberg/variants/TestSerializedArray.java new file mode 100644 index 000000000000..1e052572b85d --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestSerializedArray.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import java.util.Random; +import org.apache.iceberg.util.RandomUtil; +import org.apache.iceberg.variants.Variants.PhysicalType; +import org.junit.jupiter.api.Test; + +public class TestSerializedArray { + private static final SerializedMetadata EMPTY_METADATA = + SerializedMetadata.from(SerializedMetadata.EMPTY_V1_BUFFER); + private static final SerializedPrimitive NULL = SerializedPrimitive.from(new byte[] {0x00}); + private static final SerializedPrimitive TRUE = SerializedPrimitive.from(new byte[] {0b100}); + private static final SerializedPrimitive FALSE = SerializedPrimitive.from(new byte[] {0b1000}); + private static final SerializedShortString STR = + SerializedShortString.from(new byte[] {0b11101, 'i', 'c', 'e', 'b', 'e', 'r', 'g'}); + private static final SerializedShortString A = + SerializedShortString.from(new byte[] {0b101, 'a'}); + private static final SerializedShortString B = + SerializedShortString.from(new byte[] {0b101, 'b'}); + private static final SerializedShortString C = + SerializedShortString.from(new byte[] {0b101, 'c'}); + private static final SerializedShortString D = + SerializedShortString.from(new byte[] {0b101, 'd'}); + private static final SerializedShortString E = + SerializedShortString.from(new byte[] {0b101, 'e'}); + private static final SerializedPrimitive I34 = SerializedPrimitive.from(new byte[] {0b1100, 34}); + private static final SerializedPrimitive I1234 = + SerializedPrimitive.from(new byte[] {0b10000, (byte) 0xD2, 0x04}); + private static final SerializedPrimitive DATE = + SerializedPrimitive.from(new byte[] {0b101100, (byte) 0xF4, 0x43, 0x00, 0x00}); + + private final Random random = new Random(374513); + + @Test + public void testEmptyArray() { + SerializedArray array = SerializedArray.from(EMPTY_METADATA, new byte[] {0b0011, 0x00}); + + assertThat(array.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(array.numElements()).isEqualTo(0); + } + + @Test + public void testEmptyLargeArray() { + SerializedArray array = + SerializedArray.from(EMPTY_METADATA, new byte[] {0b10011, 0x00, 0x00, 0x00, 
0x00}); + + assertThat(array.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(array.numElements()).isEqualTo(0); + } + + @Test + public void testStringArray() { + ByteBuffer buffer = VariantTestUtil.createArray(A, B, C, D, E); + SerializedArray array = SerializedArray.from(EMPTY_METADATA, buffer, buffer.get(0)); + + assertThat(array.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(array.numElements()).isEqualTo(5); + assertThat(array.get(0).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(0).asPrimitive().get()).isEqualTo("a"); + assertThat(array.get(1).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(1).asPrimitive().get()).isEqualTo("b"); + assertThat(array.get(2).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(2).asPrimitive().get()).isEqualTo("c"); + assertThat(array.get(3).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(3).asPrimitive().get()).isEqualTo("d"); + assertThat(array.get(4).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(4).asPrimitive().get()).isEqualTo("e"); + + assertThatThrownBy(() -> array.get(5)) + .isInstanceOf(ArrayIndexOutOfBoundsException.class) + .hasMessage("Index 5 out of bounds for length 5"); + } + + @Test + public void testStringDifferentLengths() { + ByteBuffer buffer = VariantTestUtil.createArray(A, B, C, STR, D, E); + SerializedArray array = SerializedArray.from(EMPTY_METADATA, buffer, buffer.get(0)); + + assertThat(array.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(array.numElements()).isEqualTo(6); + assertThat(array.get(0).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(0).asPrimitive().get()).isEqualTo("a"); + assertThat(array.get(1).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(1).asPrimitive().get()).isEqualTo("b"); + assertThat(array.get(2).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(2).asPrimitive().get()).isEqualTo("c"); + assertThat(array.get(3).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(3).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(array.get(4).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(4).asPrimitive().get()).isEqualTo("d"); + assertThat(array.get(5).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(5).asPrimitive().get()).isEqualTo("e"); + + assertThatThrownBy(() -> array.get(6)) + .isInstanceOf(ArrayIndexOutOfBoundsException.class) + .hasMessage("Index 6 out of bounds for length 6"); + } + + @Test + public void testArrayOfMixedTypes() { + ByteBuffer nestedBuffer = VariantTestUtil.createArray(A, C, D); + SerializedArray nested = + SerializedArray.from(EMPTY_METADATA, nestedBuffer, nestedBuffer.get(0)); + ByteBuffer buffer = + VariantTestUtil.createArray(DATE, I34, STR, NULL, E, B, FALSE, nested, TRUE, I1234); + SerializedArray array = SerializedArray.from(EMPTY_METADATA, buffer, buffer.get(0)); + + assertThat(array.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(array.numElements()).isEqualTo(10); + assertThat(array.get(0).type()).isEqualTo(PhysicalType.DATE); + assertThat(array.get(0).asPrimitive().get()).isEqualTo(17396); + assertThat(array.get(1).type()).isEqualTo(PhysicalType.INT8); + assertThat(array.get(1).asPrimitive().get()).isEqualTo((byte) 34); + assertThat(array.get(2).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(2).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(array.get(3).type()).isEqualTo(PhysicalType.NULL); + assertThat(array.get(3).asPrimitive().get()).isEqualTo(null); + 
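// (descriptive comment, added for clarity) the remaining elements include short strings, booleans, an int16, and a nested array at index 7 +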
assertThat(array.get(4).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(4).asPrimitive().get()).isEqualTo("e"); + assertThat(array.get(5).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(5).asPrimitive().get()).isEqualTo("b"); + assertThat(array.get(6).type()).isEqualTo(PhysicalType.BOOLEAN_FALSE); + assertThat(array.get(6).asPrimitive().get()).isEqualTo(false); + assertThat(array.get(8).type()).isEqualTo(PhysicalType.BOOLEAN_TRUE); + assertThat(array.get(8).asPrimitive().get()).isEqualTo(true); + assertThat(array.get(9).type()).isEqualTo(PhysicalType.INT16); + assertThat(array.get(9).asPrimitive().get()).isEqualTo((short) 1234); + + assertThatThrownBy(() -> array.get(10)) + .isInstanceOf(ArrayIndexOutOfBoundsException.class) + .hasMessage("Index 10 out of bounds for length 10"); + + assertThat(array.get(7).type()).isEqualTo(PhysicalType.ARRAY); + SerializedArray actualNested = (SerializedArray) array.get(7); + assertThat(actualNested.numElements()).isEqualTo(3); + assertThat(actualNested.get(0).type()).isEqualTo(PhysicalType.STRING); + assertThat(actualNested.get(0).asPrimitive().get()).isEqualTo("a"); + assertThat(actualNested.get(1).type()).isEqualTo(PhysicalType.STRING); + assertThat(actualNested.get(1).asPrimitive().get()).isEqualTo("c"); + assertThat(actualNested.get(2).type()).isEqualTo(PhysicalType.STRING); + assertThat(actualNested.get(2).asPrimitive().get()).isEqualTo("d"); + + assertThatThrownBy(() -> actualNested.get(3)) + .isInstanceOf(ArrayIndexOutOfBoundsException.class) + .hasMessage("Index 3 out of bounds for length 3"); + } + + @Test + public void testTwoByteOffsets() { + // a string larger than 255 bytes to push the value offset size above 1 byte + String randomString = RandomUtil.generateString(300, random); + SerializedPrimitive bigString = VariantTestUtil.createString(randomString); + + ByteBuffer buffer = VariantTestUtil.createArray(bigString, A, B, C); + SerializedArray array = SerializedArray.from(EMPTY_METADATA, buffer, buffer.get(0)); + + assertThat(array.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(array.numElements()).isEqualTo(4); + assertThat(array.get(0).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(0).asPrimitive().get()).isEqualTo(randomString); + assertThat(array.get(1).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(1).asPrimitive().get()).isEqualTo("a"); + assertThat(array.get(2).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(2).asPrimitive().get()).isEqualTo("b"); + assertThat(array.get(3).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(3).asPrimitive().get()).isEqualTo("c"); + + assertThatThrownBy(() -> array.get(4)) + .isInstanceOf(ArrayIndexOutOfBoundsException.class) + .hasMessage("Index 4 out of bounds for length 4"); + } + + @Test + public void testThreeByteOffsets() { + // a string larger than 65535 bytes to push the value offset size above 2 bytes + String randomString = RandomUtil.generateString(70_000, random); + SerializedPrimitive reallyBigString = VariantTestUtil.createString(randomString); + + ByteBuffer buffer = VariantTestUtil.createArray(reallyBigString, A, B, C); + SerializedArray array = SerializedArray.from(EMPTY_METADATA, buffer, buffer.get(0)); + + assertThat(array.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(array.numElements()).isEqualTo(4); + assertThat(array.get(0).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(0).asPrimitive().get()).isEqualTo(randomString); +
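// (descriptive comment, added for clarity) the short string elements that follow should be unaffected by the wider offsets +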
assertThat(array.get(1).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(1).asPrimitive().get()).isEqualTo("a"); + assertThat(array.get(2).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(2).asPrimitive().get()).isEqualTo("b"); + assertThat(array.get(3).type()).isEqualTo(PhysicalType.STRING); + assertThat(array.get(3).asPrimitive().get()).isEqualTo("c"); + + assertThatThrownBy(() -> array.get(4)) + .isInstanceOf(ArrayIndexOutOfBoundsException.class) + .hasMessage("Index 4 out of bounds for length 4"); + } + + @Test + public void testLargeArraySize() { + SerializedArray array = + SerializedArray.from( + EMPTY_METADATA, new byte[] {0b10011, (byte) 0xFF, (byte) 0x01, 0x00, 0x00}); + + assertThat(array.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(array.numElements()).isEqualTo(511); + } + + @Test + public void testNegativeArraySize() { + assertThatThrownBy( + () -> + SerializedArray.from( + EMPTY_METADATA, + new byte[] {0b10011, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) + .isInstanceOf(NegativeArraySizeException.class) + .hasMessage("-1"); + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestSerializedMetadata.java b/core/src/test/java/org/apache/iceberg/variants/TestSerializedMetadata.java new file mode 100644 index 000000000000..27a4dda3dc3e --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestSerializedMetadata.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.RandomUtil; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestSerializedMetadata { + private final Random random = new Random(872591); + + @Test + public void testEmptyVariantMetadata() { + SerializedMetadata metadata = SerializedMetadata.from(SerializedMetadata.EMPTY_V1_BUFFER); + + assertThat(metadata.isSorted()).isFalse(); + assertThat(metadata.dictionarySize()).isEqualTo(0); + assertThatThrownBy(() -> metadata.get(0)).isInstanceOf(ArrayIndexOutOfBoundsException.class); + } + + @Test + public void testHeaderSorted() { + SerializedMetadata metadata = SerializedMetadata.from(new byte[] {0b10001, 0x00}); + + assertThat(metadata.isSorted()).isTrue(); + assertThat(metadata.dictionarySize()).isEqualTo(0); + } + + @Test + public void testHeaderOffsetSize() { + // offset size is 4-byte LE = 1 + assertThat( + SerializedMetadata.from(new byte[] {(byte) 0b11010001, 0x01, 0x00, 0x00, 0x00}) + .dictionarySize()) + .isEqualTo(1); + + // offset size is 3-byte LE = 1 + assertThat( + SerializedMetadata.from(new byte[] {(byte) 0b10010001, 0x01, 0x00, 0x00}) + .dictionarySize()) + .isEqualTo(1); + + // offset size is 2-byte LE = 1 + assertThat(SerializedMetadata.from(new byte[] {(byte) 0b01010001, 0x01, 0x00}).dictionarySize()) + .isEqualTo(1); + + // offset size is 1-byte LE = 1 + assertThat(SerializedMetadata.from(new byte[] {(byte) 0b00010001, 0x01}).dictionarySize()) + .isEqualTo(1); + } + + @Test + public void testReadString() { + SerializedMetadata metadata = + SerializedMetadata.from( + new byte[] { + 0b10001, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 'a', 'b', 'c', 'd', 'e' + }); + + assertThat(metadata.get(0)).isEqualTo("a"); + assertThat(metadata.get(1)).isEqualTo("b"); + assertThat(metadata.get(2)).isEqualTo("c"); + assertThat(metadata.get(3)).isEqualTo("d"); + assertThat(metadata.get(4)).isEqualTo("e"); + assertThatThrownBy(() -> metadata.get(5)).isInstanceOf(ArrayIndexOutOfBoundsException.class); + } + + @Test + public void testMultibyteString() { + SerializedMetadata metadata = + SerializedMetadata.from( + new byte[] { + 0b10001, 0x05, 0x00, 0x01, 0x02, 0x05, 0x06, 0x07, 'a', 'b', 'x', 'y', 'z', 'd', 'e' + }); + + assertThat(metadata.get(0)).isEqualTo("a"); + assertThat(metadata.get(1)).isEqualTo("b"); + assertThat(metadata.get(2)).isEqualTo("xyz"); + assertThat(metadata.get(3)).isEqualTo("d"); + assertThat(metadata.get(4)).isEqualTo("e"); + assertThatThrownBy(() -> metadata.get(5)).isInstanceOf(ArrayIndexOutOfBoundsException.class); + } + + @Test + public void testTwoByteOffsets() { + SerializedMetadata metadata = + SerializedMetadata.from( + new byte[] { + 0b1010001, 0x05, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x05, 0x00, 0x06, 0x00, + 0x07, 0x00, 'a', 'b', 'x', 'y', 'z', 'd', 'e' + }); + + assertThat(metadata.get(0)).isEqualTo("a"); + assertThat(metadata.get(1)).isEqualTo("b"); + assertThat(metadata.get(2)).isEqualTo("xyz"); + assertThat(metadata.get(3)).isEqualTo("d"); + assertThat(metadata.get(4)).isEqualTo("e"); + assertThatThrownBy(() -> metadata.get(5)).isInstanceOf(ArrayIndexOutOfBoundsException.class); + } + + @Test + 
public void testFindStringSorted() { + SerializedMetadata metadata = + SerializedMetadata.from( + new byte[] { + 0b10001, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 'a', 'b', 'c', 'd', 'e' + }); + assertThat(metadata.id("A")).isEqualTo(-1); + assertThat(metadata.id("a")).isEqualTo(0); + assertThat(metadata.id("aa")).isEqualTo(-1); + assertThat(metadata.id("b")).isEqualTo(1); + assertThat(metadata.id("bb")).isEqualTo(-1); + assertThat(metadata.id("c")).isEqualTo(2); + assertThat(metadata.id("cc")).isEqualTo(-1); + assertThat(metadata.id("d")).isEqualTo(3); + assertThat(metadata.id("dd")).isEqualTo(-1); + assertThat(metadata.id("e")).isEqualTo(4); + assertThat(metadata.id("ee")).isEqualTo(-1); + } + + @Test + public void testFindStringUnsorted() { + SerializedMetadata metadata = + SerializedMetadata.from( + new byte[] { + 0b00001, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 'e', 'd', 'c', 'b', 'a' + }); + assertThat(metadata.id("A")).isEqualTo(-1); + assertThat(metadata.id("a")).isEqualTo(4); + assertThat(metadata.id("aa")).isEqualTo(-1); + assertThat(metadata.id("b")).isEqualTo(3); + assertThat(metadata.id("bb")).isEqualTo(-1); + assertThat(metadata.id("c")).isEqualTo(2); + assertThat(metadata.id("cc")).isEqualTo(-1); + assertThat(metadata.id("d")).isEqualTo(1); + assertThat(metadata.id("dd")).isEqualTo(-1); + assertThat(metadata.id("e")).isEqualTo(0); + assertThat(metadata.id("ee")).isEqualTo(-1); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTwoByteFieldIds(boolean sortFieldNames) { + Set keySet = Sets.newHashSet(); + String lastKey = null; + for (int i = 0; i < 10_000; i += 1) { + lastKey = RandomUtil.generateString(10, random); + keySet.add(lastKey); + } + + ByteBuffer buffer = VariantTestUtil.createMetadata(keySet, sortFieldNames); + SerializedMetadata metadata = SerializedMetadata.from(buffer); + + assertThat(metadata.dictionarySize()).isEqualTo(10_000); + assertThat(metadata.id(lastKey)).isGreaterThan(0); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testThreeByteFieldIds(boolean sortFieldNames) { + Set keySet = Sets.newHashSet(); + String lastKey = null; + for (int i = 0; i < 100_000; i += 1) { + lastKey = RandomUtil.generateString(10, random); + keySet.add(lastKey); + } + + ByteBuffer buffer = VariantTestUtil.createMetadata(keySet, sortFieldNames); + SerializedMetadata metadata = SerializedMetadata.from(buffer); + + assertThat(metadata.dictionarySize()).isEqualTo(100_000); + assertThat(metadata.id(lastKey)).isGreaterThan(0); + } + + @Test + public void testInvalidMetadataVersion() { + assertThatThrownBy(() -> SerializedMetadata.from(new byte[] {0x02, 0x00})) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Unsupported version: 2"); + } + + @Test + public void testMissingLength() { + assertThatThrownBy(() -> SerializedMetadata.from(new byte[] {0x01})) + .isInstanceOf(IndexOutOfBoundsException.class); + } + + @Test + public void testLengthTooShort() { + // missing the 4th length byte + assertThatThrownBy( + () -> SerializedMetadata.from(new byte[] {(byte) 0b11010001, 0x00, 0x00, 0x00})) + .isInstanceOf(IndexOutOfBoundsException.class); + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestSerializedObject.java b/core/src/test/java/org/apache/iceberg/variants/TestSerializedObject.java new file mode 100644 index 000000000000..3c5fb808d835 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestSerializedObject.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.RandomUtil; +import org.apache.iceberg.variants.Variants.PhysicalType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestSerializedObject { + private static final SerializedMetadata EMPTY_METADATA = + SerializedMetadata.from(SerializedMetadata.EMPTY_V1_BUFFER); + private static final SerializedPrimitive I1 = SerializedPrimitive.from(new byte[] {0b1100, 1}); + private static final SerializedPrimitive I2 = SerializedPrimitive.from(new byte[] {0b1100, 2}); + private static final SerializedPrimitive I3 = SerializedPrimitive.from(new byte[] {0b1100, 3}); + private static final SerializedPrimitive NULL = SerializedPrimitive.from(new byte[] {0x00}); + private static final SerializedPrimitive TRUE = SerializedPrimitive.from(new byte[] {0b100}); + private static final SerializedPrimitive DATE = + SerializedPrimitive.from(new byte[] {0b101100, (byte) 0xF4, 0x43, 0x00, 0x00}); + + private final Random random = new Random(198725); + + @Test + public void testEmptyObject() { + SerializedObject object = SerializedObject.from(EMPTY_METADATA, new byte[] {0b10, 0x00}); + + assertThat(object.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(0); + } + + @Test + public void testEmptyLargeObject() { + SerializedObject object = + SerializedObject.from(EMPTY_METADATA, new byte[] {0b1000010, 0x00, 0x00, 0x00, 0x00}); + + assertThat(object.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(0); + } + + @Test + public void testSimpleObject() { + Map data = ImmutableMap.of("a", I1, "b", I2, "c", I3); + ByteBuffer meta = VariantTestUtil.createMetadata(data.keySet(), true /* sort names */); + ByteBuffer value = VariantTestUtil.createObject(meta, data); + + SerializedMetadata metadata = SerializedMetadata.from(meta); + SerializedObject object = SerializedObject.from(metadata, value, value.get(0)); + + assertThat(object.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(3); + + assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("a").asPrimitive().get()).isEqualTo((byte) 1); + 
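// (descriptive comment, added for clarity) the remaining fields should be readable by name, and a missing field name returns null +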
assertThat(object.get("b").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("b").asPrimitive().get()).isEqualTo((byte) 2); + assertThat(object.get("c").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("c").asPrimitive().get()).isEqualTo((byte) 3); + + assertThat(object.get("d")).isEqualTo(null); + } + + @Test + public void testOutOfOrderKeys() { + Map data = ImmutableMap.of("b", I2, "a", I1, "c", I3); + ByteBuffer meta = VariantTestUtil.createMetadata(data.keySet(), false /* sort names */); + ByteBuffer value = VariantTestUtil.createObject(meta, data); + + SerializedMetadata metadata = SerializedMetadata.from(meta); + SerializedObject object = SerializedObject.from(metadata, value, value.get(0)); + + assertThat(object.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(3); + + assertThat(object.get("d")).isEqualTo(null); + + assertThat(object.get("c").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("c").asPrimitive().get()).isEqualTo((byte) 3); + assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("a").asPrimitive().get()).isEqualTo((byte) 1); + assertThat(object.get("b").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("b").asPrimitive().get()).isEqualTo((byte) 2); + } + + @Test + public void testMixedValueTypes() { + ByteBuffer meta = + VariantTestUtil.createMetadata( + ImmutableList.of("a", "b", "c", "d", "e", "f"), true /* sort names */); + SerializedMetadata metadata = SerializedMetadata.from(meta); + + Map inner = ImmutableMap.of("b", I2, "f", I3); + ByteBuffer innerBuffer = VariantTestUtil.createObject(meta, inner); + SerializedObject innerObject = SerializedObject.from(metadata, innerBuffer, innerBuffer.get(0)); + Map data = + ImmutableMap.of("a", I1, "b", DATE, "c", NULL, "d", TRUE, "e", innerObject); + ByteBuffer value = VariantTestUtil.createObject(meta, data); + + SerializedObject object = SerializedObject.from(metadata, value, value.get(0)); + + assertThat(object.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(5); + + assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("a").asPrimitive().get()).isEqualTo((byte) 1); + assertThat(object.get("b").type()).isEqualTo(PhysicalType.DATE); + assertThat(((SerializedPrimitive) object.get("b")).get()).isEqualTo(17396); + assertThat(object.get("c").type()).isEqualTo(PhysicalType.NULL); + assertThat(((SerializedPrimitive) object.get("c")).get()).isEqualTo(null); + assertThat(object.get("d").type()).isEqualTo(PhysicalType.BOOLEAN_TRUE); + assertThat(((SerializedPrimitive) object.get("d")).get()).isEqualTo(true); + + assertThat(object.get("e").type()).isEqualTo(PhysicalType.OBJECT); + SerializedObject actualInner = (SerializedObject) object.get("e").asObject(); + assertThat(actualInner.numElements()).isEqualTo(2); + assertThat(actualInner.get("b").type()).isEqualTo(PhysicalType.INT8); + assertThat(actualInner.get("b").asPrimitive().get()).isEqualTo((byte) 2); + assertThat(actualInner.get("f").type()).isEqualTo(PhysicalType.INT8); + assertThat(actualInner.get("f").asPrimitive().get()).isEqualTo((byte) 3); + } + + @Test + public void testTwoByteOffsets() { + // a string larger than 255 bytes to push the value offset size above 1 byte + String randomString = RandomUtil.generateString(300, random); + SerializedPrimitive bigString = VariantTestUtil.createString(randomString); + + // note that order doesn't matter. 
fields are sorted by name + Map data = ImmutableMap.of("big", bigString, "a", I1, "b", I2, "c", I3); + ByteBuffer meta = VariantTestUtil.createMetadata(data.keySet(), true /* sort names */); + ByteBuffer value = VariantTestUtil.createObject(meta, data); + + SerializedMetadata metadata = SerializedMetadata.from(meta); + SerializedObject object = SerializedObject.from(metadata, value, value.get(0)); + + assertThat(object.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(4); + + assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("a").asPrimitive().get()).isEqualTo((byte) 1); + assertThat(object.get("b").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("b").asPrimitive().get()).isEqualTo((byte) 2); + assertThat(object.get("c").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("c").asPrimitive().get()).isEqualTo((byte) 3); + assertThat(object.get("big").type()).isEqualTo(PhysicalType.STRING); + assertThat(object.get("big").asPrimitive().get()).isEqualTo(randomString); + } + + @Test + public void testThreeByteOffsets() { + // a string larger than 65535 bytes to push the value offset size above 2 bytes + String randomString = RandomUtil.generateString(70_000, random); + SerializedPrimitive reallyBigString = VariantTestUtil.createString(randomString); + + // note that order doesn't matter. fields are sorted by name + Map data = + ImmutableMap.of("really-big", reallyBigString, "a", I1, "b", I2, "c", I3); + ByteBuffer meta = VariantTestUtil.createMetadata(data.keySet(), true /* sort names */); + ByteBuffer value = VariantTestUtil.createObject(meta, data); + + SerializedMetadata metadata = SerializedMetadata.from(meta); + SerializedObject object = SerializedObject.from(metadata, value, value.get(0)); + + assertThat(object.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(4); + + assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("a").asPrimitive().get()).isEqualTo((byte) 1); + assertThat(object.get("b").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("b").asPrimitive().get()).isEqualTo((byte) 2); + assertThat(object.get("c").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("c").asPrimitive().get()).isEqualTo((byte) 3); + assertThat(object.get("really-big").type()).isEqualTo(PhysicalType.STRING); + assertThat(object.get("really-big").asPrimitive().get()).isEqualTo(randomString); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testLargeObject(boolean sortFieldNames) { + Map> fields = Maps.newHashMap(); + for (int i = 0; i < 10_000; i += 1) { + fields.put( + RandomUtil.generateString(10, random), + Variants.of(RandomUtil.generateString(10, random))); + } + + ByteBuffer meta = VariantTestUtil.createMetadata(fields.keySet(), sortFieldNames); + ByteBuffer value = VariantTestUtil.createObject(meta, (Map) fields); + + SerializedMetadata metadata = SerializedMetadata.from(meta); + SerializedObject object = SerializedObject.from(metadata, value, value.get(0)); + + assertThat(object.type()).isEqualTo(Variants.PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(10_000); + + for (Map.Entry> entry : fields.entrySet()) { + VariantValue fieldValue = object.get(entry.getKey()); + assertThat(fieldValue.type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(fieldValue.asPrimitive().get()).isEqualTo(entry.getValue().get()); +
} + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTwoByteFieldIds(boolean sortFieldNames) { + Set keySet = Sets.newHashSet(); + for (int i = 0; i < 10_000; i += 1) { + keySet.add(RandomUtil.generateString(10, random)); + } + + Map data = ImmutableMap.of("aa", I1, "AA", I2, "ZZ", I3); + + // create metadata from the large key set and the actual keys + keySet.addAll(data.keySet()); + ByteBuffer meta = VariantTestUtil.createMetadata(keySet, sortFieldNames); + ByteBuffer value = VariantTestUtil.createObject(meta, data); + + SerializedMetadata metadata = SerializedMetadata.from(meta); + SerializedObject object = SerializedObject.from(metadata, value, value.get(0)); + + assertThat(object.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(3); + + assertThat(object.get("aa").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("aa").asPrimitive().get()).isEqualTo((byte) 1); + assertThat(object.get("AA").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("AA").asPrimitive().get()).isEqualTo((byte) 2); + assertThat(object.get("ZZ").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("ZZ").asPrimitive().get()).isEqualTo((byte) 3); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testThreeByteFieldIds(boolean sortFieldNames) { + Set keySet = Sets.newHashSet(); + for (int i = 0; i < 100_000; i += 1) { + keySet.add(RandomUtil.generateString(10, random)); + } + + Map data = ImmutableMap.of("aa", I1, "AA", I2, "ZZ", I3); + + // create metadata from the large key set and the actual keys + keySet.addAll(data.keySet()); + ByteBuffer meta = VariantTestUtil.createMetadata(keySet, sortFieldNames); + ByteBuffer value = VariantTestUtil.createObject(meta, data); + + SerializedMetadata metadata = SerializedMetadata.from(meta); + SerializedObject object = SerializedObject.from(metadata, value, value.get(0)); + + assertThat(object.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(3); + + assertThat(object.get("aa").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("aa").asPrimitive().get()).isEqualTo((byte) 1); + assertThat(object.get("AA").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("AA").asPrimitive().get()).isEqualTo((byte) 2); + assertThat(object.get("ZZ").type()).isEqualTo(PhysicalType.INT8); + assertThat(object.get("ZZ").asPrimitive().get()).isEqualTo((byte) 3); + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestSerializedPrimitives.java b/core/src/test/java/org/apache/iceberg/variants/TestSerializedPrimitives.java new file mode 100644 index 000000000000..b4646c0d13e9 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestSerializedPrimitives.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.variants.Variants.PhysicalType; +import org.junit.jupiter.api.Test; + +public class TestSerializedPrimitives { + @Test + public void testNull() { + VariantPrimitive value = SerializedPrimitive.from(new byte[] {primitiveHeader(0)}); + + assertThat(value.type()).isEqualTo(PhysicalType.NULL); + assertThat(value.get()).isEqualTo(null); + } + + @Test + public void testTrue() { + VariantPrimitive value = SerializedPrimitive.from(new byte[] {primitiveHeader(1)}); + + assertThat(value.type()).isEqualTo(PhysicalType.BOOLEAN_TRUE); + assertThat(value.get()).isEqualTo(true); + } + + @Test + public void testFalse() { + VariantPrimitive value = SerializedPrimitive.from(new byte[] {primitiveHeader(2)}); + + assertThat(value.type()).isEqualTo(PhysicalType.BOOLEAN_FALSE); + assertThat(value.get()).isEqualTo(false); + } + + @Test + public void testInt8() { + VariantPrimitive value = SerializedPrimitive.from(new byte[] {primitiveHeader(3), 34}); + + assertThat(value.type()).isEqualTo(PhysicalType.INT8); + assertThat(value.get()).isEqualTo((byte) 34); + } + + @Test + public void testNegativeInt8() { + VariantPrimitive value = + SerializedPrimitive.from(new byte[] {primitiveHeader(3), (byte) 0xFF}); + + assertThat(value.type()).isEqualTo(PhysicalType.INT8); + assertThat(value.get()).isEqualTo((byte) -1); + } + + @Test + public void testInt16() { + VariantPrimitive value = + SerializedPrimitive.from(new byte[] {primitiveHeader(4), (byte) 0xD2, 0x04}); + + assertThat(value.type()).isEqualTo(PhysicalType.INT16); + assertThat(value.get()).isEqualTo((short) 1234); + } + + @Test + public void testNegativeInt16() { + VariantPrimitive value = + SerializedPrimitive.from(new byte[] {primitiveHeader(4), (byte) 0xFF, (byte) 0xFF}); + + assertThat(value.type()).isEqualTo(PhysicalType.INT16); + assertThat(value.get()).isEqualTo((short) -1); + } + + @Test + public void testInt32() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] {primitiveHeader(5), (byte) 0xD2, 0x02, (byte) 0x96, 0x49}); + + assertThat(value.type()).isEqualTo(PhysicalType.INT32); + assertThat(value.get()).isEqualTo(1234567890); + } + + @Test + public void testNegativeInt32() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] {primitiveHeader(5), (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF}); + + assertThat(value.type()).isEqualTo(PhysicalType.INT32); + assertThat(value.get()).isEqualTo(-1); + } + + @Test + public void testInt64() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(6), + (byte) 0xB1, + 0x1C, + 0x6C, + (byte) 0xB1, + (byte) 0xF4, + 0x10, + 0x22, + 0x11 + }); + + assertThat(value.type()).isEqualTo(PhysicalType.INT64); + assertThat(value.get()).isEqualTo(1234567890987654321L); + } + + @Test + public void testNegativeInt64() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(6), + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF + }); + + assertThat(value.type()).isEqualTo(PhysicalType.INT64); + assertThat(value.get()).isEqualTo(-1L); + } + + @Test + 
public void testDouble() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(7), + (byte) 0xB1, + 0x1C, + 0x6C, + (byte) 0xB1, + (byte) 0xF4, + 0x10, + 0x22, + 0x11 + }); + + assertThat(value.type()).isEqualTo(PhysicalType.DOUBLE); + assertThat(value.get()).isEqualTo(Double.longBitsToDouble(1234567890987654321L)); + } + + @Test + public void testNegativeDouble() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] {primitiveHeader(7), 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0x80}); + + assertThat(value.type()).isEqualTo(PhysicalType.DOUBLE); + assertThat(value.get()).isEqualTo(-0.0D); + } + + @Test + public void testDecimal4() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] {primitiveHeader(8), 0x04, (byte) 0xD2, 0x02, (byte) 0x96, 0x49}); + + assertThat(value.type()).isEqualTo(PhysicalType.DECIMAL4); + assertThat(value.get()).isEqualTo(new BigDecimal("123456.7890")); + } + + @Test + public void testNegativeDecimal4() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(8), 0x04, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF + }); + + assertThat(value.type()).isEqualTo(PhysicalType.DECIMAL4); + assertThat(value.get()).isEqualTo(new BigDecimal("-0.0001")); + } + + @Test + public void testDecimal8() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(9), + 0x09, // scale=9 + (byte) 0xB1, + 0x1C, + 0x6C, + (byte) 0xB1, + (byte) 0xF4, + 0x10, + 0x22, + 0x11 + }); + + assertThat(value.type()).isEqualTo(PhysicalType.DECIMAL8); + assertThat(value.get()).isEqualTo(new BigDecimal("1234567890.987654321")); + } + + @Test + public void testNegativeDecimal8() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(9), + 0x09, // scale=9 + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF + }); + + assertThat(value.type()).isEqualTo(PhysicalType.DECIMAL8); + assertThat(value.get()).isEqualTo(new BigDecimal("-0.000000001")); + } + + @Test + public void testDecimal16() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(10), + 0x09, // scale=9 + 0x15, + 0x71, + 0x34, + (byte) 0xB0, + (byte) 0xB8, + (byte) 0x87, + 0x10, + (byte) 0x89, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00 + }); + + assertThat(value.type()).isEqualTo(PhysicalType.DECIMAL16); + assertThat(value.get()).isEqualTo(new BigDecimal("9876543210.123456789")); + } + + @Test + public void testNegativeDecimal16() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(10), + 0x09, // scale=9 + (byte) 0xEB, + (byte) 0x8E, + (byte) 0xCB, + 0x4F, + 0x47, + 0x78, + (byte) 0xEF, + 0x76, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + }); + + assertThat(value.type()).isEqualTo(PhysicalType.DECIMAL16); + assertThat(value.get()).isEqualTo(new BigDecimal("-9876543210.123456789")); + } + + @Test + public void testDate() { + VariantPrimitive value = + SerializedPrimitive.from(new byte[] {primitiveHeader(11), (byte) 0xF4, 0x43, 0x00, 0x00}); + + assertThat(value.type()).isEqualTo(PhysicalType.DATE); + assertThat(DateTimeUtil.daysToIsoDate((int) value.get())).isEqualTo("2017-08-18"); + } + + @Test + public void testNegativeDate() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] {primitiveHeader(11), (byte) 0xFF, 
(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}); + + assertThat(value.type()).isEqualTo(PhysicalType.DATE); + assertThat(DateTimeUtil.daysToIsoDate((int) value.get())).isEqualTo("1969-12-31"); + } + + @Test + public void testTimestamptz() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(12), + 0x18, + (byte) 0xD3, + (byte) 0xB1, + (byte) 0xD6, + 0x07, + 0x57, + 0x05, + 0x00 + }); + + assertThat(value.type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(DateTimeUtil.microsToIsoTimestamptz((long) value.get())) + .isEqualTo("2017-08-18T14:21:01.919+00:00"); + } + + @Test + public void testNegativeTimestamptz() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(12), + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF + }); + + assertThat(value.type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(DateTimeUtil.microsToIsoTimestamptz((long) value.get())) + .isEqualTo("1969-12-31T23:59:59.999999+00:00"); + } + + @Test + public void testTimestampntz() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(13), + 0x18, + (byte) 0xD3, + (byte) 0xB1, + (byte) 0xD6, + 0x07, + 0x57, + 0x05, + 0x00 + }); + + assertThat(value.type()).isEqualTo(PhysicalType.TIMESTAMPNTZ); + assertThat(DateTimeUtil.microsToIsoTimestamp((long) value.get())) + .isEqualTo("2017-08-18T14:21:01.919"); + } + + @Test + public void testNegativeTimestampntz() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(13), + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF, + (byte) 0xFF + }); + + assertThat(value.type()).isEqualTo(PhysicalType.TIMESTAMPNTZ); + assertThat(DateTimeUtil.microsToIsoTimestamp((long) value.get())) + .isEqualTo("1969-12-31T23:59:59.999999"); + } + + @Test + public void testFloat() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] {primitiveHeader(14), (byte) 0xD2, 0x02, (byte) 0x96, 0x49}); + + assertThat(value.type()).isEqualTo(PhysicalType.FLOAT); + assertThat(value.get()).isEqualTo(Float.intBitsToFloat(1234567890)); + } + + @Test + public void testNegativeFloat() { + VariantPrimitive value = + SerializedPrimitive.from(new byte[] {primitiveHeader(14), 0x00, 0x00, 0x00, (byte) 0x80}); + + assertThat(value.type()).isEqualTo(PhysicalType.FLOAT); + assertThat(value.get()).isEqualTo(-0.0F); + } + + @Test + public void testBinary() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] {primitiveHeader(15), 0x05, 0x00, 0x00, 0x00, 'a', 'b', 'c', 'd', 'e'}); + + assertThat(value.type()).isEqualTo(PhysicalType.BINARY); + assertThat(value.get()).isEqualTo(ByteBuffer.wrap(new byte[] {'a', 'b', 'c', 'd', 'e'})); + } + + @Test + public void testString() { + VariantPrimitive value = + SerializedPrimitive.from( + new byte[] { + primitiveHeader(16), 0x07, 0x00, 0x00, 0x00, 'i', 'c', 'e', 'b', 'e', 'r', 'g' + }); + + assertThat(value.type()).isEqualTo(PhysicalType.STRING); + assertThat(value.get()).isEqualTo("iceberg"); + } + + @Test + public void testShortString() { + VariantPrimitive value = + SerializedShortString.from(new byte[] {0b11101, 'i', 'c', 'e', 'b', 'e', 'r', 'g'}); + + assertThat(value.type()).isEqualTo(PhysicalType.STRING); + assertThat(value.get()).isEqualTo("iceberg"); + } + + @Test + public void testUnsupportedType() { + assertThatThrownBy(() -> SerializedPrimitive.from(new byte[] {primitiveHeader(17)})) 
+ .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Unknown primitive physical type: 17"); + } + + private static byte primitiveHeader(int primitiveType) { + return (byte) (primitiveType << 2); + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java new file mode 100644 index 000000000000..7ce1ea3383d5 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.RandomUtil; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestShreddedObject { + private static final Map FIELDS = + ImmutableMap.of( + "a", + Variants.of(34), + "b", + Variants.of("iceberg"), + "c", + Variants.of(new BigDecimal("12.21"))); + + private final Random random = new Random(871925); + + @Test + public void testShreddedFields() { + ShreddedObject object = createShreddedObject(FIELDS).second(); + + assertThat(object.get("a")).isInstanceOf(VariantPrimitive.class); + assertThat(object.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(object.get("b")).isInstanceOf(VariantPrimitive.class); + assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(object.get("c")).isInstanceOf(VariantPrimitive.class); + assertThat(object.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testShreddedSerializationMinimalBuffer() { + Pair pair = createShreddedObject(FIELDS); + SerializedMetadata metadata = pair.first(); + ShreddedObject object = pair.second(); + + VariantValue value = roundTripMinimalBuffer(object, metadata); + + assertThat(value).isInstanceOf(SerializedObject.class); + SerializedObject actual = (SerializedObject) value; + + assertThat(actual.numElements()).isEqualTo(3); + assertThat(actual.get("a")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("a").asPrimitive().get()).isEqualTo(34); + 
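// (descriptive comment, added for clarity) the other shredded fields should round-trip with their original types and values +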
assertThat(actual.get("b")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get("c")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testShreddedSerializationLargeBuffer() { + Pair pair = createShreddedObject(FIELDS); + SerializedMetadata metadata = pair.first(); + ShreddedObject object = pair.second(); + + VariantValue value = roundTripLargeBuffer(object, metadata); + + assertThat(value).isInstanceOf(SerializedObject.class); + SerializedObject actual = (SerializedObject) value; + + assertThat(actual.numElements()).isEqualTo(3); + assertThat(actual.get("a")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(actual.get("b")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get("c")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testUnshreddedObjectSerializationMinimalBuffer() { + Pair pair = createUnshreddedObject(FIELDS); + SerializedMetadata metadata = pair.first(); + ShreddedObject object = pair.second(); + + VariantValue value = roundTripMinimalBuffer(object, metadata); + + assertThat(value).isInstanceOf(SerializedObject.class); + SerializedObject actual = (SerializedObject) value; + + assertThat(actual.numElements()).isEqualTo(3); + assertThat(actual.get("a")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(actual.get("b")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get("c")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testUnshreddedObjectSerializationLargeBuffer() { + Pair pair = createUnshreddedObject(FIELDS); + SerializedMetadata metadata = pair.first(); + ShreddedObject object = pair.second(); + + VariantValue value = roundTripLargeBuffer(object, metadata); + + assertThat(value).isInstanceOf(SerializedObject.class); + SerializedObject actual = (SerializedObject) value; + + assertThat(actual.numElements()).isEqualTo(3); + assertThat(actual.get("a")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(actual.get("b")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get("c")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testPartiallyShreddedObjectReplacement() { + ShreddedObject partial = createUnshreddedObject(FIELDS).second(); + + // replace field c with a new value + partial.put("c", Variants.ofIsoDate("2024-10-12")); + + assertThat(partial.get("a")).isInstanceOf(VariantPrimitive.class); + assertThat(partial.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(partial.get("b")).isInstanceOf(VariantPrimitive.class); + assertThat(partial.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(partial.get("c")).isInstanceOf(VariantPrimitive.class); + assertThat(partial.get("c").type()).isEqualTo(Variants.PhysicalType.DATE); + 
assertThat(partial.get("c").asPrimitive().get()) + .isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12")); + } + + @Test + public void testPartiallyShreddedObjectGetMissingField() { + ShreddedObject partial = createUnshreddedObject(FIELDS).second(); + + // missing fields are returned as null + assertThat(partial.get("d")).isNull(); + } + + @Test + public void testPartiallyShreddedObjectPutMissingFieldFailure() { + ShreddedObject partial = createUnshreddedObject(FIELDS).second(); + + // d is not defined in the variant metadata and will fail + assertThatThrownBy(() -> partial.put("d", Variants.ofIsoDate("2024-10-12"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot find field name in metadata: d"); + } + + @Test + public void testPartiallyShreddedObjectSerializationMinimalBuffer() { + Pair pair = createUnshreddedObject(FIELDS); + SerializedMetadata metadata = pair.first(); + ShreddedObject partial = pair.second(); + + // replace field c with a new value + partial.put("c", Variants.ofIsoDate("2024-10-12")); + + VariantValue value = roundTripMinimalBuffer(partial, metadata); + + assertThat(value).isInstanceOf(SerializedObject.class); + SerializedObject actual = (SerializedObject) value; + + assertThat(actual.get("a")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(actual.get("b")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get("c")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("c").type()).isEqualTo(Variants.PhysicalType.DATE); + assertThat(actual.get("c").asPrimitive().get()) + .isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12")); + } + + @Test + public void testPartiallyShreddedObjectSerializationLargeBuffer() { + Pair pair = createUnshreddedObject(FIELDS); + SerializedMetadata metadata = pair.first(); + ShreddedObject partial = pair.second(); + + // replace field c with a new value + partial.put("c", Variants.ofIsoDate("2024-10-12")); + + VariantValue value = roundTripLargeBuffer(partial, metadata); + + assertThat(value).isInstanceOf(SerializedObject.class); + SerializedObject actual = (SerializedObject) value; + + assertThat(actual.get("a")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(actual.get("b")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get("c")).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get("c").type()).isEqualTo(Variants.PhysicalType.DATE); + assertThat(actual.get("c").asPrimitive().get()) + .isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12")); + } + + @Test + public void testTwoByteOffsets() { + // a string larger than 255 bytes to push the value offset size above 1 byte + String randomString = RandomUtil.generateString(300, random); + SerializedPrimitive bigString = VariantTestUtil.createString(randomString); + + Map data = Maps.newHashMap(); + data.putAll(FIELDS); + data.put("big", bigString); + + Pair pair = createShreddedObject(data); + VariantValue value = roundTripLargeBuffer(pair.second(), pair.first()); + + assertThat(value.type()).isEqualTo(Variants.PhysicalType.OBJECT); + SerializedObject object = (SerializedObject) value; + assertThat(object.numElements()).isEqualTo(4); + + assertThat(object.get("a").type()).isEqualTo(Variants.PhysicalType.INT32); + 
assertThat(object.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(object.get("b").type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(object.get("c").type()).isEqualTo(Variants.PhysicalType.DECIMAL4); + assertThat(object.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + assertThat(object.get("big").type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(object.get("big").asPrimitive().get()).isEqualTo(randomString); + } + + @Test + public void testThreeByteOffsets() { + // a string larger than 65535 bytes to push the value offset size above 2 bytes + String randomString = RandomUtil.generateString(70_000, random); + SerializedPrimitive reallyBigString = VariantTestUtil.createString(randomString); + + Map data = Maps.newHashMap(); + data.putAll(FIELDS); + data.put("really-big", reallyBigString); + + Pair pair = createShreddedObject(data); + VariantValue value = roundTripLargeBuffer(pair.second(), pair.first()); + + assertThat(value.type()).isEqualTo(Variants.PhysicalType.OBJECT); + SerializedObject object = (SerializedObject) value; + assertThat(object.numElements()).isEqualTo(4); + + assertThat(object.get("a").type()).isEqualTo(Variants.PhysicalType.INT32); + assertThat(object.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(object.get("b").type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(object.get("c").type()).isEqualTo(Variants.PhysicalType.DECIMAL4); + assertThat(object.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + assertThat(object.get("really-big").type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(object.get("really-big").asPrimitive().get()).isEqualTo(randomString); + } + + @Test + public void testFourByteOffsets() { + // a string larger than 16777215 bytes to push the value offset size above 3 bytes + String randomString = RandomUtil.generateString(16_777_300, random); + SerializedPrimitive reallyBigString = VariantTestUtil.createString(randomString); + + Map data = Maps.newHashMap(); + data.putAll(FIELDS); + data.put("really-big", reallyBigString); + + Pair pair = createShreddedObject(data); + VariantValue value = roundTripLargeBuffer(pair.second(), pair.first()); + + assertThat(value.type()).isEqualTo(Variants.PhysicalType.OBJECT); + SerializedObject object = (SerializedObject) value; + assertThat(object.numElements()).isEqualTo(4); + + assertThat(object.get("a").type()).isEqualTo(Variants.PhysicalType.INT32); + assertThat(object.get("a").asPrimitive().get()).isEqualTo(34); + assertThat(object.get("b").type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(object.get("c").type()).isEqualTo(Variants.PhysicalType.DECIMAL4); + assertThat(object.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + assertThat(object.get("really-big").type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(object.get("really-big").asPrimitive().get()).isEqualTo(randomString); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testLargeObject(boolean sortFieldNames) { + Map> fields = Maps.newHashMap(); + for (int i = 0; i < 10_000; i += 1) { + fields.put( + RandomUtil.generateString(10, random), + Variants.of(RandomUtil.generateString(10, random))); + } + + SerializedMetadata metadata = + 
SerializedMetadata.from(VariantTestUtil.createMetadata(fields.keySet(), sortFieldNames)); + + ShreddedObject shredded = createShreddedObject(metadata, (Map) fields); + VariantValue value = roundTripLargeBuffer(shredded, metadata); + + assertThat(value.type()).isEqualTo(Variants.PhysicalType.OBJECT); + SerializedObject object = (SerializedObject) value; + assertThat(object.numElements()).isEqualTo(10_000); + + for (Map.Entry> entry : fields.entrySet()) { + VariantValue fieldValue = object.get(entry.getKey()); + assertThat(fieldValue.type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(fieldValue.asPrimitive().get()).isEqualTo(entry.getValue().get()); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTwoByteFieldIds(boolean sortFieldNames) { + Set keySet = Sets.newHashSet(); + for (int i = 0; i < 10_000; i += 1) { + keySet.add(RandomUtil.generateString(10, random)); + } + + Map data = + ImmutableMap.of("aa", FIELDS.get("a"), "AA", FIELDS.get("b"), "ZZ", FIELDS.get("c")); + + // create metadata from the large key set and the actual keys + keySet.addAll(data.keySet()); + SerializedMetadata metadata = + SerializedMetadata.from(VariantTestUtil.createMetadata(keySet, sortFieldNames)); + + ShreddedObject shredded = createShreddedObject(metadata, data); + VariantValue value = roundTripLargeBuffer(shredded, metadata); + + assertThat(value.type()).isEqualTo(Variants.PhysicalType.OBJECT); + SerializedObject object = (SerializedObject) value; + assertThat(object.numElements()).isEqualTo(3); + + assertThat(object.get("aa").type()).isEqualTo(Variants.PhysicalType.INT32); + assertThat(object.get("aa").asPrimitive().get()).isEqualTo(34); + assertThat(object.get("AA").type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(object.get("AA").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(object.get("ZZ").type()).isEqualTo(Variants.PhysicalType.DECIMAL4); + assertThat(object.get("ZZ").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testThreeByteFieldIds(boolean sortFieldNames) { + Set keySet = Sets.newHashSet(); + for (int i = 0; i < 100_000; i += 1) { + keySet.add(RandomUtil.generateString(10, random)); + } + + Map data = + ImmutableMap.of("aa", FIELDS.get("a"), "AA", FIELDS.get("b"), "ZZ", FIELDS.get("c")); + + // create metadata from the large key set and the actual keys + keySet.addAll(data.keySet()); + SerializedMetadata metadata = + SerializedMetadata.from(VariantTestUtil.createMetadata(keySet, sortFieldNames)); + + ShreddedObject shredded = createShreddedObject(metadata, data); + VariantValue value = roundTripLargeBuffer(shredded, metadata); + + assertThat(value.type()).isEqualTo(Variants.PhysicalType.OBJECT); + SerializedObject object = (SerializedObject) value; + assertThat(object.numElements()).isEqualTo(3); + + assertThat(object.get("aa").type()).isEqualTo(Variants.PhysicalType.INT32); + assertThat(object.get("aa").asPrimitive().get()).isEqualTo(34); + assertThat(object.get("AA").type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(object.get("AA").asPrimitive().get()).isEqualTo("iceberg"); + assertThat(object.get("ZZ").type()).isEqualTo(Variants.PhysicalType.DECIMAL4); + assertThat(object.get("ZZ").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + static VariantValue roundTripMinimalBuffer(ShreddedObject object, SerializedMetadata metadata) { + ByteBuffer serialized = + 
ByteBuffer.allocate(object.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + object.writeTo(serialized, 0); + + return Variants.from(metadata, serialized); + } + + static VariantValue roundTripLargeBuffer(ShreddedObject object, SerializedMetadata metadata) { + ByteBuffer serialized = + ByteBuffer.allocate(1000 + object.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + object.writeTo(serialized, 300); + + ByteBuffer slice = serialized.duplicate().order(ByteOrder.LITTLE_ENDIAN); + slice.position(300); + slice.limit(300 + object.sizeInBytes()); + + return Variants.from(metadata, slice); + } + + private static ShreddedObject createShreddedObject( + SerializedMetadata metadata, Map fields) { + ShreddedObject object = new ShreddedObject(metadata); + for (Map.Entry field : fields.entrySet()) { + object.put(field.getKey(), field.getValue()); + } + + return object; + } + + private static Pair createShreddedObject( + Map fields) { + ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(fields.keySet(), false); + SerializedMetadata metadata = SerializedMetadata.from(metadataBuffer); + return Pair.of(metadata, createShreddedObject(metadata, fields)); + } + + private static Pair createUnshreddedObject( + Map fields) { + SerializedObject serialized = createSerializedObject(fields); + return Pair.of(serialized.metadata(), new ShreddedObject(serialized)); + } + + private static SerializedObject createSerializedObject(Map fields) { + ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(fields.keySet(), false); + return (SerializedObject) + Variants.from(metadataBuffer, VariantTestUtil.createObject(metadataBuffer, fields)); + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestVariantUtil.java b/core/src/test/java/org/apache/iceberg/variants/TestVariantUtil.java new file mode 100644 index 000000000000..9d9536fbf0d6 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestVariantUtil.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.variants;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import org.junit.jupiter.api.Test;
+
+public class TestVariantUtil {
+  @Test
+  public void testReadByteUnsigned() {
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[] {(byte) 0xFF});
+    assertThat(VariantUtil.readByte(buffer, 0)).isEqualTo(255);
+  }
+
+  @Test
+  public void testRead2ByteUnsigned() {
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[] {(byte) 0xFF, (byte) 0xFF});
+    assertThat(VariantUtil.readLittleEndianUnsigned(buffer, 0, 2)).isEqualTo(65535);
+  }
+
+  @Test
+  public void testRead3ByteUnsigned() {
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF});
+    assertThat(VariantUtil.readLittleEndianUnsigned(buffer, 0, 3)).isEqualTo(16777215);
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java b/core/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java
new file mode 100644
index 000000000000..b6caec63758a
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.variants;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class VariantTestUtil {
+  private VariantTestUtil() {}
+
+  private static byte primitiveHeader(int primitiveType) {
+    return (byte) (primitiveType << 2);
+  }
+
+  private static byte metadataHeader(boolean isSorted, int offsetSize) {
+    return (byte) (((offsetSize - 1) << 6) | (isSorted ? 0b10000 : 0) | 0b0001);
+  }
+
+  /** A hacky absolute put for ByteBuffer */
+  private static int writeBufferAbsolute(ByteBuffer buffer, int offset, ByteBuffer toCopy) {
+    int originalPosition = buffer.position();
+    buffer.position(offset);
+    ByteBuffer copy = toCopy.duplicate(); // duplicate so toCopy is not modified
+    buffer.put(copy);
+    buffer.position(originalPosition);
+    Preconditions.checkArgument(copy.remaining() <= 0, "Not fully written");
+    return toCopy.remaining();
+  }
+
+  /** Creates a string primitive from the given value, used to force large offset sizes */
+  static SerializedPrimitive createString(String string) {
+    byte[] utf8 = string.getBytes(StandardCharsets.UTF_8);
+    ByteBuffer buffer = ByteBuffer.allocate(5 + utf8.length).order(ByteOrder.LITTLE_ENDIAN);
+    buffer.put(0, primitiveHeader(16));
+    buffer.putInt(1, utf8.length);
+    writeBufferAbsolute(buffer, 5, ByteBuffer.wrap(utf8));
+    return SerializedPrimitive.from(buffer, buffer.get(0));
+  }
+
+  static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortNames) {
+    if (fieldNames.isEmpty()) {
+      return SerializedMetadata.EMPTY_V1_BUFFER;
+    }
+
+    int numElements = fieldNames.size();
+    Stream<String> names = sortNames ? fieldNames.stream().sorted() : fieldNames.stream();
+    ByteBuffer[] nameBuffers =
+        names
+            .map(str -> ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)))
+            .toArray(ByteBuffer[]::new);
+
+    int dataSize = 0;
+    for (ByteBuffer nameBuffer : nameBuffers) {
+      dataSize += nameBuffer.remaining();
+    }
+
+    int offsetSize = VariantUtil.sizeOf(dataSize);
+    int offsetListOffset = 1 /* header size */ + offsetSize /* dictionary size */;
+    int dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
+    int totalSize = dataOffset + dataSize;
+
+    byte header = metadataHeader(sortNames, offsetSize);
+    ByteBuffer buffer = ByteBuffer.allocate(totalSize).order(ByteOrder.LITTLE_ENDIAN);
+
+    buffer.put(0, header);
+    VariantUtil.writeLittleEndianUnsigned(buffer, numElements, 1, offsetSize);
+
+    // write offsets and strings
+    int nextOffset = 0;
+    int index = 0;
+    for (ByteBuffer nameBuffer : nameBuffers) {
+      // write the offset and the string
+      VariantUtil.writeLittleEndianUnsigned(
+          buffer, nextOffset, offsetListOffset + (index * offsetSize), offsetSize);
+      int nameSize = writeBufferAbsolute(buffer, dataOffset + nextOffset, nameBuffer);
+      // update the offset and index
+      nextOffset += nameSize;
+      index += 1;
+    }
+
+    // write the final size of the data section
+    VariantUtil.writeLittleEndianUnsigned(
+        buffer, nextOffset, offsetListOffset + (index * offsetSize), offsetSize);
+
+    return buffer;
+  }
+
+  static ByteBuffer createObject(ByteBuffer metadataBuffer, Map<String, VariantValue> data) {
+    // create the metadata to look up field names
+    SerializedMetadata metadata = SerializedMetadata.from(metadataBuffer);
+
+    int numElements = data.size();
+    boolean isLarge = numElements > 0xFF;
+
+    int dataSize = 0;
+    for (Map.Entry<String, VariantValue> field : data.entrySet()) {
+      dataSize += field.getValue().sizeInBytes();
+    }
+
+    // field ID size is the size needed to store the largest field ID in the data
+    int fieldIdSize = VariantUtil.sizeOf(metadata.dictionarySize());
+    int fieldIdListOffset = 1 /* header size */ + (isLarge ? 4 : 1) /* num elements size */;
+
+    // offset size is the size needed to store the length of the data section
+    int offsetSize = VariantUtil.sizeOf(dataSize);
+    int offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize);
+    int dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
+    int totalSize = dataOffset + dataSize;
+
+    byte header = VariantUtil.objectHeader(isLarge, fieldIdSize, offsetSize);
+    ByteBuffer buffer = ByteBuffer.allocate(totalSize).order(ByteOrder.LITTLE_ENDIAN);
+
+    buffer.put(0, header);
+    if (isLarge) {
+      buffer.putInt(1, numElements);
+    } else {
+      buffer.put(1, (byte) (numElements & 0xFF));
+    }
+
+    // write field IDs, values, and offsets
+    int nextOffset = 0;
+    int index = 0;
+    List<String> sortedFieldNames = data.keySet().stream().sorted().collect(Collectors.toList());
+    for (String fieldName : sortedFieldNames) {
+      int id = metadata.id(fieldName);
+      VariantUtil.writeLittleEndianUnsigned(
+          buffer, id, fieldIdListOffset + (index * fieldIdSize), fieldIdSize);
+      VariantUtil.writeLittleEndianUnsigned(
+          buffer, nextOffset, offsetListOffset + (index * offsetSize), offsetSize);
+      int valueSize = data.get(fieldName).writeTo(buffer, dataOffset + nextOffset);
+
+      // update next offset and index
+      nextOffset += valueSize;
+      index += 1;
+    }
+
+    // write the final size of the data section
+    VariantUtil.writeLittleEndianUnsigned(
+        buffer, nextOffset, offsetListOffset + (index * offsetSize), offsetSize);
+
+    return buffer;
+  }
+
+  static ByteBuffer createArray(Variants.Serialized... values) {
+    int numElements = values.length;
+    boolean isLarge = numElements > 0xFF;
+
+    int dataSize = 0;
+    for (Variants.Serialized value : values) {
+      // TODO: produce size for every variant without serializing
+      dataSize += value.buffer().remaining();
+    }
+
+    // offset size is the size needed to store the length of the data section
+    int offsetSize = VariantUtil.sizeOf(dataSize);
+    int offsetListOffset = 1 /* header size */ + (isLarge ? 4 : 1) /* num elements size */;
+    int dataOffset = offsetListOffset + ((1 + numElements) * offsetSize) /* offset list size */;
+    int totalSize = dataOffset + dataSize;
+
+    byte header = VariantUtil.arrayHeader(isLarge, offsetSize);
+    ByteBuffer buffer = ByteBuffer.allocate(totalSize).order(ByteOrder.LITTLE_ENDIAN);
+
+    buffer.put(0, header);
+    if (isLarge) {
+      buffer.putInt(1, numElements);
+    } else {
+      buffer.put(1, (byte) (numElements & 0xFF));
+    }
+
+    // write values and offsets
+    int nextOffset = 0; // the first offset is always 0
+    int index = 0;
+    for (Variants.Serialized value : values) {
+      // write the offset and value
+      VariantUtil.writeLittleEndianUnsigned(
+          buffer, nextOffset, offsetListOffset + (index * offsetSize), offsetSize);
+      // in a real implementation, the buffer should be passed to serialize
+      ByteBuffer valueBuffer = value.buffer();
+      int valueSize = writeBufferAbsolute(buffer, dataOffset + nextOffset, valueBuffer);
+      // update next offset and index
+      nextOffset += valueSize;
+      index += 1;
+    }
+
+    // write the final size of the data section
+    VariantUtil.writeLittleEndianUnsigned(
+        buffer, nextOffset, offsetListOffset + (index * offsetSize), offsetSize);
+
+    return buffer;
+  }
+}
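Note for reviewers: the offset arithmetic in createMetadata above (header byte, dictionary size, numElements + 1 offsets, then the concatenated UTF-8 names, with a trailing offset equal to the data size) is easier to check with a small worked example. The sketch below is plain JDK code under assumed conditions, not part of this change: it hard-codes a 1-byte offset size for a tiny unsorted dictionary, and the class name MetadataLayoutSketch is made up for illustration; it does not exist in the Iceberg codebase.

import java.nio.charset.StandardCharsets;

// Illustrative sketch only: mirrors the section-offset arithmetic used by
// VariantTestUtil.createMetadata for a small dictionary whose offsets fit in one byte.
public class MetadataLayoutSketch {
  public static void main(String[] args) {
    String[] names = {"a", "b", "c"};

    // total size of the concatenated UTF-8 field names (the "data" section)
    int dataSize = 0;
    for (String name : names) {
      dataSize += name.getBytes(StandardCharsets.UTF_8).length;
    }

    int offsetSize = 1; // assumed: dataSize fits in a single byte
    int offsetListOffset = 1 /* header */ + offsetSize /* dictionary size */;
    int dataOffset = offsetListOffset + ((1 + names.length) * offsetSize);
    int totalSize = dataOffset + dataSize;

    // For {"a", "b", "c"}: offset list starts at byte 2, data at byte 6, total 9 bytes.
    System.out.printf(
        "offset list at %d, data at %d, total %d bytes%n", offsetListOffset, dataOffset, totalSize);
  }
}

The extra (numElements + 1)-th offset is what lets a reader compute the length of the last dictionary entry, which is why both createMetadata and createObject write one final offset after their loops.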