Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-869: Configurable record counts for block size checks #470

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -45,8 +45,11 @@ public class ParquetProperties {
public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0;
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if anyone would use such constants but it is a braking change to remove them. It might be a good idea to deprecate them instead and use the new ones internally.

public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = true;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 10000;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK = 10000;

public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

Expand Down Expand Up @@ -80,12 +83,15 @@ public static WriterVersion fromString(String name) {
private final boolean enableDictionary;
private final int minRowCountForPageSizeCheck;
private final int maxRowCountForPageSizeCheck;
private final boolean estimateNextSizeCheck;
private final int minRowCountForRowGroupSizeCheck;
private final int maxRowCountForRowGroupSizeCheck;
private final boolean estimateNextPageSizeCheck;
private final boolean estimateNextRowGroupSizeCheck;
private final ByteBufferAllocator allocator;
private final ValuesWriterFactory valuesWriterFactory;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
int maxRowCountForPageSizeCheck, int minRowCountForRowGroupSizeCheck, int maxRowCountForRowGroupSizeCheck, boolean estimateNextPageSizeCheck, boolean estimateNextRowGroupSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
Expand All @@ -95,7 +101,10 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.enableDictionary = enableDict;
this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck;
this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
this.estimateNextSizeCheck = estimateNextSizeCheck;
this.minRowCountForRowGroupSizeCheck = minRowCountForRowGroupSizeCheck;
this.maxRowCountForRowGroupSizeCheck = maxRowCountForRowGroupSizeCheck;
this.estimateNextPageSizeCheck = estimateNextPageSizeCheck;
this.estimateNextRowGroupSizeCheck = estimateNextRowGroupSizeCheck;
this.allocator = allocator;

this.valuesWriterFactory = writerFactory;
Expand Down Expand Up @@ -179,12 +188,24 @@ public int getMaxRowCountForPageSizeCheck() {
return maxRowCountForPageSizeCheck;
}

public int getMinRowCountForRowGroupSizeCheck() {
return minRowCountForRowGroupSizeCheck;
}

public int getMaxRowCountForRowGroupSizeCheck() {
return maxRowCountForRowGroupSizeCheck;
}

public ValuesWriterFactory getValuesWriterFactory() {
return valuesWriterFactory;
}

public boolean estimateNextSizeCheck() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest deprecating instead of removing.

return estimateNextSizeCheck;
public boolean estimateNextPageSizeCheck() {
return estimateNextPageSizeCheck;
}

public boolean estimateNextRowGroupSizeCheck() {
return estimateNextRowGroupSizeCheck;
}

public static Builder builder() {
Expand All @@ -200,9 +221,12 @@ public static class Builder {
private int dictPageSize = DEFAULT_DICTIONARY_PAGE_SIZE;
private boolean enableDict = DEFAULT_IS_DICTIONARY_ENABLED;
private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK;
private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK;
private boolean estimateNextPageSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
private int minRowCountForRowGroupSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK;
private int maxRowCountForRowGroupSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK;
private boolean estimateNextRowGroupSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK;
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;

Expand All @@ -215,7 +239,10 @@ private Builder(ParquetProperties toCopy) {
this.writerVersion = toCopy.writerVersion;
this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
this.estimateNextPageSizeCheck = toCopy.estimateNextPageSizeCheck;
this.minRowCountForRowGroupSizeCheck = toCopy.minRowCountForRowGroupSizeCheck;
this.maxRowCountForRowGroupSizeCheck = toCopy.maxRowCountForRowGroupSizeCheck;
this.estimateNextRowGroupSizeCheck = toCopy.estimateNextRowGroupSizeCheck;
this.allocator = toCopy.allocator;
}

Expand Down Expand Up @@ -281,9 +308,28 @@ public Builder withMaxRowCountForPageSizeCheck(int max) {
return this;
}

public Builder withMinRowCountForRowGroupSizeCheck(int min) {
Preconditions.checkArgument(min > 0,
"Invalid row count for block size check (negative): %s", min);
this.minRowCountForRowGroupSizeCheck = min;
return this;
}

public Builder withMaxRowCountForRowGroupSizeCheck(int max) {
Preconditions.checkArgument(max > 0,
"Invalid row count for block size check (negative): %s", max);
this.maxRowCountForRowGroupSizeCheck = max;
return this;
}

// Do not attempt to predict next size check. Prevents issues with rows that vary significantly in size.
public Builder estimateRowCountForPageSizeCheck(boolean estimateNextSizeCheck) {
this.estimateNextSizeCheck = estimateNextSizeCheck;
this.estimateNextPageSizeCheck = estimateNextSizeCheck;
return this;
}

public Builder estimateRowCountForRowGroupSizeCheck(boolean estimateRowGroupSizeCheck) {
this.estimateNextRowGroupSizeCheck = estimateRowGroupSizeCheck;
return this;
}

Expand All @@ -303,7 +349,9 @@ public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory);
minRowCountForRowGroupSizeCheck, maxRowCountForRowGroupSizeCheck,
estimateNextPageSizeCheck, estimateNextRowGroupSizeCheck,
allocator, valuesWriterFactory);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -161,7 +161,7 @@ private void sizeCheck() {
minRecordToWait = props.getMinRowCountForPageSizeCheck();
}

if(props.estimateNextSizeCheck()) {
if(props.estimateNextPageSizeCheck()) {
// will check again halfway if between min and max
rowCountForNextSizeCheck = rowCount +
min(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -98,13 +98,13 @@ private void accountForValueWritten() {
+ dataColumn.getBufferedSize();
if (memSize > props.getPageSizeThreshold()) {
// we will write the current page and check again the size at the predicted middle of next page
if (props.estimateNextSizeCheck()) {
if (props.estimateNextPageSizeCheck()) {
valueCountForNextSizeCheck = valueCount / 2;
} else {
valueCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
}
writePage();
} else if (props.estimateNextSizeCheck()) {
} else if (props.estimateNextPageSizeCheck()) {
// not reached the threshold, will check again midway
valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * props.getPageSizeThreshold() / memSize)) / 2 + 1;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,11 @@
class InternalParquetRecordWriter<T> {
private static final Logger LOG = LoggerFactory.getLogger(InternalParquetRecordWriter.class);

private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;

private final ParquetFileWriter parquetFileWriter;
private final WriteSupport<T> writeSupport;
private final MessageType schema;
private final Map<String, String> extraMetaData;
private final long rowGroupSize;
private final boolean estimateSizeCheckRows;
private long rowGroupSizeThreshold;
private long nextRowGroupSize;
private final BytesCompressor compressor;
Expand All @@ -60,12 +57,14 @@ class InternalParquetRecordWriter<T> {
private boolean closed;

private long recordCount = 0;
private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
private long recordCountForNextMemCheck;
private long lastRowGroupEndPos = 0;

private ColumnWriteStore columnStore;
private ColumnChunkPageWriteStore pageStore;
private RecordConsumer recordConsumer;
private int sizeCheckMinRows;
private int sizeCheckMaxRows;

/**
* @param parquetFileWriter the file to write to
Expand All @@ -88,12 +87,14 @@ public InternalParquetRecordWriter(
this.writeSupport = checkNotNull(writeSupport, "writeSupport");
this.schema = schema;
this.extraMetaData = extraMetaData;
this.rowGroupSize = rowGroupSize;
this.rowGroupSizeThreshold = rowGroupSize;
this.nextRowGroupSize = rowGroupSizeThreshold;
this.compressor = compressor;
this.validating = validating;
this.props = props;
this.sizeCheckMinRows = props.getMinRowCountForRowGroupSizeCheck();
this.sizeCheckMaxRows = props.getMaxRowCountForRowGroupSizeCheck();
this.estimateSizeCheckRows = props.estimateNextRowGroupSizeCheck();
initStore();
}

Expand Down Expand Up @@ -147,13 +148,22 @@ private void checkBlockSizeReached() throws IOException {
LOG.info("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount);
flushRowGroupToStore();
initStore();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
if (estimateSizeCheckRows) {
recordCountForNextMemCheck = min(max(sizeCheckMinRows, recordCount / 2), sizeCheckMaxRows);
} else {
recordCountForNextMemCheck = sizeCheckMinRows;
}
this.lastRowGroupEndPos = parquetFileWriter.getPos();
} else {
recordCountForNextMemCheck = min(
max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
);
if (estimateSizeCheckRows) {
recordCountForNextMemCheck = min(
max(sizeCheckMinRows, (recordCount + (long) (nextRowGroupSize / ((float) recordSize))) / 2),
// will check halfway
recordCount + sizeCheckMaxRows // will not look more than max records ahead
);
} else {
recordCountForNextMemCheck += sizeCheckMinRows;
}
LOG.debug("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -142,7 +142,10 @@ public static enum JobSummaryLevel {
public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding";
public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min";
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
public static final String MIN_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.row.check.min";
public static final String MAX_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.row.check.max";
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
public static final String ESTIMATE_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.check.estimate";

public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
String level = conf.get(JOB_SUMMARY_LEVEL);
Expand Down Expand Up @@ -243,12 +246,27 @@ public static boolean getEnableDictionary(Configuration configuration) {

public static int getMinRowCountForPageSizeCheck(Configuration configuration) {
return configuration.getInt(MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK,
ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK);
ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK);
}

public static int getMaxRowCountForPageSizeCheck(Configuration configuration) {
return configuration.getInt(MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK,
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK);
}

public static int getMinRowCountForRowGroupSizeCheck(Configuration configuration) {
return configuration.getInt(MIN_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK,
ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK);
}

public static int getMaxRowCountForRowGroupSizeCheck(Configuration configuration) {
return configuration.getInt(MAX_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK,
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK);
}

public static boolean getEstimateBlockSizeCheck(Configuration configuration) {
return configuration.getBoolean(ESTIMATE_ROW_GROUP_SIZE_CHECK,
ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK);
}

public static boolean getEstimatePageSizeCheck(Configuration configuration) {
Expand Down Expand Up @@ -364,8 +382,11 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
.withDictionaryEncoding(getEnableDictionary(conf))
.withWriterVersion(getWriterVersion(conf))
.estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
.estimateRowCountForRowGroupSizeCheck(getEstimateBlockSizeCheck(conf))
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
.withMinRowCountForRowGroupSizeCheck(getMinRowCountForRowGroupSizeCheck(conf))
.withMaxRowCountForRowGroupSizeCheck(getMaxRowCountForRowGroupSizeCheck(conf))
.build();

long blockSize = getLongBlockSize(conf);
Expand All @@ -380,9 +401,11 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
LOG.info("Validation is {}", (validating ? "on" : "off"));
LOG.info("Writer version is: {}", props.getWriterVersion());
LOG.info("Maximum row group padding size is {} bytes", maxPaddingSize);
LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? "estimated" : "constant"));
LOG.info("Page size checking is: {}", (props.estimateNextPageSizeCheck() ? "estimated" : "constant"));
LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
LOG.info("Min row count for row group size check is: {}", props.getMinRowCountForRowGroupSizeCheck());
LOG.info("Max row count for row group size check is: {}", props.getMaxRowCountForRowGroupSizeCheck());
}

WriteContext init = writeSupport.init(conf);
Expand Down