Skip to content

Commit

Permalink
Use PlannedDataReader in RawDecoder.
Browse files: browse the repository at this point in the history
  • Loading branch information
rdblue committed Dec 16, 2024
1 parent 49aeed5 commit f3cd33a
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ public void addSchema(org.apache.iceberg.Schema writeSchema) {

private void addSchema(Schema writeSchema) {
long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
RawDecoder decoder =
new RawDecoder<>(
readSchema, avroSchema -> DataReader.create(readSchema, avroSchema), writeSchema);
RawDecoder<D> decoder = RawDecoder.create(readSchema, PlannedDataReader::create, writeSchema);
decoders.put(fp, decoder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public ValueReader<?> map(Type ignored, Schema map, ValueReader<?> valueReader)
}

@Override
public ValueReader<?> primitive(Type ignored, Schema primitive) {
public ValueReader<?> primitive(Type partner, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
if (logicalType != null) {
switch (logicalType.getName()) {
Expand Down Expand Up @@ -159,10 +159,16 @@ public ValueReader<?> primitive(Type ignored, Schema primitive) {
case BOOLEAN:
return ValueReaders.booleans();
case INT:
if (partner != null && partner.typeId() == Type.TypeID.LONG) {
return ValueReaders.intsAsLongs();
}
return ValueReaders.ints();
case LONG:
return ValueReaders.longs();
case FLOAT:
if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
return ValueReaders.floatsAsDoubles();
}
return ValueReaders.floats();
case DOUBLE:
return ValueReaders.doubles();
Expand Down
32 changes: 32 additions & 0 deletions core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,27 @@
public class RawDecoder<D> extends MessageDecoder.BaseDecoder<D> {
private static final ThreadLocal<BinaryDecoder> DECODER = new ThreadLocal<>();

/**
 * Builds a {@link MessageDecoder} whose decoded datum instances follow the {@link
 * org.apache.iceberg.Schema readSchema}.
 *
 * <p>The {@code readSchema} is the expected (projection) schema handed to {@code readerFunction}
 * to obtain a {@link DatumReader}. The {@code writeSchema} is then set on that reader and is the
 * schema used to decode buffers, so it must be the schema that was used to encode every buffer
 * passed to the returned decoder.
 *
 * @param <D> the type of datum produced by the decoder
 * @param readSchema an Iceberg schema to produce when reading
 * @param readerFunction a function that produces a DatumReader from the read schema
 * @param writeSchema an Avro schema that describes serialized data to be read
 * @return a decoder wrapping the reader produced by {@code readerFunction}
 */
public static <D> RawDecoder<D> create(
    org.apache.iceberg.Schema readSchema,
    Function<org.apache.iceberg.Schema, DatumReader<D>> readerFunction,
    Schema writeSchema) {
  // Build the reader from the Iceberg read schema first; the Avro write schema is applied
  // afterwards through setSchema, before the reader is handed to the decoder.
  DatumReader<D> datumReader = readerFunction.apply(readSchema);
  datumReader.setSchema(writeSchema);
  return new RawDecoder<>(datumReader);
}

private final DatumReader<D> reader;

/**
Expand All @@ -42,7 +63,11 @@ public class RawDecoder<D> extends MessageDecoder.BaseDecoder<D> {
* <p>The {@code readSchema} is used for the expected schema and the {@code writeSchema} is the
* schema used to decode buffers. The {@code writeSchema} must be the schema that was used to
* encode all buffers decoded by this class.
*
* @deprecated will be removed in 2.0.0; use {@link #create(org.apache.iceberg.Schema, Function,
* Schema)} instead
*/
@Deprecated
public RawDecoder(
org.apache.iceberg.Schema readSchema,
Function<Schema, DatumReader<?>> readerFunction,
Expand All @@ -51,6 +76,13 @@ public RawDecoder(
this.reader.setSchema(writeSchema);
}

/**
* Creates a new {@link MessageDecoder} that constructs datum instances using the {@code reader}.
*
* <p>The reader is stored as given; this constructor does not call {@code setSchema} on it, so
* the caller is responsible for configuring the reader beforehand (the static {@code create}
* factory does this before invoking the constructor).
*
* @param reader the {@link DatumReader} this decoder keeps for decoding
*/
private RawDecoder(DatumReader<D> reader) {
this.reader = reader;
}

@Override
public D decode(InputStream stream, D reuse) {
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(stream, DECODER.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public StandardKeyMetadata decode(InputStream stream, StandardKeyMetadata reuse)
RawDecoder<StandardKeyMetadata> decoder = decoders.get(writeSchemaVersion);

if (decoder == null) {
decoder = new RawDecoder<>(readSchema, GenericAvroReader::create, writeSchema);
decoder = RawDecoder.create(readSchema, GenericAvroReader::create, writeSchema);

decoders.put(writeSchemaVersion, decoder);
}
Expand Down

0 comments on commit f3cd33a

Please sign in to comment.