-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Add Variant implementation to read serialized objects #11415
base: main
Are you sure you want to change the base?
Conversation
@aihuaxu I cleaned up my implementation, added tests, and fixed quite a few bugs. Please take a look to help validate that it implements the spec correctly. Thanks! |
My main overall question on this is whether or not this implementation belongs in the Iceberg project or in the Parquet project? I'm a little worried about a proliferation of implementations especially if we potentially would use two different implementations within the same code path (1 for spark and 1 for core) I'll do a real review later |
ByteBuffer buffer(); | ||
} | ||
|
||
public interface Metadata extends Serialized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense to have VariantPrimitive, VariantArray, VariantObject classes which implements VariantAccessor (or similar name). We probably shouldn't expose Metadata or Value directly since those are more internal representation of a Variant.
Similarly buffer()
may not need to be exposed as well if possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As part of cleaning up the API, I've made this package-private. The methods that matter are writeTo
and sizeInBytes
on VariantValue
.
package org.apache.iceberg.variants; | ||
|
||
/** A variant metadata and value pair. */ | ||
public interface Variant { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently this is not used. We include for the future use?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be what we use eventually in Type
for passing these. It will also be what is returned when reading, at least in Iceberg's object model.
* fields. This also does not allow updating or replacing the metadata for the unshredded object, | ||
* which could require recursively rewriting field IDs. | ||
*/ | ||
class ShreddedObject implements VariantObject { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you intentionally including shredded implementation in this PR? We probably should focus on the basic Variant core read in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm basically targeting reads in this PR, not just unshredded. Is that an issue? I think it's good to have this to produce serialized variant objects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not an issue. I was thinking that we can review the original change without shredding faster so we can merge quickly. I haven't reviewed this ShreddedObject yet. Let me check out then.
case BINARY: | ||
return 5 + ((ByteBuffer) value).remaining(); // 1 header + 4 length + value length |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not very sure about remaining method but as I read the doc, would it be inaccurate if part of the buffer is read?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This uses the ByteBuffer
to pass a section of the backing array. If the position or limit of that buffer were changed, it would change the value. You're right that reading part of the buffer could potentially change the position and have that effect, but this is still a valid use of ByteBuffer
. We use the position and limit to track a section of the storage, not for state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's say we have a PrimitiveWrapper of a binary b = new PrimitiveWrapper( binary, 0x1234)
, would we always expect b.sizeInBytes() always to be 5 + 2 and we shouldn't change that for this method contract?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I follow what you're suggesting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is what I mean: if I wrap an integer in TRUE value in PrimitiveWrapper, then the size is always 1 (that seems to make sense even if the caller reads the value through PrimitiveWrapper.get()) . While if I wrap a binary, then the size would change if the value is read through PrimitiveWrapper.get().
I may not get "We use the position and limit to track a section of the storage, not for state.". I assume that PrimitiveWrapper accepts a value, then the size would never change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is what I mean with the following test:
primitive input: Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d}))
@ParameterizedTest
@FieldSource("PRIMITIVES")
public void testPrimitiveValueSerialization(VariantPrimitive<?> primitive) {
((ByteBuffer)primitive.get()).get(); -- add this line then
// write the value to the middle of a large buffer
int size = primitive.sizeInBytes();
ByteBuffer buffer = ByteBuffer.allocate(size + 1000).order(ByteOrder.LITTLE_ENDIAN);
primitive.writeTo(buffer, 300);
...
}
We would still expect to write the whole ByteBuffer of 9 bytes while now we are writing 8 bytes. That is not expected, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aihuaxu, if you modify the buffer then it will change the value. What you're suggesting is equivalent to other modifications, like this:
byte[] arr = new byte[] { 0x0A, 0x0B, 0x0C, 0x0D };
VariantPrimitive<ByteBuffer> primitive = Variants.of(ByteBuffer.wrap(arr));
// modify the primitive value
arr[0] = 0;
...
If you modify the value then it is going to be different. We could duplicate the incoming buffer to avoid the case where position
is modified, but not the case where the backing byte array is modified. That would be okay, but I don't see a lot of value in worrying about that case. If you pass a buffer into these classes, you should not later modify that buffer.
} | ||
|
||
@Override | ||
default VariantArray asArray() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a code coverage for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not yet. I'm debating whether to have a type for reconstruction, like ShreddedObject
for arrays.
ARRAY | ||
} | ||
|
||
public static VariantValue from(ByteBuffer metadata, ByteBuffer value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I would like to refactor those static methods into their own interface.
public static VariantValue from(ByteBuffer metadata, ByteBuffer value)
static VariantValue from(SerializedMetadata metadata, ByteBuffer value)
- Move of to VariantPrimitive class.
But it's personal preference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would have to be on either VariantValue
(which could be any value), here in Variants
, or in Variant
. I put it here to match the conventions from other parts of the library. For example, there are similar factory methods in many places: Literals.from
, Types.fromPrimitiveString
, and Expressions.*
core/src/main/java/org/apache/iceberg/variants/SerializedObject.java
Outdated
Show resolved
Hide resolved
The Spark failures are a port conflict. I think it's unrelated to these changes. We'll see the next time CI runs (I'm sure we'll have more changes to trigger them) |
public class Variants { | ||
private Variants() {} | ||
|
||
enum LogicalType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of simplifying the types to BasicType and PrimitiveType only here. LogicalType and PhysicalType may not needed? But we can revisit later if needed.
enum BasicType {
PRIMITIVE(0),
SHORT_STRING(1),
OBJECT(2),
ARRAY(3);
}
and
public enum PrimitiveType {
NULL(0),
TRUE(1),
FALSE(2),
...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left these package-private for now. That gives us the flexibility to remove them later if needed.
assertThat(object.get("ZZ").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); | ||
} | ||
|
||
static VariantValue roundTripMinimalBuffer(ShreddedObject object, SerializedMetadata metadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: private scope?
return Variants.from(metadata, serialized); | ||
} | ||
|
||
static VariantValue roundTripLargeBuffer(ShreddedObject object, SerializedMetadata metadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. nit: private scope?
} | ||
|
||
@Override | ||
public VariantValue get(String field) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what we are implementing in ShreddedObject: basically we are providing the same interface get(String field) as regular VariantObject.
Given the following example, assume event.location.latitude
is shredded while event.location.longitude
is not. How do we model shredded event
object - where to place the field location
? Of course, from read side, it doesn't matter if the we place location in shredded or unshredded. We check both.
event {
event_id;
location {
latitude;
longitude;
}
}
"b", | ||
Variants.of("iceberg"), | ||
"c", | ||
Variants.of(new BigDecimal("12.21"))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our ShreddedObject model, the field values can be a VariantObject as well which could be a ShreddedObject, right?
Can we add such coverage?
I think I have a question for PrimitiveWrapper.sizeInBytes() for binary. Otherwise, looks good to me. |
This PR adds an implementation of the Variant encoding spec that can read and construct serialized Variant buffers. This implementation was written using only the spec to validate that the spec is reasonably complete.
The public API interfaces are in core and consist of:
Variant
: a wrapper interface ofVariantMetadata
andVariantValue
VariantMetadata
: metadata dictionary for valuesVariantValue
: a generic interface for values that provides serialization toByteBuffer
VariantPrimitive
: a primitive valueVariantObject
: a variant object value withget(String)
to retrieve values by nameVariantArray
: a variant array value withget(int)
to retrieve values by positionThe implementation uses
ByteBuffer
and avoids copying. Values are lazily loaded as they are accessed and are initialized using slices of the parent buffer. Reads do not modify the original buffer. All buffers must use little-endian.Most testing is done by constructing variant cases as byte array constants. Many of these values can be used to check other implementations and may be added to the spec. This also includes test methods to create metadata, arrays, and objects for more complex cases such as multi-byte field IDs and offsets.