diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java index afbaefcb00..9195925f7e 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java @@ -19,9 +19,7 @@ package org.apache.parquet.avro; import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.mapreduce.Job; -import org.apache.parquet.avro.AvroWriteSupport; import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.parquet.hadoop.util.ContextUtil; diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java index 8970b66eae..3c98948b69 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.api.ReadSupport; @@ -53,6 +54,10 @@ public static Builder builder(InputFile file) { return new Builder(file); } + public static Builder builder(InputFile file, ParquetConfiguration conf) { + return new Builder(file, conf); + } + /** * Convenience method for creating a ParquetReader which uses Avro * {@link GenericData} objects to store data from reads. @@ -67,6 +72,21 @@ public static ParquetReader genericRecordReader(InputFile file) t return new Builder(file).withDataModel(GenericData.get()).build(); } + /** + * Convenience method for creating a ParquetReader which uses Avro + * {@link GenericData} objects to store data from reads. + * + * @param file The location to read data from + * @param conf The configuration to use + * @return A {@code ParquetReader} which reads data as Avro + * {@code GenericData} + * @throws IOException if the InputFile has been closed, or if some other I/O + * error occurs + */ + public static ParquetReader genericRecordReader(InputFile file, ParquetConfiguration conf) throws IOException { + return new Builder(file, conf).withDataModel(GenericData.get()).build(); + } + /** * Convenience method for creating a ParquetReader which uses Avro * {@link GenericData} objects to store data from reads. 
@@ -143,6 +163,10 @@ private Builder(InputFile file) { super(file); } + private Builder(InputFile file, ParquetConfiguration conf) { + super(file, conf); + } + public Builder withDataModel(GenericData model) { this.model = model; diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java index 94d8167b0a..0d87d007f9 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java @@ -25,6 +25,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -154,6 +156,12 @@ private static WriteSupport writeSupport(Schema avroSchema, private static WriteSupport writeSupport(Configuration conf, Schema avroSchema, GenericData model) { + return writeSupport(new HadoopParquetConfiguration(conf), avroSchema, model); + } + + private static WriteSupport writeSupport(ParquetConfiguration conf, + Schema avroSchema, + GenericData model) { return new AvroWriteSupport( new AvroSchemaConverter(conf).convert(avroSchema), avroSchema, model); } @@ -189,5 +197,10 @@ protected Builder self() { protected WriteSupport getWriteSupport(Configuration conf) { return AvroParquetWriter.writeSupport(conf, schema, model); } + + @Override + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { + return AvroParquetWriter.writeSupport(conf, schema, model); + } } } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java index 8f268a145a..0bda3d02ed 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java @@ -24,7 +24,10 @@ import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; import org.slf4j.Logger; @@ -95,6 +98,13 @@ public AvroReadSupport(GenericData model) { public ReadContext init(Configuration configuration, Map keyValueMetaData, MessageType fileSchema) { + return init(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema); + } + + @Override + public ReadContext init(ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema) { MessageType projection = fileSchema; Map metadata = new LinkedHashMap(); @@ -120,6 +130,13 @@ public ReadContext init(Configuration configuration, public RecordMaterializer prepareForRead( Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { + return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext); + } + + @Override + public RecordMaterializer prepareForRead( + 
ParquetConfiguration configuration, Map keyValueMetaData, + MessageType fileSchema, ReadContext readContext) { Map metadata = readContext.getReadSupportMetadata(); MessageType parquetSchema = readContext.getRequestedSchema(); Schema avroSchema; @@ -153,7 +170,7 @@ private static RecordMaterializer newCompatMaterializer( parquetSchema, avroSchema, model); } - private GenericData getDataModel(Configuration conf, Schema schema) { + private GenericData getDataModel(ParquetConfiguration conf, Schema schema) { if (model != null) { return model; } @@ -175,6 +192,6 @@ private GenericData getDataModel(Configuration conf, Schema schema) { Class suppClass = conf.getClass( AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class); - return ReflectionUtils.newInstance(suppClass, conf).get(); + return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get(); } } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 0314bcd71a..abf94eaa2c 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -23,6 +23,8 @@ import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -102,6 +104,10 @@ public AvroSchemaConverter() { } public AvroSchemaConverter(Configuration conf) { + this(new HadoopParquetConfiguration(conf)); + } + + public AvroSchemaConverter(ParquetConfiguration conf) { this.assumeRepeatedIsListElement = conf.getBoolean( ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT); this.writeOldListStructure = conf.getBoolean( diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java index 564e745392..692e3fac0f 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -34,7 +34,10 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.GroupType; @@ -129,6 +132,11 @@ public static void setSchema(Configuration configuration, Schema schema) { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { if (rootAvroSchema == null) { this.rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA)); this.rootSchema = new AvroSchemaConverter(configuration).convert(rootAvroSchema); @@ -404,7 +412,7 @@ private Binary fromAvroString(Object value) { return Binary.fromCharSequence(value.toString()); } - private static GenericData getDataModel(Configuration conf, 
Schema schema) { + private static GenericData getDataModel(ParquetConfiguration conf, Schema schema) { if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) { GenericData modelForSchema; try { @@ -423,7 +431,7 @@ private static GenericData getDataModel(Configuration conf, Schema schema) { Class suppClass = conf.getClass( AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class); - return ReflectionUtils.newInstance(suppClass, conf).get(); + return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get(); } private abstract class ListWriter { diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java index 81e751aba5..9aaa9e3b28 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java @@ -52,6 +52,8 @@ import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; @@ -77,22 +79,31 @@ public class TestReadWrite { @Parameterized.Parameters public static Collection data() { Object[][] data = new Object[][] { - { false, false }, // use the new converters - { true, false }, // use the old converters - { false, true } }; // use a local disk location + { false, false, false }, // use the new converters with hadoop config + { true, false, false }, // use the old converters with hadoop config + { false, true, false }, // use a local disk location with hadoop config + { false, false, true }, // use the new converters with parquet config interface + { true, false, true }, // use the old converters with parquet config interface + { false, true, true } }; // use a local disk location with parquet config interface return Arrays.asList(data); } private final boolean compat; private final boolean local; + private final boolean confInterface; private final Configuration testConf = new Configuration(); + private final ParquetConfiguration parquetConf = new HadoopParquetConfiguration(true); - public TestReadWrite(boolean compat, boolean local) { + public TestReadWrite(boolean compat, boolean local, boolean confInterface) { this.compat = compat; this.local = local; + this.confInterface = confInterface; this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat); - testConf.setBoolean("parquet.avro.add-list-element-records", false); - testConf.setBoolean("parquet.avro.write-old-list-structure", false); + this.testConf.setBoolean("parquet.avro.add-list-element-records", false); + this.testConf.setBoolean("parquet.avro.write-old-list-structure", false); + this.parquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat); + this.parquetConf.setBoolean("parquet.avro.add-list-element-records", false); + this.parquetConf.setBoolean("parquet.avro.write-old-list-structure", false); } @Test @@ -431,6 +442,11 @@ public void testAllUsingDefaultAvroSchema() throws Exception { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { return new 
WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA), new HashMap()); } @@ -864,30 +880,44 @@ private File createTempFile() throws IOException { } private ParquetWriter writer(String file, Schema schema) throws IOException { + AvroParquetWriter.Builder writerBuilder; if (local) { - return AvroParquetWriter + writerBuilder = AvroParquetWriter .builder(new LocalOutputFile(Paths.get(file))) - .withSchema(schema) - .withConf(testConf) - .build(); + .withSchema(schema); } else { - return AvroParquetWriter + writerBuilder = AvroParquetWriter .builder(new Path(file)) - .withSchema(schema) + .withSchema(schema); + } + if (confInterface) { + return writerBuilder + .withConf(parquetConf) + .build(); + } else { + return writerBuilder .withConf(testConf) .build(); } } private ParquetReader reader(String file) throws IOException { + AvroParquetReader.Builder readerBuilder; if (local) { - return AvroParquetReader + readerBuilder = AvroParquetReader .builder(new LocalInputFile(Paths.get(file))) - .withDataModel(GenericData.get()) - .withConf(testConf) + .withDataModel(GenericData.get()); + } else { + return new AvroParquetReader<>(testConf, new Path(file)); + } + if (confInterface) { + return readerBuilder + .withConf(parquetConf) .build(); } else { - return new AvroParquetReader(testConf, new Path(file)); + return readerBuilder + .withConf(testConf) + .build(); } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java index f12417cae9..31a221d5bc 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java @@ -38,6 +38,8 @@ import org.apache.avro.util.Utf8; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.io.api.Binary; @@ -358,6 +360,11 @@ public void testAllUsingDefaultAvroSchema() throws Exception { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA), new HashMap()); } diff --git a/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java new file mode 100644 index 0000000000..f8aae97297 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.conf; + +import java.util.Map; + +/** + * Configuration interface with the methods necessary to configure Parquet applications. + */ +public interface ParquetConfiguration extends Iterable> { + + /** + * Sets the value of the name property. + * + * @param name the property to set + * @param value the value to set the property to + */ + void set(String name, String value); + + /** + * Sets the value of the name property to a long. + * + * @param name the property to set + * @param value the value to set the property to + */ + void setLong(String name, long value); + + /** + * Sets the value of the name property to an integer. + * + * @param name the property to set + * @param value the value to set the property to + */ + void setInt(String name, int value); + + /** + * Sets the value of the name property to a boolean. + * + * @param name the property to set + * @param value the value to set the property to + */ + void setBoolean(String name, boolean value); + + /** + * Sets the value of the name property to an array of comma delimited values. + * + * @param name the property to set + * @param value the values to set the property to + */ + void setStrings(String name, String... value); + + /** + * Sets the value of the name property to a class. + * + * @param name the property to set + * @param value the value to set the property to + * @param xface the interface implemented by the value + */ + void setClass(String name, Class value, Class xface); + + /** + * Gets the value of the name property. Returns null if no such value exists. + * + * @param name the property to retrieve the value of + * @return the value of the property, or null if it does not exist + */ + String get(String name); + + /** + * Gets the value of the name property. Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property, or the default value if it does not exist + */ + String get(String name, String defaultValue); + + /** + * Gets the value of the name property as a long. Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as a long, or the default value if it does not exist + */ + long getLong(String name, long defaultValue); + + /** + * Gets the value of the name property as an integer. Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as an integer, or the default value if it does not exist + */ + int getInt(String name, int defaultValue); + + /** + * Gets the value of the name property as a boolean. Returns the default value if no such value exists. 
+ * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as a boolean, or the default value if it does not exist + */ + boolean getBoolean(String name, boolean defaultValue); + + /** + * Gets the trimmed value of the name property. Returns null if no such value exists. + * + * @param name the property to retrieve the value of + * @return the trimmed value of the property, or null if it does not exist + */ + String getTrimmed(String name); + + /** + * Gets the trimmed value of the name property as a boolean. + * Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the trimmed value of the property, or the default value if it does not exist + */ + String getTrimmed(String name, String defaultValue); + + /** + * Gets the value of the name property as an array of {@link String}s. + * Returns the default value if no such value exists. + * Interprets the stored value as a comma delimited array. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as an array, or the default value if it does not exist + */ + String[] getStrings(String name, String[] defaultValue); + + /** + * Gets the value of the name property as a class. Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as a class, or the default value if it does not exist + */ + Class getClass(String name, Class defaultValue); + + /** + * Gets the value of the name property as a class implementing the xface interface. + * Returns the default value if no such value exists. + * + * @param name the property to retrieve the value of + * @param defaultValue the default return if no value is set for the property + * @return the value of the property as a class, or the default value if it does not exist + */ + Class getClass(String name, Class defaultValue, Class xface); + + /** + * Load a class by name. + * + * @param name the name of the {@link Class} to load + * @return the loaded class + * @throws ClassNotFoundException when the specified class cannot be found + */ + Class getClassByName(String name) throws ClassNotFoundException; +} diff --git a/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java b/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java new file mode 100644 index 0000000000..695ebd9f46 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.util; + +import java.lang.reflect.Constructor; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Lifted from Hadoop's org.apache.hadoop.util.ReflectionUtils. + */ +public class Reflection { + + private static final Class[] EMPTY_ARRAY = new Class[]{}; + private static final Map, Constructor> CONSTRUCTOR_CACHE = new ConcurrentHashMap, Constructor>(); + + @SuppressWarnings("unchecked") + public static T newInstance(Class theClass) { + T result; + try { + Constructor meth = (Constructor) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(EMPTY_ARRAY); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return result; + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index 8f0d8d8933..9c98650100 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -23,29 +23,17 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.HadoopParquetConfiguration; import org.apache.parquet.crypto.DecryptionPropertiesFactory; import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; -import org.apache.parquet.hadoop.util.HadoopCodecs; import java.util.Map; -import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; -import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY; - public class HadoopReadOptions extends ParquetReadOptions { private final Configuration conf; - private static final String ALLOCATION_SIZE = "parquet.read.allocation.size"; - private HadoopReadOptions(boolean useSignedStringMinMax, boolean useStatsFilter, boolean useDictionaryFilter, @@ -65,7 +53,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax, super( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, 
metadataFilter, codecFactory, allocator, - maxAllocationSize, properties, fileDecryptionProperties + maxAllocationSize, properties, fileDecryptionProperties, new HadoopParquetConfiguration(conf) ); this.conf = conf; } @@ -100,24 +88,9 @@ public Builder(Configuration conf) { } public Builder(Configuration conf, Path filePath) { + super(new HadoopParquetConfiguration(conf)); this.conf = conf; this.filePath = filePath; - useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); - useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true)); - useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true)); - useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true)); - useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true)); - usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED, - usePageChecksumVerification)); - useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); - useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false)); - withCodecFactory(HadoopCodecs.newFactory(conf, 0)); - withRecordFilter(getFilter(conf)); - withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); - String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY); - if (badRecordThresh != null) { - set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh); - } } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index dc130ee8d2..8e93dc4ad5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -22,6 +22,8 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.format.converter.ParquetMetadataConverter; @@ -34,9 +36,21 @@ import java.util.Set; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; +import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY; // Internal use only public class ParquetReadOptions { + + private static final String ALLOCATION_SIZE = "parquet.read.allocation.size"; + private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true; private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true; private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true; @@ -61,6 +75,7 @@ public class ParquetReadOptions { 
private final int maxAllocationSize; private final Map properties; private final FileDecryptionProperties fileDecryptionProperties; + private final ParquetConfiguration conf; ParquetReadOptions(boolean useSignedStringMinMax, boolean useStatsFilter, @@ -77,6 +92,28 @@ public class ParquetReadOptions { int maxAllocationSize, Map properties, FileDecryptionProperties fileDecryptionProperties) { + this(useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, + usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, + codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties, + new HadoopParquetConfiguration()); + } + + ParquetReadOptions(boolean useSignedStringMinMax, + boolean useStatsFilter, + boolean useDictionaryFilter, + boolean useRecordFilter, + boolean useColumnIndexFilter, + boolean usePageChecksumVerification, + boolean useBloomFilter, + boolean useOffHeapDecryptBuffer, + FilterCompat.Filter recordFilter, + ParquetMetadataConverter.MetadataFilter metadataFilter, + CompressionCodecFactory codecFactory, + ByteBufferAllocator allocator, + int maxAllocationSize, + Map properties, + FileDecryptionProperties fileDecryptionProperties, + ParquetConfiguration conf) { this.useSignedStringMinMax = useSignedStringMinMax; this.useStatsFilter = useStatsFilter; this.useDictionaryFilter = useDictionaryFilter; @@ -92,6 +129,7 @@ public class ParquetReadOptions { this.maxAllocationSize = maxAllocationSize; this.properties = Collections.unmodifiableMap(properties); this.fileDecryptionProperties = fileDecryptionProperties; + this.conf = conf; } public boolean useSignedStringMinMax() { @@ -164,10 +202,18 @@ public boolean isEnabled(String property, boolean defaultValue) { : defaultValue; } + public ParquetConfiguration getConfiguration() { + return conf; + } + public static Builder builder() { return new Builder(); } + public static Builder builder(ParquetConfiguration conf) { + return new Builder(conf); + } + public static class Builder { protected boolean useSignedStringMinMax = false; protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT; @@ -185,6 +231,31 @@ public static class Builder { protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT; protected Map properties = new HashMap<>(); protected FileDecryptionProperties fileDecryptionProperties = null; + protected ParquetConfiguration conf; + + public Builder() { + this(new HadoopParquetConfiguration()); + } + + public Builder(ParquetConfiguration conf) { + this.conf = conf; + useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); + useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true)); + useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true)); + useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true)); + useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true)); + usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED, + usePageChecksumVerification)); + useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); + useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false)); + withCodecFactory(HadoopCodecs.newFactory(conf, 0)); + withRecordFilter(getFilter(conf)); + withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); + String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY); + if (badRecordThresh != null) { + set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh); + } + } public 
Builder useSignedStringMinMax(boolean useSignedStringMinMax) { this.useSignedStringMinMax = useSignedStringMinMax; @@ -325,6 +396,7 @@ public Builder copy(ParquetReadOptions options) { withAllocator(options.allocator); withPageChecksumVerification(options.usePageChecksumVerification); withDecryption(options.fileDecryptionProperties); + conf = options.conf; for (Map.Entry keyValue : options.properties.entrySet()) { set(keyValue.getKey(), keyValue.getValue()); } @@ -333,13 +405,17 @@ public Builder copy(ParquetReadOptions options) { public ParquetReadOptions build() { if (codecFactory == null) { - codecFactory = HadoopCodecs.newFactory(0); + if (conf == null) { + codecFactory = HadoopCodecs.newFactory(0); + } else { + codecFactory = HadoopCodecs.newFactory(conf, 0); + } } return new ParquetReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, - codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties); + codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties, conf); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java b/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java new file mode 100644 index 0000000000..26fce1e9b8 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.conf; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Iterator; +import java.util.Map; + +/** + * Implementation of the Parquet configuration interface relying on Hadoop's + * Configuration to aid with interoperability and backwards compatibility. 
+ */ +public class HadoopParquetConfiguration implements ParquetConfiguration { + + private final Configuration configuration; + + public HadoopParquetConfiguration() { + this(true); + } + + public HadoopParquetConfiguration(boolean loadDefaults) { + configuration = new Configuration(loadDefaults); + } + + public HadoopParquetConfiguration(Configuration conf) { + configuration = conf; + } + + public Configuration getConfiguration() { + return configuration; + } + + @Override + public void set(String name, String value) { + configuration.set(name, value); + } + + @Override + public void setLong(String name, long value) { + configuration.setLong(name, value); + } + + @Override + public void setInt(String name, int value) { + configuration.setInt(name, value); + } + + @Override + public void setBoolean(String name, boolean value) { + configuration.setBoolean(name, value); + } + + @Override + public void setStrings(String name, String... values) { + configuration.setStrings(name, values); + } + + @Override + public void setClass(String name, Class value, Class xface) { + configuration.setClass(name, value, xface); + } + + @Override + public String get(String name) { + return configuration.get(name); + } + + @Override + public String get(String name, String defaultValue) { + return configuration.get(name, defaultValue); + } + + @Override + public long getLong(String name, long defaultValue) { + return configuration.getLong(name, defaultValue); + } + + @Override + public int getInt(String name, int defaultValue) { + return configuration.getInt(name, defaultValue); + } + + @Override + public boolean getBoolean(String name, boolean defaultValue) { + return configuration.getBoolean(name, defaultValue); + } + + @Override + public String getTrimmed(String name) { + return configuration.getTrimmed(name); + } + + @Override + public String getTrimmed(String name, String defaultValue) { + return configuration.getTrimmed(name, defaultValue); + } + + @Override + public String[] getStrings(String name, String[] defaultValue) { + return configuration.getStrings(name, defaultValue); + } + + @Override + public Class getClass(String name, Class defaultValue) { + return configuration.getClass(name, defaultValue); + } + + @Override + public Class getClass(String name, Class defaultValue, Class xface) { + return configuration.getClass(name, defaultValue, xface); + } + + @Override + public Class getClassByName(String name) throws ClassNotFoundException { + return configuration.getClassByName(name); + } + + @Override + public Iterator> iterator() { + return configuration.iterator(); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index d93b4071c2..a30c57aeb7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -37,8 +37,11 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.codec.ZstandardCodec; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.ConfigurationUtil; public class CodecFactory implements CompressionCodecFactory { @@ -48,7 +51,7 @@ public class CodecFactory implements 
CompressionCodecFactory { private final Map compressors = new HashMap<>(); private final Map decompressors = new HashMap<>(); - protected final Configuration configuration; + protected final ParquetConfiguration configuration; protected final int pageSize; /** @@ -61,6 +64,19 @@ public class CodecFactory implements CompressionCodecFactory { * decompressors this parameter has no impact on the function of the factory */ public CodecFactory(Configuration configuration, int pageSize) { + this(new HadoopParquetConfiguration(configuration), pageSize); + } + + /** + * Create a new codec factory. + * + * @param configuration used to pass compression codec configuration information + * @param pageSize the expected page size, does not set a hard limit, currently just + * used to set the initial size of the output stream used when + * compressing a buffer. If this factory is only used to construct + * decompressors this parameter has no impact on the function of the factory + */ + public CodecFactory(ParquetConfiguration configuration, int pageSize) { this.configuration = configuration; this.pageSize = pageSize; } @@ -246,9 +262,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) { codecClass = Class.forName(codecClassName); } catch (ClassNotFoundException e) { // Try to load the class using the job classloader - codecClass = configuration.getClassLoader().loadClass(codecClassName); + codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName); } - codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(configuration)); CODEC_BY_NAME.put(codecCacheKey, codec); return codec; } catch (ClassNotFoundException e) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java index 5454a69b2a..588f93c892 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java @@ -24,6 +24,8 @@ import com.github.luben.zstd.ZstdOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.codec.ZstdDecompressorStream; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -50,6 +52,10 @@ class DirectZstd { static CodecFactory.BytesCompressor createCompressor(Configuration conf, int pageSize) { + return createCompressor(new HadoopParquetConfiguration(conf), pageSize); + } + + static CodecFactory.BytesCompressor createCompressor(ParquetConfiguration conf, int pageSize) { return new ZstdCompressor( getPool(conf), conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), @@ -58,6 +64,10 @@ static CodecFactory.BytesCompressor createCompressor(Configuration conf, int pag } static CodecFactory.BytesDecompressor createDecompressor(Configuration conf) { + return createDecompressor(new HadoopParquetConfiguration(conf)); + } + + static CodecFactory.BytesDecompressor createDecompressor(ParquetConfiguration conf) { return new ZstdDecompressor(getPool(conf)); } @@ -133,7 +143,7 @@ BytesInput getBytesInput() { } } - private static BufferPool getPool(Configuration conf) { + private static BufferPool getPool(ParquetConfiguration conf) { if 
(conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) { return RecyclingBufferPool.INSTANCE; } else { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 8203e9098d..36da819fa7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.PrimitiveIterator; import java.util.Set; @@ -29,9 +30,9 @@ import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -167,10 +168,7 @@ public float getProgress() throws IOException, InterruptedException { public void initialize(ParquetFileReader reader, ParquetReadOptions options) { // copy custom configuration to the Configuration passed to the ReadSupport - Configuration conf = new Configuration(); - if (options instanceof HadoopReadOptions) { - conf = ((HadoopReadOptions) options).getConf(); - } + ParquetConfiguration conf = Objects.requireNonNull(options).getConfiguration(); for (String property : options.getPropertyNames()) { conf.set(property, options.getProperty(property)); } @@ -261,7 +259,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException { LOG.debug("read value: {}", currentValue); } catch (RuntimeException e) { - throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getPath()), e); + throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getFile()), e); } } return true; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 1bfd4b20f0..7d355af78c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -52,6 +52,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.parquet.Preconditions; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -193,18 +195,13 @@ public static Class getUnboundRecordFilter(Configuration configuration) { return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class); } - private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) { + private static UnboundRecordFilter getUnboundRecordFilterInstance(ParquetConfiguration configuration) { Class clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, 
UnboundRecordFilter.class); - if (clazz == null) { return null; } - + if (clazz == null) { + return null; + } try { - UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance(); - - if (unboundRecordFilter instanceof Configurable) { - ((Configurable)unboundRecordFilter).setConf(configuration); - } - - return unboundRecordFilter; + return (UnboundRecordFilter) clazz.newInstance(); } catch (InstantiationException | IllegalAccessException e) { throw new BadConfigurationException( "could not instantiate unbound record filter class", e); @@ -232,6 +229,10 @@ public static void setFilterPredicate(Configuration configuration, FilterPredica } private static FilterPredicate getFilterPredicate(Configuration configuration) { + return getFilterPredicate(new HadoopParquetConfiguration(configuration)); + } + + private static FilterPredicate getFilterPredicate(ParquetConfiguration configuration) { try { return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration); } catch (IOException e) { @@ -247,6 +248,10 @@ private static FilterPredicate getFilterPredicate(Configuration configuration) { * @return a filter for the unbound record filter specified in conf */ public static Filter getFilter(Configuration conf) { + return getFilter(new HadoopParquetConfiguration(conf)); + } + + public static Filter getFilter(ParquetConfiguration conf) { return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf)); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java index f9c8314dd3..785e5d05d8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java @@ -36,11 +36,14 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.hadoop.util.HiddenFileFilter; @@ -180,6 +183,10 @@ public static Builder read(InputFile file) throws IOException { return new Builder<>(file); } + public static Builder read(InputFile file, ParquetConfiguration conf) throws IOException { + return new Builder<>(file, conf); + } + public static Builder builder(ReadSupport readSupport, Path path) { return new Builder<>(readSupport, path); } @@ -190,7 +197,7 @@ public static class Builder { private final Path path; private Filter filter = null; private ByteBufferAllocator allocator = new HeapByteBufferAllocator(); - protected Configuration conf; + protected ParquetConfiguration conf; private ParquetReadOptions.Builder optionsBuilder; @Deprecated @@ -198,8 +205,9 @@ private Builder(ReadSupport readSupport, Path path) { this.readSupport = Objects.requireNonNull(readSupport, "readSupport cannot be null"); this.file = null; this.path = Objects.requireNonNull(path, "path cannot be null"); 
- this.conf = new Configuration(); - this.optionsBuilder = HadoopReadOptions.builder(conf, path); + Configuration hadoopConf = new Configuration(); + this.conf = new HadoopParquetConfiguration(hadoopConf); + this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path); } @Deprecated @@ -207,8 +215,9 @@ protected Builder(Path path) { this.readSupport = null; this.file = null; this.path = Objects.requireNonNull(path, "path cannot be null"); - this.conf = new Configuration(); - this.optionsBuilder = HadoopReadOptions.builder(conf, path); + Configuration hadoopConf = new Configuration(); + this.conf = new HadoopParquetConfiguration(hadoopConf); + this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path); } protected Builder(InputFile file) { @@ -217,17 +226,30 @@ protected Builder(InputFile file) { this.path = null; if (file instanceof HadoopInputFile) { HadoopInputFile hadoopFile = (HadoopInputFile) file; - this.conf = hadoopFile.getConfiguration(); - optionsBuilder = HadoopReadOptions.builder(conf, hadoopFile.getPath()); + Configuration hadoopConf = hadoopFile.getConfiguration(); + this.conf = new HadoopParquetConfiguration(hadoopConf); + optionsBuilder = HadoopReadOptions.builder(hadoopConf, hadoopFile.getPath()); } else { - this.conf = new Configuration(); - optionsBuilder = HadoopReadOptions.builder(conf); + optionsBuilder = ParquetReadOptions.builder(new HadoopParquetConfiguration()); + } + } + + protected Builder(InputFile file, ParquetConfiguration conf) { + this.readSupport = null; + this.file = Objects.requireNonNull(file, "file cannot be null"); + this.path = null; + this.conf = conf; + if (file instanceof HadoopInputFile) { + HadoopInputFile hadoopFile = (HadoopInputFile) file; + optionsBuilder = HadoopReadOptions.builder(ConfigurationUtil.createHadoopConfiguration(conf), hadoopFile.getPath()); + } else { + optionsBuilder = ParquetReadOptions.builder(conf); } } // when called, resets options to the defaults from conf public Builder withConf(Configuration conf) { - this.conf = Objects.requireNonNull(conf, "conf cannot be null"); + this.conf = new HadoopParquetConfiguration(Objects.requireNonNull(conf, "conf cannot be null")); // previous versions didn't use the builder, so may set filter before conf. this maintains // compatibility for filter. other options are reset by a new conf. 
@@ -239,6 +261,15 @@ public Builder withConf(Configuration conf) { return this; } + public Builder withConf(ParquetConfiguration conf) { + this.conf = conf; + this.optionsBuilder = ParquetReadOptions.builder(conf); + if (filter != null) { + optionsBuilder.withRecordFilter(filter); + } + return this; + } + public Builder withFilter(Filter filter) { this.filter = filter; optionsBuilder.withRecordFilter(filter); @@ -354,19 +385,20 @@ public ParquetReader build() throws IOException { .build(); if (path != null) { - FileSystem fs = path.getFileSystem(conf); + Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf); + FileSystem fs = path.getFileSystem(hadoopConf); FileStatus stat = fs.getFileStatus(path); if (stat.isFile()) { return new ParquetReader<>( - Collections.singletonList((InputFile) HadoopInputFile.fromStatus(stat, conf)), + Collections.singletonList((InputFile) HadoopInputFile.fromStatus(stat, hadoopConf)), options, getReadSupport()); } else { List files = new ArrayList<>(); for (FileStatus fileStatus : fs.listStatus(path, HiddenFileFilter.INSTANCE)) { - files.add(HadoopInputFile.fromStatus(fileStatus, conf)); + files.add(HadoopInputFile.fromStatus(fileStatus, hadoopConf)); } return new ParquetReader(files, options, getReadSupport()); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 7b78a93763..2e888722e3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -26,11 +26,14 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.crypto.FileEncryptionProperties; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.io.OutputFile; import org.apache.parquet.schema.MessageType; @@ -276,6 +279,30 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport int maxPaddingSize, ParquetProperties encodingProps, FileEncryptionProperties encryptionProperties) throws IOException { + this( + file, + mode, + writeSupport, + compressionCodecName, + rowGroupSize, + validating, + new HadoopParquetConfiguration(conf), + maxPaddingSize, + encodingProps, + encryptionProperties); + } + + ParquetWriter( + OutputFile file, + ParquetFileWriter.Mode mode, + WriteSupport writeSupport, + CompressionCodecName compressionCodecName, + long rowGroupSize, + boolean validating, + ParquetConfiguration conf, + int maxPaddingSize, + ParquetProperties encodingProps, + FileEncryptionProperties encryptionProperties) throws IOException { WriteSupport.WriteContext writeContext = writeSupport.init(conf); MessageType schema = writeContext.getSchema(); @@ -283,8 +310,9 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport // encryptionProperties could be built from the implementation of EncryptionPropertiesFactory when it is attached. if (encryptionProperties == null) { String path = file == null ? 
null : file.getPath(); - encryptionProperties = ParquetOutputFormat.createEncryptionProperties(conf, - path == null ? null : new Path(path), writeContext); + Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf); + encryptionProperties = ParquetOutputFormat.createEncryptionProperties( + hadoopConf, path == null ? null : new Path(path), writeContext); } ParquetFileWriter fileWriter = new ParquetFileWriter( @@ -353,7 +381,7 @@ public abstract static class Builder> { private OutputFile file = null; private Path path = null; private FileEncryptionProperties encryptionProperties = null; - private Configuration conf = new Configuration(); + private ParquetConfiguration conf = null; private ParquetFileWriter.Mode mode; private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME; private long rowGroupSize = DEFAULT_BLOCK_SIZE; @@ -381,6 +409,14 @@ protected Builder(OutputFile path) { */ protected abstract WriteSupport getWriteSupport(Configuration conf); + /** + * @param conf a configuration + * @return an appropriate WriteSupport for the object model. + */ + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { + throw new UnsupportedOperationException("Override ParquetWriter$Builder#getWriteSupport(ParquetConfiguration)"); + } + /** * Set the {@link Configuration} used by the constructed writer. * @@ -388,6 +424,17 @@ protected Builder(OutputFile path) { * @return this builder for method chaining. */ public SELF withConf(Configuration conf) { + this.conf = new HadoopParquetConfiguration(conf); + return self(); + } + + /** + * Set the {@link ParquetConfiguration} used by the constructed writer. + * + * @param conf a {@code ParquetConfiguration} + * @return this builder for method chaining. + */ + public SELF withConf(ParquetConfiguration conf) { this.conf = conf; return self(); } @@ -719,6 +766,9 @@ public SELF withStatisticsTruncateLength(int length) { * @return this builder for method chaining. 
*/ public SELF config(String property, String value) { + if (conf == null) { + conf = new HadoopParquetConfiguration(); + } conf.set(property, value); return self(); } @@ -730,12 +780,15 @@ public SELF config(String property, String value) { * @throws IOException if there is an error while creating the writer */ public ParquetWriter build() throws IOException { + if (conf == null) { + conf = new HadoopParquetConfiguration(); + } if (file != null) { return new ParquetWriter<>(file, mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf, maxPaddingSize, encodingPropsBuilder.build(), encryptionProperties); } else { - return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf), + return new ParquetWriter<>(HadoopOutputFile.fromPath(path, ConfigurationUtil.createHadoopConfiguration(conf)), mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf, maxPaddingSize, encodingPropsBuilder.build(), encryptionProperties); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java index 8100a351f3..2593393d77 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; @@ -53,6 +54,15 @@ public RecordMaterializer prepareForRead( return delegate.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext); } + @Override + public RecordMaterializer prepareForRead( + ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema, + ReadSupport.ReadContext readContext) { + return delegate.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext); + } + @Override public String toString() { return this.getClass().getName() + "(" + delegate.toString() + ")"; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java index f5bbfc60de..926fe68c63 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.RecordConsumer; /** @@ -42,6 +43,11 @@ public WriteSupport.WriteContext init(Configuration configuration) { return delegate.init(configuration); } + @Override + public WriteSupport.WriteContext init(ParquetConfiguration configuration) { + return delegate.init(configuration); + } + @Override public void prepareForWrite(RecordConsumer recordConsumer) { delegate.prepareForWrite(recordConsumer); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java index 6bc5e5d3d4..3d80811aa7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java @@ -25,6 +25,9 @@ import org.apache.hadoop.conf.Configuration; +import 
org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; +import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.schema.MessageType; /** @@ -35,7 +38,7 @@ public class InitContext { private final Map> keyValueMetadata; private Map mergedKeyValueMetadata; - private final Configuration configuration; + private final ParquetConfiguration configuration; private final MessageType fileSchema; /** @@ -47,6 +50,13 @@ public InitContext( Configuration configuration, Map> keyValueMetadata, MessageType fileSchema) { + this(new HadoopParquetConfiguration(configuration), keyValueMetadata, fileSchema); + } + + public InitContext( + ParquetConfiguration configuration, + Map> keyValueMetadata, + MessageType fileSchema) { super(); this.keyValueMetadata = keyValueMetadata; this.configuration = configuration; @@ -77,6 +87,13 @@ public Map getMergedKeyValueMetaData() { * @return the configuration for this job */ public Configuration getConfiguration() { + return ConfigurationUtil.createHadoopConfiguration(configuration); + } + + /** + * @return the Parquet configuration for this job + */ + public ParquetConfiguration getParquetConfiguration() { return configuration; } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java index 62344522b6..a3dfe2caa2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; @@ -69,10 +70,28 @@ public static MessageType getSchemaForRead(MessageType fileMessageType, MessageT */ @Deprecated public ReadContext init( - Configuration configuration, - Map keyValueMetaData, - MessageType fileSchema) { - throw new UnsupportedOperationException("Override init(InitContext)"); + Configuration configuration, + Map keyValueMetaData, + MessageType fileSchema) { + throw new UnsupportedOperationException("Override ReadSupport.init(InitContext)"); + } + + /** + * called in {@link org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)} in the front end + * + * @param configuration the configuration + * @param keyValueMetaData the app specific metadata from the file + * @param fileSchema the schema of the file + * @return the readContext that defines how to read the file + * + * @deprecated override {@link ReadSupport#init(InitContext)} instead + */ + @Deprecated + public ReadContext init( + ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema) { + throw new UnsupportedOperationException("Override ReadSupport.init(InitContext)"); } /** @@ -82,7 +101,7 @@ public ReadContext init( * @return the readContext that defines how to read the file */ public ReadContext init(InitContext context) { - return init(context.getConfiguration(), context.getMergedKeyValueMetaData(), context.getFileSchema()); + return init(context.getParquetConfiguration(), context.getMergedKeyValueMetaData(), context.getFileSchema()); } /** @@ -101,6 +120,24 @@ abstract public RecordMaterializer prepareForRead( MessageType fileSchema, ReadContext readContext); + /** + * called in {@link 
org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)} in the back end + * the returned RecordMaterializer will materialize the records and add them to the destination + * + * @param configuration the configuration + * @param keyValueMetaData the app specific metadata from the file + * @param fileSchema the schema of the file + * @param readContext returned by the init method + * @return the recordMaterializer that will materialize the records + */ + public RecordMaterializer prepareForRead( + ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema, + ReadContext readContext) { + throw new UnsupportedOperationException("Override ReadSupport.prepareForRead(ParquetConfiguration, Map, MessageType, ReadContext)"); + } + /** * information to read the file */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java index 9549d5f492..b73e102c20 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageType; @@ -105,6 +106,15 @@ public Map getExtraMetaData() { */ public abstract WriteContext init(Configuration configuration); + /** + * called first in the task + * @param configuration the job's configuration + * @return the information needed to write the file + */ + public WriteContext init(ParquetConfiguration configuration) { + throw new UnsupportedOperationException("Override WriteSupport#init(ParquetConfiguration)"); + } + /** * This will be called once per row group * @param recordConsumer the recordConsumer to write to diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java index 12a67d301d..a151b4fce4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.example.data.Group; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; @@ -111,6 +112,11 @@ protected Builder self() { @Override protected WriteSupport getWriteSupport(Configuration conf) { + return getWriteSupport((ParquetConfiguration) null); + } + + @Override + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { return new GroupWriteSupport(type, extraMetaData); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java index c49b681d5c..6cb4b6f7fe 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; +import 
org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; import org.apache.parquet.hadoop.api.ReadSupport; @@ -34,6 +36,13 @@ public class GroupReadSupport extends ReadSupport { public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init( Configuration configuration, Map keyValueMetaData, MessageType fileSchema) { + return init(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema); + } + + @Override + public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init( + ParquetConfiguration configuration, Map keyValueMetaData, + MessageType fileSchema) { String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString); return new ReadContext(requestedProjection); @@ -46,4 +55,11 @@ public RecordMaterializer prepareForRead(Configuration configuration, return new GroupRecordConverter(readContext.getRequestedSchema()); } + @Override + public RecordMaterializer prepareForRead(ParquetConfiguration configuration, + Map keyValueMetaData, MessageType fileSchema, + org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) { + return new GroupRecordConverter(readContext.getRequestedSchema()); + } + } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java index dfed676c9c..a4d4a3f36d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java @@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.GroupWriter; import org.apache.parquet.hadoop.api.WriteSupport; @@ -41,6 +43,10 @@ public static void setSchema(MessageType schema, Configuration configuration) { } public static MessageType getSchema(Configuration configuration) { + return getSchema(new HadoopParquetConfiguration(configuration)); + } + + public static MessageType getSchema(ParquetConfiguration configuration) { return parseMessageType(Objects.requireNonNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA)); } @@ -68,6 +74,11 @@ public String getName() { @Override public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(ParquetConfiguration configuration) { // if present, prefer the schema passed to the constructor if (schema == null) { schema = getSchema(configuration); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java index 7f39cd76c6..ca524d90b6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java @@ -19,18 +19,26 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.conf.Configuration; +import 
org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.BadConfigurationException; +import java.util.Map; + public class ConfigurationUtil { public static Class getClassFromConfig(Configuration configuration, String configName, Class assignableFrom) { + return getClassFromConfig(new HadoopParquetConfiguration(configuration), configName, assignableFrom); + } + + public static Class getClassFromConfig(ParquetConfiguration configuration, String configName, Class assignableFrom) { final String className = configuration.get(configName); if (className == null) { return null; } - + try { - final Class foundClass = configuration.getClassByName(className); + final Class foundClass = configuration.getClassByName(className); if (!assignableFrom.isAssignableFrom(foundClass)) { throw new BadConfigurationException("class " + className + " set in job conf at " + configName + " is not a subclass of " + assignableFrom.getCanonicalName()); @@ -41,4 +49,18 @@ public static Class getClassFromConfig(Configuration configuration, String co } } + public static Configuration createHadoopConfiguration(ParquetConfiguration conf) { + if (conf == null) { + return new Configuration(); + } + if (conf instanceof HadoopParquetConfiguration) { + return ((HadoopParquetConfiguration) conf).getConfiguration(); + } + Configuration configuration = new Configuration(); + for (Map.Entry entry : conf) { + configuration.set(entry.getKey(), entry.getValue()); + } + return configuration; + } + } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java index a46c8db216..845cafc5aa 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.CodecFactory; public class HadoopCodecs { @@ -33,6 +34,10 @@ public static CompressionCodecFactory newFactory(Configuration conf, int sizeHin return new CodecFactory(conf, sizeHint); } + public static CompressionCodecFactory newFactory(ParquetConfiguration conf, int sizeHint) { + return new CodecFactory(conf, sizeHint); + } + public static CompressionCodecFactory newDirectFactory(Configuration conf, ByteBufferAllocator allocator, int sizeHint) { return CodecFactory.createDirectCodecFactory(conf, allocator, sizeHint); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java index 199b774c43..6e669ed8ba 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java @@ -29,6 +29,8 @@ import java.util.zip.GZIPOutputStream; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; /** * Serialization utils copied from: @@ -70,8 +72,22 @@ public static void writeObjectToConfAsBase64(String key, Object obj, Configurati * @return the read object, or null if key is not present in conf * @throws 
IOException if there is an error while reading */ - @SuppressWarnings("unchecked") public static T readObjectFromConfAsBase64(String key, Configuration conf) throws IOException { + return readObjectFromConfAsBase64(key, new HadoopParquetConfiguration(conf)); + } + + /** + * Reads an object (that was written using + * {@link #writeObjectToConfAsBase64}) from a configuration + * + * @param key for the configuration + * @param conf to read from + * @param the Java type of the deserialized object + * @return the read object, or null if key is not present in conf + * @throws IOException if there is an error while reading + */ + @SuppressWarnings("unchecked") + public static T readObjectFromConfAsBase64(String key, ParquetConfiguration conf) throws IOException { String b64 = conf.get(key); if (b64 == null) { return null; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java index 074d2e8b66..e8e032c9cd 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java @@ -25,6 +25,7 @@ import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.ParquetConfiguration; import org.junit.Rule; import org.junit.rules.TemporaryFolder; import org.apache.parquet.hadoop.ParquetWriter; @@ -86,6 +87,11 @@ protected DirectWriteSupport(MessageType type, DirectWriter writer, @Override public WriteContext init(Configuration configuration) { + return init((ParquetConfiguration) null); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { return new WriteContext(type, metadata); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java index 862ae672c6..a7c2002832 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.crypto.EncryptionPropertiesFactory; import org.apache.parquet.crypto.ParquetCipher; import org.apache.parquet.example.data.Group; @@ -203,6 +205,11 @@ public CryptoGroupWriteSupport() { @Override public WriteContext init(Configuration conf) { + return init(new HadoopParquetConfiguration(conf)); + } + + @Override + public WriteContext init(ParquetConfiguration conf) { WriteContext writeContext = super.init(conf); MessageType schema = writeContext.getSchema(); List columns = schema.getColumns(); @@ -219,6 +226,10 @@ public WriteContext init(Configuration conf) { } private void setMetadata(ColumnDescriptor column, Configuration conf) { + setMetadata(column, new HadoopParquetConfiguration(conf)); + } + + private void setMetadata(ColumnDescriptor column, ParquetConfiguration conf) { String columnShortName = column.getPath()[column.getPath().length - 1]; if (cryptoMetadata.containsKey(columnShortName) && cryptoMetadata.get(columnShortName).get("columnKeyMetaData") != null) { @@ -242,6 +253,11 
@@ protected Builder self() { @Override protected WriteSupport getWriteSupport(Configuration conf) { + return getWriteSupport((ParquetConfiguration) null); + } + + @Override + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { return new CryptoGroupWriteSupport(); } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java index a212c091fe..991d588838 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java @@ -316,7 +316,7 @@ public void testWriteReadEncryptedParquetFiles() throws IOException { Path rootPath = new Path(temporaryFolder.getRoot().getPath()); LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", rootPath.toString()); byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8); - // Write using various encryption configuraions + // Write using various encryption configurations testWriteEncryptedParquetFiles(rootPath, DATA); // Read using various decryption configurations. testReadEncryptedParquetFiles(rootPath, DATA); diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java index 50f9ebcc3d..cca1a91f2d 100644 --- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java +++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java @@ -27,6 +27,8 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.pig.LoadPushDown.RequiredFieldList; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -61,6 +63,14 @@ public class TupleReadSupport extends ReadSupport { * @return the pig schema requested by the user or null if none. */ static Schema getPigSchema(Configuration configuration) { + return getPigSchema(new HadoopParquetConfiguration(configuration)); + } + + /** + * @param configuration the configuration + * @return the pig schema requested by the user or null if none. 
+ */ + static Schema getPigSchema(ParquetConfiguration configuration) { return parsePigSchema(configuration.get(PARQUET_PIG_SCHEMA)); } @@ -69,9 +79,17 @@ static Schema getPigSchema(Configuration configuration) { * @return List of required fields from pushProjection */ static RequiredFieldList getRequiredFields(Configuration configuration) { + return getRequiredFields(new HadoopParquetConfiguration(configuration)); + } + + /** + * @param configuration configuration + * @return List of required fields from pushProjection + */ + static RequiredFieldList getRequiredFields(ParquetConfiguration configuration) { String requiredFieldString = configuration.get(PARQUET_PIG_REQUIRED_FIELDS); - if(requiredFieldString == null) { + if (requiredFieldString == null) { return null; } @@ -154,9 +172,9 @@ private static FieldSchema union(FieldSchema mergedFieldSchema, FieldSchema newF @Override public ReadContext init(InitContext initContext) { - Schema pigSchema = getPigSchema(initContext.getConfiguration()); - RequiredFieldList requiredFields = getRequiredFields(initContext.getConfiguration()); - boolean columnIndexAccess = initContext.getConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false); + Schema pigSchema = getPigSchema(initContext.getParquetConfiguration()); + RequiredFieldList requiredFields = getRequiredFields(initContext.getParquetConfiguration()); + boolean columnIndexAccess = initContext.getParquetConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false); if (pigSchema == null) { return new ReadContext(initContext.getFileSchema()); @@ -174,9 +192,17 @@ public RecordMaterializer prepareForRead( Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { + return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext); + } + + @Override + public RecordMaterializer prepareForRead( + ParquetConfiguration configuration, + Map keyValueMetaData, + MessageType fileSchema, + ReadContext readContext) { MessageType requestedSchema = readContext.getRequestedSchema(); Schema requestedPigSchema = getPigSchema(configuration); - if (requestedPigSchema == null) { throw new ParquetDecodingException("Missing Pig schema: ParquetLoader sets the schema in the job conf"); } diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java index 68a7d7d22b..fd1bb39cdf 100644 --- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java +++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java @@ -26,6 +26,8 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; @@ -82,6 +84,11 @@ public MessageType getParquetSchema() { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { Map extraMetaData = new HashMap(); new PigMetaData(rootPigSchema).addToMetaData(extraMetaData); return new WriteContext(rootSchema, extraMetaData); diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java index 
ff4bd87d64..1c21044cb0 100644 --- a/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java +++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java @@ -33,6 +33,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -175,7 +176,7 @@ private void testFromGroups(String pigSchemaString, List input) throws Pa private TupleWriteSupport newTupleWriter(String pigSchemaString, RecordMaterializer recordConsumer) throws ParserException { TupleWriteSupport tupleWriter = TupleWriteSupport.fromPigSchema(pigSchemaString); - tupleWriter.init(null); + tupleWriter.init((ParquetConfiguration) null); tupleWriter.prepareForWrite( new ConverterConsumer(recordConsumer.getRootConverter(), tupleWriter.getParquetSchema()) ); diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java index c8e36aded9..12c3373f5a 100644 --- a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java +++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java @@ -23,6 +23,7 @@ import java.util.logging.Level; import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.NonSpillableDataBag; @@ -130,8 +131,8 @@ private static void read(PageReadStore columns, String pigSchemaString, String m TupleReadSupport tupleReadSupport = new TupleReadSupport(); Map pigMetaData = pigMetaData(pigSchemaString); MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchemaString)); - ReadContext init = tupleReadSupport.init(null, pigMetaData, schema); - RecordMaterializer recordConsumer = tupleReadSupport.prepareForRead(null, pigMetaData, schema, init); + ReadContext init = tupleReadSupport.init((ParquetConfiguration) null, pigMetaData, schema); + RecordMaterializer recordConsumer = tupleReadSupport.prepareForRead((ParquetConfiguration) null, pigMetaData, schema, init); RecordReader recordReader = columnIO.getRecordReader(columns, recordConsumer); // TODO: put this back // if (DEBUG) { @@ -156,7 +157,7 @@ private static Map pigMetaData(String pigSchemaString) { private static void write(MemPageStore memPageStore, ColumnWriteStoreV1 columns, MessageType schema, String pigSchemaString) throws ExecException, ParserException { MessageColumnIO columnIO = newColumnFactory(pigSchemaString); TupleWriteSupport tupleWriter = TupleWriteSupport.fromPigSchema(pigSchemaString); - tupleWriter.init(null); + tupleWriter.init((ParquetConfiguration) null); tupleWriter.prepareForWrite(columnIO.getRecordWriter(columns)); write(memPageStore, tupleWriter, 10000); write(memPageStore, tupleWriter, 10000); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java index 8383fbc75f..da51788f2a 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java @@ -25,6 +25,8 @@ import com.twitter.elephantbird.util.Protobufs; import 
org.apache.hadoop.conf.Configuration; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.ParquetDecodingException; @@ -68,7 +70,7 @@ public void add(Object value) { } }; - protected final Configuration conf; + protected final ParquetConfiguration conf; protected final Converter[] converters; protected final ParentValueContainer parent; protected final Message.Builder myBuilder; @@ -88,8 +90,16 @@ public void add(Object value) { this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata); } + ProtoMessageConverter(ParquetConfiguration conf, ParentValueContainer pvc, Class protoClass, GroupType parquetSchema, Map extraMetadata) { + this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata); + } + // For usage in message arrays ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map extraMetadata) { + this(new HadoopParquetConfiguration(conf), pvc, builder, parquetSchema, extraMetadata); + } + + ProtoMessageConverter(ParquetConfiguration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map extraMetadata) { if (pvc == null) { throw new IllegalStateException("Missing parent value container"); } @@ -141,7 +151,12 @@ private void validateProtoField(boolean ignoreUnknownFields, private Converter dummyScalarConverter(ParentValueContainer pvc, Type parquetField, Configuration conf, Map extraMetadata) { + return dummyScalarConverter(pvc, parquetField, new HadoopParquetConfiguration(conf), extraMetadata); + } + private Converter dummyScalarConverter(ParentValueContainer pvc, + Type parquetField, ParquetConfiguration conf, + Map extraMetadata) { if (parquetField.isPrimitive()) { PrimitiveType primitiveType = parquetField.asPrimitiveType(); PrimitiveType.PrimitiveTypeName primitiveTypeName = primitiveType.getPrimitiveTypeName(); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java index a85b4ef555..f1bd64466e 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java @@ -21,6 +21,7 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; import org.apache.hadoop.fs.Path; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -120,8 +121,15 @@ public Builder withMessage(Class protoMessage){ return this; } - protected WriteSupport getWriteSupport(Configuration conf) { - return (WriteSupport) ProtoParquetWriter.writeSupport(protoMessage); - } + @Override + protected WriteSupport getWriteSupport(Configuration conf) { + return getWriteSupport((ParquetConfiguration) null); + } + + @Override + @SuppressWarnings("unchecked") + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { + return (WriteSupport) ProtoParquetWriter.writeSupport(protoMessage); + } } } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java 
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java index 78edf70d2e..6343992e58 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java @@ -21,6 +21,8 @@ import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.io.api.RecordMaterializer; @@ -59,7 +61,7 @@ public static void setProtobufClass(Configuration configuration, String protobuf @Override public ReadContext init(InitContext context) { - String requestedProjectionString = context.getConfiguration().get(PB_REQUESTED_PROJECTION); + String requestedProjectionString = context.getParquetConfiguration().get(PB_REQUESTED_PROJECTION); if (requestedProjectionString != null && !requestedProjectionString.trim().isEmpty()) { MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), requestedProjectionString); @@ -74,6 +76,11 @@ public ReadContext init(InitContext context) { @Override public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { + return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext); + } + + @Override + public RecordMaterializer prepareForRead(ParquetConfiguration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { String headerProtoClass = keyValueMetaData.get(PB_CLASS); String configuredProtoClass = configuration.get(PB_CLASS); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java index 75a67f12cf..4ddf23d6eb 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java @@ -22,6 +22,7 @@ import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.schema.MessageType; import java.util.Collections; @@ -54,6 +55,11 @@ public ProtoRecordConverter(Configuration conf, Class protocl reusedBuilder = getBuilder(); } + public ProtoRecordConverter(ParquetConfiguration conf, Class protoclass, MessageType parquetSchema, Map extraMetadata) { + super(conf, new SkipParentValueContainer(), protoclass, parquetSchema, extraMetadata); + reusedBuilder = getBuilder(); + } + public ProtoRecordConverter(Configuration conf, Message.Builder builder, MessageType parquetSchema, Map extraMetadata) { super(conf, new SkipParentValueContainer(), builder, parquetSchema, extraMetadata); reusedBuilder = getBuilder(); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java index dd77ca6b61..63640d3300 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java @@ -21,6 +21,8 @@ import 
com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; @@ -32,6 +34,10 @@ class ProtoRecordMaterializer extends RecordMaterial private final ProtoRecordConverter root; public ProtoRecordMaterializer(Configuration conf, MessageType requestedSchema, Class protobufClass, Map metadata) { + this(new HadoopParquetConfiguration(conf), requestedSchema, protobufClass, metadata); + } + + public ProtoRecordMaterializer(ParquetConfiguration conf, MessageType requestedSchema, Class protobufClass, Map metadata) { this.root = new ProtoRecordConverter(conf, protobufClass, requestedSchema, metadata); } diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java index c3570323f0..a6a779d074 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java @@ -25,6 +25,8 @@ import com.google.protobuf.Message; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; @@ -81,9 +83,19 @@ public ProtoSchemaConverter(boolean parquetSpecsCompliant) { * Instantiate a schema converter to get the parquet schema corresponding to protobuf classes. * Returns instances that are not specs compliant and limited to 5 levels of recursion depth. * - * @param config Hadoop configuration object to parrse parquetSpecsCompliant and maxRecursion settings. + * @param config Hadoop configuration object to parse parquetSpecsCompliant and maxRecursion settings. */ public ProtoSchemaConverter(Configuration config) { + this(new HadoopParquetConfiguration(config)); + } + + /** + * Instantiate a schema converter to get the parquet schema corresponding to protobuf classes. + * Returns instances that are not specs compliant and limited to 5 levels of recursion depth. + * + * @param config Parquet configuration object to parse parquetSpecsCompliant and maxRecursion settings. 
+ */ + public ProtoSchemaConverter(ParquetConfiguration config) { this( config.getBoolean(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, false), config.getInt(PB_MAX_RECURSION, 5)); diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java index f15511062a..b13acd2a57 100644 --- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java +++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java @@ -23,6 +23,8 @@ import com.google.protobuf.Descriptors.FieldDescriptor; import com.twitter.elephantbird.util.Protobufs; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.io.InvalidRecordException; @@ -118,6 +120,11 @@ public void prepareForWrite(RecordConsumer recordConsumer) { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { Map extraMetaData = new HashMap<>(); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java index 0c3fe440d9..baa3ddb92f 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java @@ -18,6 +18,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import com.twitter.elephantbird.pig.util.ThriftToPig; @@ -40,14 +42,22 @@ public abstract class AbstractThriftWriteSupport extends WriteSupport { public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class"; private static final Logger LOG = LoggerFactory.getLogger(AbstractThriftWriteSupport.class); - private static Configuration conf; + private static ParquetConfiguration conf; public static void setGenericThriftClass(Configuration configuration, Class thriftClass) { + setGenericThriftClass(new HadoopParquetConfiguration(configuration), thriftClass); + } + + public static void setGenericThriftClass(ParquetConfiguration configuration, Class thriftClass) { conf = configuration; configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName()); } - public static Class getGenericThriftClass(Configuration configuration) { + public static Class getGenericThriftClass(Configuration configuration) { + return getGenericThriftClass(new HadoopParquetConfiguration(configuration)); + } + + public static Class getGenericThriftClass(ParquetConfiguration configuration) { final String thriftClassName = configuration.get(PARQUET_THRIFT_CLASS); if (thriftClassName == null) { throw new BadConfigurationException("the thrift class conf is missing in job conf at " + PARQUET_THRIFT_CLASS); @@ -111,9 +121,14 @@ protected boolean isPigLoaded() { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration 
configuration) { conf = configuration; if (writeContext == null) { - init(getGenericThriftClass(configuration)); + init((Class) getGenericThriftClass(configuration)); } return writeContext; } diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java index c1ece9fcfb..60dfc12e77 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java @@ -16,6 +16,8 @@ package org.apache.parquet.hadoop.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.thrift.TException; @@ -25,14 +27,22 @@ public class TBaseWriteSupport> extends AbstractThriftWriteSupport { - private static Configuration conf; + private static ParquetConfiguration conf; public static > void setThriftClass(Configuration configuration, Class thriftClass) { + setThriftClass(new HadoopParquetConfiguration(configuration), thriftClass); + } + + public static > void setThriftClass(ParquetConfiguration configuration, Class thriftClass) { conf = configuration; AbstractThriftWriteSupport.setGenericThriftClass(configuration, thriftClass); } public static Class> getThriftClass(Configuration configuration) { + return getThriftClass(new HadoopParquetConfiguration(configuration)); + } + + public static Class> getThriftClass(ParquetConfiguration configuration) { return (Class>)AbstractThriftWriteSupport.getGenericThriftClass(configuration); } diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java index 6b9d75d98f..6c9311f4ec 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java @@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -57,6 +59,10 @@ public static void setTProtocolClass(Configuration conf, C } public static Class getTProtocolFactoryClass(Configuration conf) { + return getTProtocolFactoryClass(new HadoopParquetConfiguration(conf)); + } + + public static Class getTProtocolFactoryClass(ParquetConfiguration conf) { final String tProtocolClassName = conf.get(PARQUET_PROTOCOL_CLASS); if (tProtocolClassName == null) { throw new BadConfigurationException("the protocol class conf is missing in job conf at " + PARQUET_PROTOCOL_CLASS); @@ -80,7 +86,7 @@ public static Class getTProtocolFactoryClass(Configuration con private StructType thriftStruct; private ParquetWriteProtocol parquetWriteProtocol; private final FieldIgnoredHandler errorHandler; - private Configuration configuration; + private ParquetConfiguration configuration; public ThriftBytesWriteSupport() { this.buffered = true; @@ -106,6 +112,15 @@ public ThriftBytesWriteSupport( Class> thriftClass, boolean buffered, FieldIgnoredHandler errorHandler) { + this(new HadoopParquetConfiguration(configuration), 
protocolFactory, thriftClass, buffered, errorHandler); + } + + public ThriftBytesWriteSupport( + ParquetConfiguration configuration, + TProtocolFactory protocolFactory, + Class> thriftClass, + boolean buffered, + FieldIgnoredHandler errorHandler) { super(); this.configuration = configuration; this.protocolFactory = protocolFactory; @@ -124,6 +139,11 @@ public String getName() { @Override public WriteContext init(Configuration configuration) { + return init(new HadoopParquetConfiguration(configuration)); + } + + @Override + public WriteContext init(ParquetConfiguration configuration) { this.configuration = configuration; if (this.protocolFactory == null) { try { diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java index 2375a6df6c..bd9530b82b 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java @@ -25,6 +25,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.thrift.protocol.TProtocol; @@ -111,6 +113,10 @@ public static void setStrictFieldProjectionFilter(Configuration conf, String sem } public static FieldProjectionFilter getFieldProjectionFilter(Configuration conf) { + return getFieldProjectionFilter(new HadoopParquetConfiguration(conf)); + } + + public static FieldProjectionFilter getFieldProjectionFilter(ParquetConfiguration conf) { String deprecated = conf.get(THRIFT_COLUMN_FILTER_KEY); String strict = conf.get(STRICT_THRIFT_COLUMN_FILTER_KEY); @@ -155,7 +161,7 @@ public ThriftReadSupport(Class thriftClass) { @Override public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) { - final Configuration configuration = context.getConfiguration(); + final ParquetConfiguration configuration = context.getParquetConfiguration(); final MessageType fileMessageType = context.getFileSchema(); MessageType requestedProjection = fileMessageType; String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); @@ -185,9 +191,14 @@ public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext co return new ReadContext(schemaForRead); } - @SuppressWarnings("unchecked") protected MessageType getProjectedSchema(Configuration configuration, FieldProjectionFilter fieldProjectionFilter) { + return getProjectedSchema(new HadoopParquetConfiguration(configuration), fieldProjectionFilter); + } + + @SuppressWarnings("unchecked") + protected MessageType getProjectedSchema(ParquetConfiguration configuration, FieldProjectionFilter + fieldProjectionFilter) { return new ThriftSchemaConverter(configuration, fieldProjectionFilter) .convert((Class>)thriftClass); } @@ -200,8 +211,12 @@ protected MessageType getProjectedSchema(FieldProjectionFilter .convert((Class>)thriftClass); } - @SuppressWarnings("unchecked") private void initThriftClassFromMultipleFiles(Map> fileMetadata, Configuration conf) throws ClassNotFoundException { + initThriftClassFromMultipleFiles(fileMetadata, new HadoopParquetConfiguration(conf)); + } + + @SuppressWarnings("unchecked") + private void initThriftClassFromMultipleFiles(Map> fileMetadata, ParquetConfiguration conf) throws ClassNotFoundException { if (thriftClass != null) { 
return; } @@ -216,8 +231,12 @@ private void initThriftClassFromMultipleFiles(Map> fileMetad thriftClass = (Class)Class.forName(className); } - @SuppressWarnings("unchecked") private void initThriftClass(ThriftMetaData metadata, Configuration conf) throws ClassNotFoundException { + initThriftClass(metadata, new HadoopParquetConfiguration(conf)); + } + + @SuppressWarnings("unchecked") + private void initThriftClass(ThriftMetaData metadata, ParquetConfiguration conf) throws ClassNotFoundException { if (thriftClass != null) { return; } @@ -254,10 +273,45 @@ public RecordMaterializer prepareForRead(Configuration configuration, configuration); } - @SuppressWarnings("unchecked") + @Override + public RecordMaterializer prepareForRead(ParquetConfiguration configuration, + Map keyValueMetaData, MessageType fileSchema, + org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) { + ThriftMetaData thriftMetaData = ThriftMetaData.fromExtraMetaData(keyValueMetaData); + try { + initThriftClass(thriftMetaData, configuration); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find Thrift object class for metadata: " + thriftMetaData, e); + } + + // if there was not metadata in the file, get it from requested class + if (thriftMetaData == null) { + thriftMetaData = ThriftMetaData.fromThriftClass(thriftClass); + } + + String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, RECORD_CONVERTER_DEFAULT); + return getRecordConverterInstance(converterClassName, thriftClass, + readContext.getRequestedSchema(), thriftMetaData.getDescriptor(), + configuration); + } + private static ThriftRecordConverter getRecordConverterInstance( String converterClassName, Class thriftClass, MessageType requestedSchema, StructType descriptor, Configuration conf) { + return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, Configuration.class); + } + + private static ThriftRecordConverter getRecordConverterInstance( + String converterClassName, Class thriftClass, + MessageType requestedSchema, StructType descriptor, ParquetConfiguration conf) { + return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, ParquetConfiguration.class); + } + + @SuppressWarnings("unchecked") + private static ThriftRecordConverter getRecordConverterInstance( + String converterClassName, Class thriftClass, + MessageType requestedSchema, StructType descriptor, CONF conf, Class confClass) { + Class> converterClass; try { converterClass = (Class>) Class.forName(converterClassName); @@ -269,14 +323,14 @@ private static ThriftRecordConverter getRecordConverterInstance( // first try the new version that accepts a Configuration try { Constructor> constructor = - converterClass.getConstructor(Class.class, MessageType.class, StructType.class, Configuration.class); + converterClass.getConstructor(Class.class, MessageType.class, StructType.class, confClass); return constructor.newInstance(thriftClass, requestedSchema, descriptor, conf); } catch (IllegalAccessException | NoSuchMethodException e) { // try the other constructor pattern } Constructor> constructor = - converterClass.getConstructor(Class.class, MessageType.class, StructType.class); + converterClass.getConstructor(Class.class, MessageType.class, StructType.class); return constructor.newInstance(thriftClass, requestedSchema, descriptor); } catch (InstantiationException | InvocationTargetException e) { throw new RuntimeException("Failed to construct Thrift converter 
class: " + converterClassName, e); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java index a9864ff814..2ac4fccf17 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java @@ -19,6 +19,7 @@ package org.apache.parquet.hadoop.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.parquet.hadoop.api.WriteSupport; @@ -68,6 +69,11 @@ public WriteContext init(Configuration configuration) { return this.writeSupport.init(configuration); } + @Override + public WriteContext init(ParquetConfiguration configuration) { + return this.writeSupport.init(configuration); + } + @Override public void prepareForWrite(RecordConsumer recordConsumer) { this.writeSupport.prepareForWrite(recordConsumer); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java index ba48b37794..f129f36b77 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TField; import org.apache.thrift.protocol.TList; @@ -500,6 +502,11 @@ private String toString(TMap map) { public ParquetWriteProtocol( Configuration configuration, RecordConsumer recordConsumer, MessageColumnIO schema, StructType thriftType) { + this(new HadoopParquetConfiguration(configuration), recordConsumer, schema, thriftType); + } + + public ParquetWriteProtocol( + ParquetConfiguration configuration, RecordConsumer recordConsumer, MessageColumnIO schema, StructType thriftType) { this.recordConsumer = recordConsumer; if (configuration != null) { this.writeThreeLevelList = configuration.getBoolean( diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java index 78fc4a88f5..fa31a5c782 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java @@ -19,6 +19,8 @@ package org.apache.parquet.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; @@ -38,10 +40,15 @@ public class TBaseRecordConverter> extends ThriftRecordConv */ @Deprecated public TBaseRecordConverter(final Class thriftClass, MessageType requestedParquetSchema, StructType thriftType) { - this(thriftClass, requestedParquetSchema, thriftType, null); + this(thriftClass, requestedParquetSchema, thriftType, (HadoopParquetConfiguration) null); } + @SuppressWarnings("unused") public TBaseRecordConverter(final Class thriftClass, MessageType requestedParquetSchema, StructType thriftType, 
Configuration conf) { + this(thriftClass, requestedParquetSchema, thriftType, new HadoopParquetConfiguration(conf)); + } + + public TBaseRecordConverter(final Class thriftClass, MessageType requestedParquetSchema, StructType thriftType, ParquetConfiguration conf) { super(new ThriftReader() { @Override public T readOneRecord(TProtocol protocol) throws TException { diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java index 3244b32110..d0649212fb 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java @@ -25,6 +25,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TField; import org.apache.thrift.protocol.TList; @@ -843,7 +845,7 @@ public void end() { */ @Deprecated public ThriftRecordConverter(ThriftReader thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType) { - this(thriftReader, name, requestedParquetSchema, thriftType, null); + this(thriftReader, name, requestedParquetSchema, thriftType, (ParquetConfiguration) null); } /** @@ -855,6 +857,17 @@ public ThriftRecordConverter(ThriftReader thriftReader, String name, MessageT * @param conf a Configuration */ public ThriftRecordConverter(ThriftReader thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType, Configuration conf) { + this(thriftReader, name, requestedParquetSchema, thriftType, new HadoopParquetConfiguration(conf)); + } + + /** + * @param thriftReader the class responsible for instantiating the final object and read from the protocol + * @param name the name of that type ( the thrift class simple name) + * @param requestedParquetSchema the schema for the incoming columnar events + * @param thriftType the thrift type descriptor + * @param conf a Configuration + */ + public ThriftRecordConverter(ThriftReader thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType, ParquetConfiguration conf) { super(); this.thriftReader = thriftReader; this.protocol = new ParquetReadProtocol(); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java index f915a6ed11..c32df81477 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java @@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.ShouldNeverHappenException; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; @@ -80,6 +82,11 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor clazz = configuration.getClassByName(className).asSubclass(TBase.class); thriftWriteSupport = new ThriftWriteSupport(clazz); diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java 
b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java index 1311d76904..98f22d12a0 100644 --- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java +++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java @@ -33,6 +33,7 @@ import com.twitter.elephantbird.thrift.test.TestMapInList; import com.twitter.elephantbird.thrift.test.TestNameSet; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.conf.ParquetConfiguration; import org.junit.ComparisonFailure; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -708,7 +709,7 @@ private MessageType validatePig(String[] expectations, TBase a) { MessageType schema = new PigSchemaConverter().convert(pigSchema); LOG.info("{}", schema); TupleWriteSupport tupleWriteSupport = new TupleWriteSupport(pigSchema); - tupleWriteSupport.init(null); + tupleWriteSupport.init((ParquetConfiguration) null); tupleWriteSupport.prepareForWrite(recordConsumer); final Tuple pigTuple = thriftToPig.getPigTuple(a); LOG.info("{}", pigTuple); diff --git a/pom.xml b/pom.xml index 90e3d5a7e1..146fad5aed 100644 --- a/pom.xml +++ b/pom.xml @@ -540,6 +540,8 @@ ${shade.prefix} + org.apache.parquet.hadoop.CodecFactory + org.apache.parquet.hadoop.ParquetReader org.apache.parquet.thrift.projection.deprecated.PathGlobPattern org.apache.parquet.hadoop.ColumnChunkPageWriteStore
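For reviewers, a rough usage sketch (not part of the patch) of the Hadoop-free entry point added to ThriftWriteSupport above. It relies on the constructor and the new init(ParquetConfiguration) overload visible in this diff; the placeholder Thrift-generated class com.example.MyThriftRecord and the printing of the resulting schema are assumptions for illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.thrift.ThriftWriteSupport;

public class ThriftWriteSupportSketch {
  public static void main(String[] args) {
    // Wrap an existing Hadoop Configuration in the ParquetConfiguration abstraction;
    // any other ParquetConfiguration implementation could be supplied instead.
    ParquetConfiguration conf = new HadoopParquetConfiguration(new Configuration());

    // com.example.MyThriftRecord is a placeholder for a real Thrift-generated class.
    ThriftWriteSupport<com.example.MyThriftRecord> writeSupport =
        new ThriftWriteSupport<>(com.example.MyThriftRecord.class);

    // The new init(ParquetConfiguration) overload delegates to the same underlying
    // write support as the existing init(Configuration) method.
    WriteSupport.WriteContext context = writeSupport.init(conf);
    System.out.println(context.getSchema());
  }
}

The ParquetConfiguration overloads mirror the existing Configuration-based ones, so Hadoop-based callers keep working unchanged while new callers can pass a non-Hadoop configuration.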