Skip to content

Commit

Permalink
PARQUET-2292: Improve default SpecificRecord model selection for Avro…
Browse files Browse the repository at this point in the history
…{Write,Read}Support (apache#1091)

This commit contains the following patches:

* PARQUET-2265: Don't set default Model in AvroParquetWriter (apache#1049)
- Don't set default Model in AvroParquetWriter
- Test that data model is parsed from Configuration

* PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field (apache#1078)
  • Loading branch information
clairemcginty authored May 6, 2023
1 parent bc5b658 commit 5b62b43
Show file tree
Hide file tree
Showing 10 changed files with 491 additions and 5 deletions.
24 changes: 24 additions & 0 deletions parquet-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,30 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.23.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-core</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private static <T> WriteSupport<T> writeSupport(Configuration conf,

public static class Builder<T> extends ParquetWriter.Builder<T, Builder<T>> {
private Schema schema = null;
private GenericData model = SpecificData.get();
private GenericData model = null;

private Builder(Path file) {
super(file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Avro implementation of {@link ReadSupport} for avro generic, specific, and
Expand All @@ -37,6 +39,8 @@
*/
public class AvroReadSupport<T> extends ReadSupport<T> {

private static final Logger LOG = LoggerFactory.getLogger(AvroReadSupport.class);

public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection";
private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";

Expand Down Expand Up @@ -134,7 +138,7 @@ public RecordMaterializer<T> prepareForRead(
avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
}

GenericData model = getDataModel(configuration);
GenericData model = getDataModel(configuration, avroSchema);
String compatEnabled = metadata.get(AvroReadSupport.AVRO_COMPATIBILITY);
if (compatEnabled != null && Boolean.valueOf(compatEnabled)) {
return newCompatMaterializer(parquetSchema, avroSchema, model);
Expand All @@ -149,10 +153,26 @@ private static <T> RecordMaterializer<T> newCompatMaterializer(
parquetSchema, avroSchema, model);
}

private GenericData getDataModel(Configuration conf) {
private GenericData getDataModel(Configuration conf, Schema schema) {
if (model != null) {
return model;
}

if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
GenericData modelForSchema;
try {
modelForSchema = AvroRecordConverter.getModelForSchema(schema);
} catch (Exception e) {
LOG.warn(String.format("Failed to derive data model for Avro schema %s. Parquet will use default " +
"SpecificData model for reading from source.", schema), e);
modelForSchema = null;
}

if (modelForSchema != null) {
return modelForSchema;
}
}

Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
return ReflectionUtils.newInstance(suppClass, conf).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Objects;

import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
Expand All @@ -57,6 +60,8 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility;
Expand All @@ -73,6 +78,8 @@
*/
class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {

private static final Logger LOG = LoggerFactory.getLogger(AvroRecordConverter.class);

private static final String STRINGABLE_PROP = "avro.java.string";
private static final String JAVA_CLASS_PROP = "java-class";
private static final String JAVA_KEY_CLASS_PROP = "java-key-class";
Expand Down Expand Up @@ -169,6 +176,77 @@ public void add(Object value) {
}
}

/**
* Returns the specific data model for a given SpecificRecord schema by reflecting the underlying
* Avro class's `MODEL$` field, or Null if the class is not on the classpath or reflection fails.
*/
/**
 * Returns the specific data model for a given SpecificRecord schema by reflecting the underlying
 * Avro class's {@code MODEL$} field, or null if the class is not on the classpath or reflection
 * fails.
 *
 * @param schema an Avro RECORD or UNION schema backed by a generated SpecificRecord class;
 *               any other type (or null) yields null
 * @return the generated class's {@code MODEL$} {@link SpecificData} instance, with logical-type
 *         conversions registered when running on Avro 1.7/1.8, or null if it cannot be resolved
 */
static SpecificData getModelForSchema(Schema schema) {
  final Class<?> clazz;

  if (schema != null && (schema.getType() == Schema.Type.RECORD || schema.getType() == Schema.Type.UNION)) {
    clazz = SpecificData.get().getClass(schema);
  } else {
    return null;
  }

  // If clazz == null, the underlying Avro class for the schema is not on the classpath
  if (clazz == null) {
    return null;
  }

  final SpecificData model;
  try {
    final Field modelField = clazz.getDeclaredField("MODEL$");
    modelField.setAccessible(true);

    model = (SpecificData) modelField.get(null);
  } catch (NoSuchFieldException e) {
    // Older Avro compilers don't emit a MODEL$ field; fall back to the default model
    LOG.info("Generated Avro class {} did not contain a MODEL$ field. Parquet will use default " +
        "SpecificData model for reading and writing.", clazz);
    return null;
  } catch (IllegalAccessException e) {
    LOG.warn("Field `MODEL$` in class {} was inaccessible. Parquet will use default SpecificData " +
        "model for reading and writing.", clazz, e);
    return null;
  }

  final String avroVersion = getRuntimeAvroVersion();
  // Avro 1.7 and 1.8 don't include conversions in the MODEL$ field by default
  if (avroVersion != null && (avroVersion.startsWith("1.8.") || avroVersion.startsWith("1.7."))) {
    final Field conversionsField;
    try {
      conversionsField = clazz.getDeclaredField("conversions");
    } catch (NoSuchFieldException e) {
      // Avro classes without logical types (denoted by the "conversions" field) can be returned as-is
      return model;
    }

    final Conversion<?>[] conversions;
    try {
      conversionsField.setAccessible(true);
      conversions = (Conversion<?>[]) conversionsField.get(null);
    } catch (IllegalAccessException e) {
      // Pass the exception as the last argument so the cause is logged (previously dropped)
      LOG.warn("Field `conversions` in class {} was inaccessible. Parquet will use default " +
          "SpecificData model for reading and writing.", clazz, e);
      return null;
    }

    for (Conversion<?> conversion : conversions) {
      if (conversion != null) {
        model.addLogicalTypeConversion(conversion);
      }
    }
  }

  return model;
}

/**
 * Returns the Avro runtime version as reported by the avro jar's manifest
 * (the implementation version of the package containing {@link Schema.Parser}),
 * or null if no version information is available.
 */
static String getRuntimeAvroVersion() {
  final Package avroPackage = Schema.Parser.class.getPackage();
  return avroPackage.getImplementationVersion();
}

// this was taken from Avro's ReflectData
private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass,
boolean excludeJava) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.parquet.schema.Type;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Avro implementation of {@link WriteSupport} for generic, specific, and
Expand All @@ -51,6 +53,8 @@
*/
public class AvroWriteSupport<T> extends WriteSupport<T> {

private static final Logger LOG = LoggerFactory.getLogger(AvroWriteSupport.class);

public static final String AVRO_DATA_SUPPLIER = "parquet.avro.write.data.supplier";

public static void setAvroDataSupplier(
Expand Down Expand Up @@ -131,7 +135,7 @@ public WriteContext init(Configuration configuration) {
}

if (model == null) {
this.model = getDataModel(configuration);
this.model = getDataModel(configuration, rootAvroSchema);
}

boolean writeOldListStructure = configuration.getBoolean(
Expand Down Expand Up @@ -400,7 +404,23 @@ private Binary fromAvroString(Object value) {
return Binary.fromCharSequence(value.toString());
}

private static GenericData getDataModel(Configuration conf) {
private static GenericData getDataModel(Configuration conf, Schema schema) {
if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
GenericData modelForSchema;
try {
modelForSchema = AvroRecordConverter.getModelForSchema(schema);
} catch (Exception e) {
LOG.warn(String.format("Failed to derive data model for Avro schema %s. Parquet will use default " +
"SpecificData model for writing to sink.", schema), e);
modelForSchema = null;
}


if (modelForSchema != null) {
return modelForSchema;
}
}

Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
return ReflectionUtils.newInstance(suppClass, conf).get();
Expand Down
14 changes: 14 additions & 0 deletions parquet-avro/src/test/avro/logicalType.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"type": "record",
"name": "LogicalTypesTest",
"namespace": "org.apache.parquet.avro",
"doc": "Record for testing logical types",
"fields": [
{
"name": "timestamp",
"type": {
"type": "long", "logicalType": "timestamp-millis"
}
}
]
}
Loading

0 comments on commit 5b62b43

Please sign in to comment.