Skip to content

Commit

Permalink
apacheGH-41321: [C++][Parquet] More strict Parquet level checking (ap…
Browse files Browse the repository at this point in the history
…ache#41346)

### Rationale for this change

In apache#41321 , user reports a corrupt when reading from a corrupt parquet file. This is because we lost some checking. Current code works on reading a normal parquet file. But when reading a corrupt file, this need to be more strict.

**Currently this patch just enhance the checking on Parquet Level, the correspond value check would be add in later patches**

### What changes are included in this PR?

More strict parquet checkings on Level

### Are these changes tested?

Already exists test, maybe we can introduce parquet file as test file

### Are there any user-facing changes?

More strict checkings

* GitHub Issue: apache#41321

Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Signed-off-by: mwish <[email protected]>
  • Loading branch information
mapleFU authored May 21, 2024
1 parent e3cd0ae commit 1f07404
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 44 deletions.
109 changes: 67 additions & 42 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) {
std::to_string(expected));
}
}

constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues =
"Number of decoded rep / def levels do not match num_values in page header";

} // namespace

LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
Expand Down Expand Up @@ -907,6 +911,8 @@ class ColumnReaderImplBase {
static_cast<int>(data_size));
}

// Available values in the current data page, value includes repeated values
// and nulls.
int64_t available_values_current_page() const {
return num_buffered_values_ - num_decoded_values_;
}
Expand All @@ -933,7 +939,7 @@ class ColumnReaderImplBase {
int64_t num_buffered_values_;

// The number of values from the current data page that have been decoded
// into memory
// into memory or skipped over.
int64_t num_decoded_values_;

::arrow::MemoryPool* pool_;
Expand Down Expand Up @@ -1026,28 +1032,36 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,

// Read definition and repetition levels. Also return the number of definition levels
// and number of values to read. This function is called before reading values.
//
// ReadLevels will throw exception when any num-levels read is not equal to the number
// of the levels can be read.
void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
int64_t* num_def_levels, int64_t* values_to_read) {
batch_size =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
int64_t* num_def_levels, int64_t* non_null_values_to_read) {
batch_size = std::min(batch_size, this->available_values_current_page());

// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0 && def_levels != nullptr) {
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
*values_to_read +=
*non_null_values_to_read +=
std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_);
} else {
// Required field, read all values
*values_to_read = batch_size;
if (num_def_levels != nullptr) {
*num_def_levels = 0;
}
*non_null_values_to_read = batch_size;
}

// Not present for non-repeated fields
if (this->max_rep_level_ > 0 && rep_levels != nullptr) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (def_levels != nullptr && *num_def_levels != num_rep_levels) {
throw ParquetException("Number of decoded rep / def levels did not match");
if (batch_size != num_rep_levels) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}
}
Expand Down Expand Up @@ -1090,8 +1104,7 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary(
*indices_read = ReadDictionaryIndices(indices_to_read, indices);
int64_t total_indices = std::max<int64_t>(num_def_levels, *indices_read);
// Some callers use a batch size of 0 just to get the dictionary.
int64_t expected_values =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
int64_t expected_values = std::min(batch_size, this->available_values_current_page());
if (total_indices == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
Expand All @@ -1106,7 +1119,8 @@ template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels,
int16_t* rep_levels, T* values,
int64_t* values_read) {
// HasNext invokes ReadNewPage
// HasNext might invoke ReadNewPage until a data page with
// `available_values_current_page() > 0` is found.
if (!HasNext()) {
*values_read = 0;
return 0;
Expand All @@ -1115,20 +1129,31 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def
// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
int64_t num_def_levels = 0;
int64_t values_to_read = 0;
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read);

*values_read = this->ReadValues(values_to_read, values);
// Number of non-null values to read within `num_def_levels`.
int64_t non_null_values_to_read = 0;
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels,
&non_null_values_to_read);
// Should not return more values than available in the current data page,
// since currently, ReadLevels would only consume level from current
// data page.
if (ARROW_PREDICT_FALSE(num_def_levels > this->available_values_current_page())) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (non_null_values_to_read != 0) {
*values_read = this->ReadValues(non_null_values_to_read, values);
} else {
*values_read = 0;
}
// Adjust total_values, since if max_def_level_ == 0, num_def_levels would
// be 0 and `values_read` would adjust to `available_values_current_page()`.
int64_t total_values = std::max<int64_t>(num_def_levels, *values_read);
int64_t expected_values =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
int64_t expected_values = std::min(batch_size, this->available_values_current_page());
if (total_values == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
ParquetException::EofException(ss.str());
}
this->ConsumeBufferedValues(total_values);

return total_values;
}

Expand All @@ -1137,29 +1162,33 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
int64_t* values_read, int64_t* null_count_out) {
// HasNext invokes ReadNewPage
// HasNext might invoke ReadNewPage until a data page with
// `available_values_current_page() > 0` is found.
if (!HasNext()) {
*levels_read = 0;
*values_read = 0;
*null_count_out = 0;
return 0;
}

// Number of non-null values to read
int64_t total_values;
// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
batch_size =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
batch_size = std::min(batch_size, this->available_values_current_page());

// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0) {
int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(num_def_levels != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

// Not present for non-repeated fields
if (this->max_rep_level_ > 0) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (num_def_levels != num_rep_levels) {
throw ParquetException("Number of decoded rep / def levels did not match");
if (ARROW_PREDICT_FALSE(num_def_levels != num_rep_levels)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}

Expand Down Expand Up @@ -1401,26 +1430,21 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;

// Not present for non-repeated fields
int64_t levels_read = 0;
if (ARROW_PREDICT_FALSE(this->ReadDefinitionLevels(batch_size, def_levels) !=
batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (this->max_rep_level_ > 0) {
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
throw ParquetException("Number of decoded rep / def levels did not match");
int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, rep_levels);
if (ARROW_PREDICT_FALSE(rep_levels_read != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
} else if (this->max_def_level_ > 0) {
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
}

// Exhausted column chunk
if (levels_read == 0) {
break;
}

levels_written_ += levels_read;
levels_written_ += batch_size;
records_read += ReadRecordData(num_records - records_read);
} else {
// No repetition or definition levels
// No repetition and definition levels, we can read values directly
batch_size = std::min(num_records - records_read, batch_size);
records_read += ReadRecordData(batch_size);
}
Expand Down Expand Up @@ -1574,13 +1598,14 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;

int64_t levels_read = 0;
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
throw ParquetException("Number of decoded rep / def levels did not match");
if (this->ReadDefinitionLevels(batch_size, def_levels) != batch_size) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (this->ReadRepetitionLevels(batch_size, rep_levels) != batch_size) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

levels_written_ += levels_read;
levels_written_ += batch_size;
int64_t remaining_records = num_records - skipped_records;
// This updates at_record_start_.
skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class PARQUET_EXPORT ColumnReader {
template <typename DType>
class TypedColumnReader : public ColumnReader {
public:
typedef typename DType::c_type T;
using T = typename DType::c_type;

// Read a batch of repetition levels, definition levels, and values from the
// column.
Expand Down
76 changes: 75 additions & 1 deletion cpp/src/parquet/column_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
&descr, values, /*num_values=*/2, Encoding::PLAIN, /*indices=*/{},
/*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
/*rep_levels=*/{},
/*max_rep_level=*/0);
/*max_rep_level=*/max_rep_level_);
pages_.push_back(data_page);
InitReader(&descr);
auto reader = static_cast<BoolReader*>(reader_.get());
Expand All @@ -431,6 +431,80 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
ParquetException);
}

// GH-41321: When max_def_level > 0 or max_rep_level > 0, and
// Page has more or less levels than the `num_values` in
// PageHeader. We should detect and throw exception.
TEST_F(TestPrimitiveReader, DefRepLevelNotExpected) {
auto do_check = [&](const NodePtr& type, const std::vector<int16_t>& input_def_levels,
const std::vector<int16_t>& input_rep_levels, int num_values) {
std::vector<bool> values(num_values, false);
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);

// The data page falls back to plain encoding
std::shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
std::shared_ptr<DataPageV1> data_page = MakeDataPage<BooleanType>(
&descr, values, /*num_values=*/num_values, Encoding::PLAIN, /*indices=*/{},
/*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
/*rep_levels=*/input_rep_levels,
/*max_rep_level=*/max_rep_level_);
pages_.push_back(data_page);
InitReader(&descr);
auto reader = static_cast<BoolReader*>(reader_.get());
ASSERT_TRUE(reader->HasNext());

constexpr int batch_size = 10;
std::vector<int16_t> def_levels(batch_size, 0);
std::vector<int16_t> rep_levels(batch_size, 0);
bool values_out[batch_size];
int64_t values_read;
EXPECT_THROW_THAT(
[&]() {
reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(), values_out,
&values_read);
},
ParquetException,
::testing::Property(&ParquetException::what,
::testing::HasSubstr("Number of decoded rep / def levels do "
"not match num_values in page header")));
};
// storing def-levels less than value in page-header
{
max_def_level_ = 1;
max_rep_level_ = 0;
NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
std::vector<int16_t> input_def_levels(1, 1);
std::vector<int16_t> input_rep_levels{};
do_check(type, input_def_levels, input_rep_levels, /*num_values=*/3);
}
// storing def-levels more than value in page-header
{
max_def_level_ = 1;
max_rep_level_ = 0;
NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
std::vector<int16_t> input_def_levels(2, 1);
std::vector<int16_t> input_rep_levels{};
do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
}
// storing rep-levels less than value in page-header
{
max_def_level_ = 0;
max_rep_level_ = 1;
NodePtr type = schema::Boolean("a", Repetition::REPEATED);
std::vector<int16_t> input_def_levels{};
std::vector<int16_t> input_rep_levels(3, 0);
do_check(type, input_def_levels, input_rep_levels, /*num_values=*/4);
}
// storing rep-levels more than value in page-header
{
max_def_level_ = 0;
max_rep_level_ = 1;
NodePtr type = schema::Boolean("a", Repetition::REPEATED);
std::vector<int16_t> input_def_levels{};
std::vector<int16_t> input_rep_levels(2, 1);
do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
}
}

// Repetition level byte length reported in Page but Max Repetition level
// is zero for the column.
TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {
Expand Down

0 comments on commit 1f07404

Please sign in to comment.