Skip to content

Commit

Permalink
PARQUET-2054: fix TCP leaking when calling ParquetFileWriter.appendFile (apache#913)
Browse files Browse the repository at this point in the history

* use try-with-resource statement for ParquetFileReader to call close explicitly
  • Loading branch information
vectorijk authored Jun 22, 2021
1 parent 98ddadf commit bab3d53
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,8 @@ public boolean apply(@Nullable ColumnDescriptor input) {
}));

// now check to see if the data is actually corrupt
ParquetFileReader reader = new ParquetFileReader(getConf(),
fakeMeta, path, footer.getBlocks(), columns);

try {
try (ParquetFileReader reader = new ParquetFileReader(getConf(),
fakeMeta, path, footer.getBlocks(), columns)) {
PageStatsValidator validator = new PageStatsValidator();
for (PageReadStore pages = reader.readNextRowGroup(); pages != null;
pages = reader.readNextRowGroup()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ private String getParquetSchema(String source) throws IOException {

switch (format) {
case PARQUET:
return new ParquetFileReader(
getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)
.getFileMetaData().getSchema().toString();
try (ParquetFileReader reader = new ParquetFileReader(
getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)) {
return reader.getFileMetaData().getSchema().toString();
}
default:
throw new IllegalArgumentException(String.format(
"Could not get a Parquet schema for format %s: %s", format, source));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,56 +64,57 @@ public int run() throws IOException {

String source = targets.get(0);

ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source));
MessageType schema = reader.getFileMetaData().getSchema();
ColumnDescriptor descriptor = Util.descriptor(column, schema);
PrimitiveType type = Util.primitive(column, schema);
Preconditions.checkNotNull(type);

DictionaryPageReadStore dictionaryReader;
int rowGroup = 0;
while ((dictionaryReader = reader.getNextDictionaryReader()) != null) {
DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor);

Dictionary dict = page.getEncoding().initDictionary(descriptor, page);

console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize());
for (int i = 0; i <= dict.getMaxId(); i += 1) {
switch(type.getPrimitiveTypeName()) {
case BINARY:
if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
MessageType schema = reader.getFileMetaData().getSchema();
ColumnDescriptor descriptor = Util.descriptor(column, schema);
PrimitiveType type = Util.primitive(column, schema);
Preconditions.checkNotNull(type);

DictionaryPageReadStore dictionaryReader;
int rowGroup = 0;
while ((dictionaryReader = reader.getNextDictionaryReader()) != null) {
DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor);

Dictionary dict = page.getEncoding().initDictionary(descriptor, page);

console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize());
for (int i = 0; i <= dict.getMaxId(); i += 1) {
switch(type.getPrimitiveTypeName()) {
case BINARY:
if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
console.info("{}: {}", String.format("%6d", i),
Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
} else {
console.info("{}: {}", String.format("%6d", i),
Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70));
}
break;
case INT32:
console.info("{}: {}", String.format("%6d", i),
Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
} else {
dict.decodeToInt(i));
break;
case INT64:
console.info("{}: {}", String.format("%6d", i),
Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70));
}
break;
case INT32:
console.info("{}: {}", String.format("%6d", i),
dict.decodeToInt(i));
break;
case INT64:
console.info("{}: {}", String.format("%6d", i),
dict.decodeToLong(i));
break;
case FLOAT:
console.info("{}: {}", String.format("%6d", i),
dict.decodeToFloat(i));
break;
case DOUBLE:
console.info("{}: {}", String.format("%6d", i),
dict.decodeToDouble(i));
break;
default:
throw new IllegalArgumentException(
"Unknown dictionary type: " + type.getPrimitiveTypeName());
dict.decodeToLong(i));
break;
case FLOAT:
console.info("{}: {}", String.format("%6d", i),
dict.decodeToFloat(i));
break;
case DOUBLE:
console.info("{}: {}", String.format("%6d", i),
dict.decodeToDouble(i));
break;
default:
throw new IllegalArgumentException(
"Unknown dictionary type: " + type.getPrimitiveTypeName());
}
}
}

reader.skipNextRowGroup();
reader.skipNextRowGroup();

rowGroup += 1;
rowGroup += 1;
}
}

console.info("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,57 +75,57 @@ public int run() throws IOException {
"Cannot process multiple Parquet files.");

String source = targets.get(0);
ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source));

MessageType schema = reader.getFileMetaData().getSchema();
Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
if (this.columns == null || this.columns.isEmpty()) {
for (ColumnDescriptor descriptor : schema.getColumns()) {
columns.put(descriptor, primitive(schema, descriptor.getPath()));
}
} else {
for (String column : this.columns) {
columns.put(descriptor(column, schema), primitive(column, schema));
}
}

CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec();
// accumulate formatted lines to print by column
Map<String, List<String>> formatted = Maps.newLinkedHashMap();
PageFormatter formatter = new PageFormatter();
PageReadStore pageStore;
int rowGroupNum = 0;
while ((pageStore = reader.readNextRowGroup()) != null) {
for (ColumnDescriptor descriptor : columns.keySet()) {
List<String> lines = formatted.get(columnName(descriptor));
if (lines == null) {
lines = Lists.newArrayList();
formatted.put(columnName(descriptor), lines);
try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
MessageType schema = reader.getFileMetaData().getSchema();
Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
if (this.columns == null || this.columns.isEmpty()) {
for (ColumnDescriptor descriptor : schema.getColumns()) {
columns.put(descriptor, primitive(schema, descriptor.getPath()));
}

formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
PageReader pages = pageStore.getPageReader(descriptor);

DictionaryPage dict = pages.readDictionaryPage();
if (dict != null) {
lines.add(formatter.format(dict));
} else {
for (String column : this.columns) {
columns.put(descriptor(column, schema), primitive(column, schema));
}
DataPage page;
while ((page = pages.readPage()) != null) {
lines.add(formatter.format(page));
}

CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec();
// accumulate formatted lines to print by column
Map<String, List<String>> formatted = Maps.newLinkedHashMap();
PageFormatter formatter = new PageFormatter();
PageReadStore pageStore;
int rowGroupNum = 0;
while ((pageStore = reader.readNextRowGroup()) != null) {
for (ColumnDescriptor descriptor : columns.keySet()) {
List<String> lines = formatted.get(columnName(descriptor));
if (lines == null) {
lines = Lists.newArrayList();
formatted.put(columnName(descriptor), lines);
}

formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
PageReader pages = pageStore.getPageReader(descriptor);

DictionaryPage dict = pages.readDictionaryPage();
if (dict != null) {
lines.add(formatter.format(dict));
}
DataPage page;
while ((page = pages.readPage()) != null) {
lines.add(formatter.format(page));
}
}
rowGroupNum += 1;
}
rowGroupNum += 1;
}

// TODO: Show total column size and overall size per value in the column summary line
for (String columnName : formatted.keySet()) {
console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-')));
console.info(formatter.getHeader());
for (String line : formatted.get(columnName)) {
console.info(line);
// TODO: Show total column size and overall size per value in the column summary line
for (String columnName : formatted.keySet()) {
console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-')));
console.info(formatter.getHeader());
for (String line : formatted.get(columnName)) {
console.info(line);
}
console.info("");
}
console.info("");
}

return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,9 @@ public void endBlock() throws IOException {
*/
@Deprecated
public void appendFile(Configuration conf, Path file) throws IOException {
  // Open the source file and append its row groups to this writer.
  // try-with-resources guarantees the reader is closed; leaving it open
  // leaked the underlying input stream / TCP connection (PARQUET-2054).
  try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) {
    reader.appendTo(this);
  }
}

public void appendFile(InputFile file) throws IOException {
Expand Down
Loading

0 comments on commit bab3d53

Please sign in to comment.