From bab3d53bff84a74743b2f62f5e394cbd9410b31f Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Tue, 22 Jun 2021 00:52:43 -0700 Subject: [PATCH] PARQUET-2054: fix TCP leaking when calling ParquetFileWriter.appendFile (#913) * use try-with-resource statement for ParquetFileReader to call close explicitly --- .../cli/commands/CheckParquet251Command.java | 6 +- .../parquet/cli/commands/SchemaCommand.java | 7 +- .../cli/commands/ShowDictionaryCommand.java | 91 +++++----- .../cli/commands/ShowPagesCommand.java | 90 +++++----- .../parquet/hadoop/ParquetFileWriter.java | 4 +- .../parquet/hadoop/TestParquetFileWriter.java | 167 +++++++++--------- .../parquet/hadoop/TestParquetWriter.java | 17 +- .../hadoop/TestReadWriteEncodingStats.java | 49 ++--- 8 files changed, 221 insertions(+), 210 deletions(-) diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java index fbeebdfba6..d7aa82dcfd 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java @@ -108,10 +108,8 @@ public boolean apply(@Nullable ColumnDescriptor input) { })); // now check to see if the data is actually corrupt - ParquetFileReader reader = new ParquetFileReader(getConf(), - fakeMeta, path, footer.getBlocks(), columns); - - try { + try (ParquetFileReader reader = new ParquetFileReader(getConf(), + fakeMeta, path, footer.getBlocks(), columns)) { PageStatsValidator validator = new PageStatsValidator(); for (PageReadStore pages = reader.readNextRowGroup(); pages != null; pages = reader.readNextRowGroup()) { diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java index ca29dd0268..988ab0f40f 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java @@ -119,9 +119,10 @@ private String getParquetSchema(String source) throws IOException { switch (format) { case PARQUET: - return new ParquetFileReader( - getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER) - .getFileMetaData().getSchema().toString(); + try (ParquetFileReader reader = new ParquetFileReader( + getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)) { + return reader.getFileMetaData().getSchema().toString(); + } default: throw new IllegalArgumentException(String.format( "Could not get a Parquet schema for format %s: %s", format, source)); diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java index 20a694ff7f..7a167ed635 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java @@ -64,56 +64,57 @@ public int run() throws IOException { String source = targets.get(0); - ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source)); - MessageType schema = reader.getFileMetaData().getSchema(); - ColumnDescriptor descriptor = Util.descriptor(column, schema); - PrimitiveType type = Util.primitive(column, schema); - Preconditions.checkNotNull(type); - - DictionaryPageReadStore dictionaryReader; - 
int rowGroup = 0; - while ((dictionaryReader = reader.getNextDictionaryReader()) != null) { - DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor); - - Dictionary dict = page.getEncoding().initDictionary(descriptor, page); - - console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize()); - for (int i = 0; i <= dict.getMaxId(); i += 1) { - switch(type.getPrimitiveTypeName()) { - case BINARY: - if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) { + try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) { + MessageType schema = reader.getFileMetaData().getSchema(); + ColumnDescriptor descriptor = Util.descriptor(column, schema); + PrimitiveType type = Util.primitive(column, schema); + Preconditions.checkNotNull(type); + + DictionaryPageReadStore dictionaryReader; + int rowGroup = 0; + while ((dictionaryReader = reader.getNextDictionaryReader()) != null) { + DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor); + + Dictionary dict = page.getEncoding().initDictionary(descriptor, page); + + console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize()); + for (int i = 0; i <= dict.getMaxId(); i += 1) { + switch(type.getPrimitiveTypeName()) { + case BINARY: + if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) { + console.info("{}: {}", String.format("%6d", i), + Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70)); + } else { + console.info("{}: {}", String.format("%6d", i), + Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70)); + } + break; + case INT32: console.info("{}: {}", String.format("%6d", i), - Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70)); - } else { + dict.decodeToInt(i)); + break; + case INT64: console.info("{}: {}", String.format("%6d", i), - Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70)); - } - break; - case INT32: - console.info("{}: {}", String.format("%6d", i), - dict.decodeToInt(i)); - break; - case INT64: - console.info("{}: {}", String.format("%6d", i), - dict.decodeToLong(i)); - break; - case FLOAT: - console.info("{}: {}", String.format("%6d", i), - dict.decodeToFloat(i)); - break; - case DOUBLE: - console.info("{}: {}", String.format("%6d", i), - dict.decodeToDouble(i)); - break; - default: - throw new IllegalArgumentException( - "Unknown dictionary type: " + type.getPrimitiveTypeName()); + dict.decodeToLong(i)); + break; + case FLOAT: + console.info("{}: {}", String.format("%6d", i), + dict.decodeToFloat(i)); + break; + case DOUBLE: + console.info("{}: {}", String.format("%6d", i), + dict.decodeToDouble(i)); + break; + default: + throw new IllegalArgumentException( + "Unknown dictionary type: " + type.getPrimitiveTypeName()); + } } - } - reader.skipNextRowGroup(); + reader.skipNextRowGroup(); - rowGroup += 1; + rowGroup += 1; + } } console.info(""); diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java index 58321064e3..bf030ac606 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java @@ -75,57 +75,57 @@ public int run() throws IOException { "Cannot process multiple Parquet files."); String source = targets.get(0); - ParquetFileReader 
reader = ParquetFileReader.open(getConf(), qualifiedPath(source)); - - MessageType schema = reader.getFileMetaData().getSchema(); - Map columns = Maps.newLinkedHashMap(); - if (this.columns == null || this.columns.isEmpty()) { - for (ColumnDescriptor descriptor : schema.getColumns()) { - columns.put(descriptor, primitive(schema, descriptor.getPath())); - } - } else { - for (String column : this.columns) { - columns.put(descriptor(column, schema), primitive(column, schema)); - } - } - - CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec(); - // accumulate formatted lines to print by column - Map> formatted = Maps.newLinkedHashMap(); - PageFormatter formatter = new PageFormatter(); - PageReadStore pageStore; - int rowGroupNum = 0; - while ((pageStore = reader.readNextRowGroup()) != null) { - for (ColumnDescriptor descriptor : columns.keySet()) { - List lines = formatted.get(columnName(descriptor)); - if (lines == null) { - lines = Lists.newArrayList(); - formatted.put(columnName(descriptor), lines); + try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) { + MessageType schema = reader.getFileMetaData().getSchema(); + Map columns = Maps.newLinkedHashMap(); + if (this.columns == null || this.columns.isEmpty()) { + for (ColumnDescriptor descriptor : schema.getColumns()) { + columns.put(descriptor, primitive(schema, descriptor.getPath())); } - - formatter.setContext(rowGroupNum, columns.get(descriptor), codec); - PageReader pages = pageStore.getPageReader(descriptor); - - DictionaryPage dict = pages.readDictionaryPage(); - if (dict != null) { - lines.add(formatter.format(dict)); + } else { + for (String column : this.columns) { + columns.put(descriptor(column, schema), primitive(column, schema)); } - DataPage page; - while ((page = pages.readPage()) != null) { - lines.add(formatter.format(page)); + } + + CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec(); + // accumulate formatted lines to print by column + Map> formatted = Maps.newLinkedHashMap(); + PageFormatter formatter = new PageFormatter(); + PageReadStore pageStore; + int rowGroupNum = 0; + while ((pageStore = reader.readNextRowGroup()) != null) { + for (ColumnDescriptor descriptor : columns.keySet()) { + List lines = formatted.get(columnName(descriptor)); + if (lines == null) { + lines = Lists.newArrayList(); + formatted.put(columnName(descriptor), lines); + } + + formatter.setContext(rowGroupNum, columns.get(descriptor), codec); + PageReader pages = pageStore.getPageReader(descriptor); + + DictionaryPage dict = pages.readDictionaryPage(); + if (dict != null) { + lines.add(formatter.format(dict)); + } + DataPage page; + while ((page = pages.readPage()) != null) { + lines.add(formatter.format(page)); + } } + rowGroupNum += 1; } - rowGroupNum += 1; - } - // TODO: Show total column size and overall size per value in the column summary line - for (String columnName : formatted.keySet()) { - console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-'))); - console.info(formatter.getHeader()); - for (String line : formatted.get(columnName)) { - console.info(line); + // TODO: Show total column size and overall size per value in the column summary line + for (String columnName : formatted.keySet()) { + console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-'))); + console.info(formatter.getHeader()); + for (String line : 
formatted.get(columnName)) { + console.info(line); + } + console.info(""); } - console.info(""); } return 0; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index a246a52c73..afbdf7637e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -887,7 +887,9 @@ public void endBlock() throws IOException { */ @Deprecated public void appendFile(Configuration conf, Path file) throws IOException { - ParquetFileReader.open(conf, file).appendTo(this); + try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) { + reader.appendTo(this); + } } public void appendFile(InputFile file) throws IOException { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 73ef70e462..5b8c5ed1b4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -204,35 +204,37 @@ public void testWriteRead() throws Exception { assertEquals(expectedEncoding,rowGroup.getColumns().get(0).getEncodings()); { // read first block of col #1 - ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, - Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1))); - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - assertNull(r.readNextRowGroup()); + try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, + Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) { + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + assertNull(r.readNextRowGroup()); + } } { // read all blocks of col #1 and #2 - ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, - readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); + try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, + readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) { - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 3, 
BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); - pages = r.readNextRowGroup(); - assertEquals(4, pages.getRowCount()); + pages = r.readNextRowGroup(); + assertEquals(4, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); - validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); + validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); + validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); - assertNull(r.readNextRowGroup()); + assertNull(r.readNextRowGroup()); + } } PrintFooter.main(new String[] {path.toString()}); } @@ -281,12 +283,14 @@ public void testBloomFilterWriteRead() throws Exception { w.endBlock(); w.end(new HashMap<>()); ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); - ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, - Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath))); - BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0)); - BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0)); - assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello")))); - assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world")))); + + try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, + Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)))) { + BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0)); + BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0)); + assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello")))); + assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world")))); + } } @Test @@ -340,16 +344,16 @@ public void testWriteReadDataPageV2() throws Exception { expectedEncoding.add(PLAIN); assertEquals(expectedEncoding, readFooter.getBlocks().get(0).getColumns().get(0).getEncodings()); - ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, - readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); - - PageReadStore pages = reader.readNextRowGroup(); - assertEquals(14, pages.getRowCount()); - validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12); - validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(),data.toByteArray(), 12); - validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); - validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); - assertNull(reader.readNextRowGroup()); + try (ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, + readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) { + PageReadStore pages = reader.readNextRowGroup(); + assertEquals(14, pages.getRowCount()); + validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12); + 
validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12); + validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); + validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); + assertNull(reader.readNextRowGroup()); + } } @Test @@ -426,35 +430,37 @@ public void testAlignmentWithPadding() throws Exception { 120, readFooter.getBlocks().get(1).getStartingPos()); { // read first block of col #1 - ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, - Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1))); - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - assertNull(r.readNextRowGroup()); + try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, + Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) { + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + assertNull(r.readNextRowGroup()); + } } { // read all blocks of col #1 and #2 - ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, - readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); + try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, + readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) { - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); - pages = r.readNextRowGroup(); - assertEquals(4, pages.getRowCount()); + pages = r.readNextRowGroup(); + assertEquals(4, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); - validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); + validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); + validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); - assertNull(r.readNextRowGroup()); + assertNull(r.readNextRowGroup()); + } } PrintFooter.main(new String[] {path.toString()}); } @@ -533,35 +539,36 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception { 109, readFooter.getBlocks().get(1).getStartingPos()); { // read first block of col #1 - 
ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, - Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1))); - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - assertNull(r.readNextRowGroup()); + try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, + Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) { + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + assertNull(r.readNextRowGroup()); + } } { // read all blocks of col #1 and #2 - ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, - readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); + try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, + readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) { + PageReadStore pages = r.readNextRowGroup(); + assertEquals(3, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); + validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); + validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); - PageReadStore pages = r.readNextRowGroup(); - assertEquals(3, pages.getRowCount()); - validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); - validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); - validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); + pages = r.readNextRowGroup(); + assertEquals(4, pages.getRowCount()); - pages = r.readNextRowGroup(); - assertEquals(4, pages.getRowCount()); + validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); + validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); - validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); - validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); - - assertNull(r.readNextRowGroup()); + assertNull(r.readNextRowGroup()); + } } PrintFooter.main(new String[] {path.toString()}); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 9e9b735f32..b2ae72a641 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -270,14 +270,15 @@ public void testParquetFileWithBloomFilter() throws IOException { } } - ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration())); - BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0); - BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData) - 
.readBloomFilter(blockMetaData.getColumns().get(0)); - - for (String name: testNames) { - assertTrue(bloomFilter.findHash( - LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer()))); + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0); + BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData) + .readBloomFilter(blockMetaData.getColumns().get(0)); + + for (String name : testNames) { + assertTrue(bloomFilter.findHash( + LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer()))); + } } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java index 69e11c14e9..fdb7c8677d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java @@ -93,29 +93,30 @@ public void testReadWrite() throws Exception { writeData(writer); writer.close(); - ParquetFileReader reader = ParquetFileReader.open(CONF, path); - assertEquals("Should have one row group", 1, reader.getRowGroups().size()); - BlockMetaData rowGroup = reader.getRowGroups().get(0); - - ColumnChunkMetaData dictColumn = rowGroup.getColumns().get(0); - EncodingStats dictStats = dictColumn.getEncodingStats(); - assertNotNull("Dict column should have non-null encoding stats", dictStats); - assertTrue("Dict column should have a dict page", dictStats.hasDictionaryPages()); - assertTrue("Dict column should have dict-encoded pages", dictStats.hasDictionaryEncodedPages()); - assertFalse("Dict column should not have non-dict pages", dictStats.hasNonDictionaryEncodedPages()); - - ColumnChunkMetaData plainColumn = rowGroup.getColumns().get(1); - EncodingStats plainStats = plainColumn.getEncodingStats(); - assertNotNull("Plain column should have non-null encoding stats", plainStats); - assertFalse("Plain column should not have a dict page", plainStats.hasDictionaryPages()); - assertFalse("Plain column should not have dict-encoded pages", plainStats.hasDictionaryEncodedPages()); - assertTrue("Plain column should have non-dict pages", plainStats.hasNonDictionaryEncodedPages()); - - ColumnChunkMetaData fallbackColumn = rowGroup.getColumns().get(2); - EncodingStats fallbackStats = fallbackColumn.getEncodingStats(); - assertNotNull("Fallback column should have non-null encoding stats", fallbackStats); - assertTrue("Fallback column should have a dict page", fallbackStats.hasDictionaryPages()); - assertTrue("Fallback column should have dict-encoded pages", fallbackStats.hasDictionaryEncodedPages()); - assertTrue("Fallback column should have non-dict pages", fallbackStats.hasNonDictionaryEncodedPages()); + try (ParquetFileReader reader = ParquetFileReader.open(CONF, path)) { + assertEquals("Should have one row group", 1, reader.getRowGroups().size()); + BlockMetaData rowGroup = reader.getRowGroups().get(0); + + ColumnChunkMetaData dictColumn = rowGroup.getColumns().get(0); + EncodingStats dictStats = dictColumn.getEncodingStats(); + assertNotNull("Dict column should have non-null encoding stats", dictStats); + assertTrue("Dict column should have a dict page", dictStats.hasDictionaryPages()); + assertTrue("Dict column should have dict-encoded pages", dictStats.hasDictionaryEncodedPages()); + assertFalse("Dict column should not have 
non-dict pages", dictStats.hasNonDictionaryEncodedPages()); + + ColumnChunkMetaData plainColumn = rowGroup.getColumns().get(1); + EncodingStats plainStats = plainColumn.getEncodingStats(); + assertNotNull("Plain column should have non-null encoding stats", plainStats); + assertFalse("Plain column should not have a dict page", plainStats.hasDictionaryPages()); + assertFalse("Plain column should not have dict-encoded pages", plainStats.hasDictionaryEncodedPages()); + assertTrue("Plain column should have non-dict pages", plainStats.hasNonDictionaryEncodedPages()); + + ColumnChunkMetaData fallbackColumn = rowGroup.getColumns().get(2); + EncodingStats fallbackStats = fallbackColumn.getEncodingStats(); + assertNotNull("Fallback column should have non-null encoding stats", fallbackStats); + assertTrue("Fallback column should have a dict page", fallbackStats.hasDictionaryPages()); + assertTrue("Fallback column should have dict-encoded pages", fallbackStats.hasDictionaryEncodedPages()); + assertTrue("Fallback column should have non-dict pages", fallbackStats.hasNonDictionaryEncodedPages()); + } } }